You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/03 03:47:02 UTC
incubator-kylin git commit: KYLIN-810, LongSerializer not thread-safe,
causes incorrect count() result
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 2e6d27c89 -> b75a24006
KYLIN-810, LongSerializer not thread-safe, causes incorrect count() result
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b75a2400
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b75a2400
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b75a2400
Branch: refs/heads/0.8.0
Commit: b75a240062aa4205d13f33e4af2881cd1a424f56
Parents: 2e6d27c
Author: Yang Li <li...@apache.org>
Authored: Wed Jun 3 09:46:42 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Jun 3 09:46:52 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/HadoopUtil.java | 10 +--
.../kylin/job/inmemcubing/InMemCubeBuilder.java | 68 ++++++++++++++++----
.../inmemcubing/ConcurrentDiskStoreTest.java | 2 +-
.../serializer/BigDecimalSerializer.java | 7 +-
.../metadata/serializer/DateTimeSerializer.java | 25 +++++--
.../metadata/serializer/DoubleSerializer.java | 27 +++++---
.../metadata/serializer/HLLCSerializer.java | 34 +++++++---
.../metadata/serializer/LongSerializer.java | 27 +++++---
.../java/org/apache/kylin/rest/DebugTomcat.java | 3 +-
.../kylin/storage/cube/CubeCodeSystem.java | 18 +++++-
.../storage/gridtable/GTAggregateScanner.java | 63 +++++++++++++-----
11 files changed, 210 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 2486d6c..8ee1440 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -62,11 +62,13 @@ public class HadoopUtil {
throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
}
}
-
+
public static String fixWindowsPath(String path) {
// fix windows path
if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
path = path.replace("file://", "file:///");
+ }
+ if (path.startsWith("file:///")) {
path = path.replace('\\', '/');
}
return path;
@@ -134,8 +136,6 @@ public class HadoopUtil {
String znodePath = m.group(3);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodePath);
-
-
return conf;
}
@@ -148,7 +148,7 @@ public class HadoopUtil {
int cut = table.indexOf('.');
String database = cut >= 0 ? table.substring(0, cut).trim() : "DEFAULT";
String tableName = cut >= 0 ? table.substring(cut + 1).trim() : table.trim();
-
- return new String[] {database, tableName};
+
+ return new String[] { database, tableName };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 9e98976..a8e6d02 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -22,14 +22,17 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.ByteArray;
@@ -93,11 +96,13 @@ public class InMemCubeBuilder implements Runnable {
private AtomicInteger taskCuboidCompleted;
private CuboidResult baseResult;
- private TreeMap<Long, CuboidResult> outputPending;
+ private SortedMap<Long, CuboidResult> outputPending;
private Thread outputThread;
private Throwable outputThreadException;
private int outputCuboidExpected;
+ private Object[] totalSumForSanityCheck;
+
public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap, ICuboidWriter gtRecordWriter) {
if (dictionaryMap == null || dictionaryMap.isEmpty()) {
throw new IllegalArgumentException("dictionary cannot be empty");
@@ -158,12 +163,12 @@ public class InMemCubeBuilder implements Runnable {
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
-
+
// Before several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
// MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
// MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
ConcurrentDiskStore store = new ConcurrentDiskStore(info);
-
+
GridTable gridTable = new GridTable(info, store);
return gridTable;
}
@@ -289,13 +294,17 @@ public class InMemCubeBuilder implements Runnable {
while (taskCuboidCompleted.get() < outputCuboidExpected) {
CuboidTask task = null;
synchronized (taskPending) {
- while (task == null) {
+ while (task == null && taskHasNoException()) {
task = taskPending.pollFirst();
if (task == null)
- taskPending.wait();
+ taskPending.wait(60000);
}
}
+ // if task error occurs
+ if (task == null)
+ break;
+
CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
addChildTasks(newCuboid);
taskCuboidCompleted.incrementAndGet();
@@ -308,12 +317,21 @@ public class InMemCubeBuilder implements Runnable {
}
}
} catch (Throwable ex) {
- if (taskCuboidCompleted.get() < outputCuboidExpected)
+ if (taskCuboidCompleted.get() < outputCuboidExpected) {
+ logger.error("task thread exception", ex);
taskThreadExceptions[id] = ex;
+ }
}
}
}
+ private boolean taskHasNoException() { // true only while every slot of taskThreadExceptions is still null
+ for (int i = 0; i < taskThreadExceptions.length; i++)
+ if (taskThreadExceptions[i] != null)
+ return false;
+ return true;
+ }
+
private void addChildTasks(CuboidResult parent) {
List<Long> children = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
if (!children.isEmpty()) {
@@ -326,10 +344,10 @@ public class InMemCubeBuilder implements Runnable {
}
}
- private TreeMap<Long, CuboidResult> prepareOutputPending() {
+ private SortedMap<Long, CuboidResult> prepareOutputPending() {
TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
- return result;
+ return Collections.synchronizedSortedMap(result);
}
private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, CuboidResult> result) {
@@ -344,20 +362,26 @@ public class InMemCubeBuilder implements Runnable {
public void run() {
try {
while (!outputPending.isEmpty()) {
- CuboidResult result = outputPending.firstEntry().getValue();
+ CuboidResult result = outputPending.get(outputPending.firstKey());
synchronized (result) {
- while (result.table == null) {
+ while (result.table == null && taskHasNoException()) {
try {
- result.wait();
+ result.wait(60000);
} catch (InterruptedException e) {
logger.error("interrupted", e);
}
}
}
+
+ // if task error occurs
+ if (result.table == null)
+ break;
+
outputCuboid(result.cuboidId, result.table);
outputPending.remove(result.cuboidId);
}
} catch (Throwable ex) {
+ logger.error("output thread exception", ex);
outputThreadException = ex;
}
}
@@ -498,7 +522,7 @@ public class InMemCubeBuilder implements Runnable {
logger.info("Calculating cuboid " + cuboidId);
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
- IGTScanner scanner = gridTable.scan(req);
+ GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
GridTable newGridTable = newGridTableByCuboidID(cuboidId);
GTBuilder builder = newGridTable.rebuild();
@@ -546,6 +570,8 @@ public class InMemCubeBuilder implements Runnable {
builder.write(newRecord);
}
+
+ sanityCheck(scanner.getTotalSumForSanityCheck());
} finally {
scanner.close();
builder.close();
@@ -557,6 +583,24 @@ public class InMemCubeBuilder implements Runnable {
return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
}
+ private void sanityCheck(Object[] totalSum) { // checks totalSum against the first totalSum ever passed in; throws IllegalStateException on mismatch
+ // double sum introduces error and causes result not exactly equal
+ for (int i = 0; i < totalSum.length; i++) {
+ if (totalSum[i] instanceof DoubleWritable) {
+ totalSum[i] = Math.round(((DoubleWritable) totalSum[i]).get());
+ }
+ }
+ logger.info(Arrays.toString(totalSum));
+
+ if (totalSumForSanityCheck == null) { // first call: remember these totals as the baseline
+ totalSumForSanityCheck = totalSum;
+ return;
+ }
+ if (!Arrays.equals(totalSumForSanityCheck, totalSum)) {
+ throw new IllegalStateException("sanity check failed: expected total sum " + Arrays.toString(totalSumForSanityCheck) + " but got " + Arrays.toString(totalSum)); // name both sides so the failure is diagnosable
+ }
+ }
+
private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
long startTime = System.currentTimeMillis();
GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
index 7aa4640..2fbcd94 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
@@ -47,7 +47,7 @@ public class ConcurrentDiskStoreTest {
@Test
public void testMultiThreadRead() throws IOException, InterruptedException {
long start = System.currentTimeMillis();
- verifyOneTableWriteAndRead(5);
+ verifyOneTableWriteAndRead(20);
long end = System.currentTimeMillis();
System.out.println("Cost " + (end - start) + " millis");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
index af67112..72e3696 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
@@ -38,7 +38,8 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
final DataType type;
final int maxLength;
- int counter = 0;
+
+ int avoidVerbose = 0;
public BigDecimalSerializer(DataType type) {
this.type = type;
@@ -49,8 +50,8 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
@Override
public void serialize(BigDecimal value, ByteBuffer out) {
if (value.scale() > type.getScale()) {
- if (counter % 10000 == 0) {
- logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (counter++));
+ if (avoidVerbose % 10000 == 0) {
+ logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
}
value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
index f817693..23f7695 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
@@ -9,8 +9,8 @@ import org.apache.kylin.common.util.DateFormat;
public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
- // avoid mass object creation
- LongWritable current = new LongWritable();
+ // be thread-safe and avoid repeated obj creation
+ private static ThreadLocal<LongWritable> current = new ThreadLocal<LongWritable>();
public DateTimeSerializer(DataType type) {
}
@@ -20,10 +20,20 @@ public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
out.putLong(value.get());
}
+ private LongWritable current() { // returns the calling thread's reusable LongWritable, creating it on first use
+ LongWritable l = current.get();
+ if (l == null) {
+ l = new LongWritable();
+ current.set(l);
+ }
+ return l;
+ }
+
@Override
public LongWritable deserialize(ByteBuffer in) {
- current.set(in.getLong());
- return current;
+ LongWritable l = current();
+ l.set(in.getLong());
+ return l;
}
@Override
@@ -38,11 +48,12 @@ public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
@Override
public LongWritable valueOf(byte[] value) {
+ LongWritable l = current();
if (value == null)
- current.set(0L);
+ l.set(0L);
else
- current.set(DateFormat.stringToMillis(Bytes.toString(value)));
- return current;
+ l.set(DateFormat.stringToMillis(Bytes.toString(value)));
+ return l;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
index 0ebeb78..adafcc0 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
@@ -20,8 +20,8 @@ package org.apache.kylin.metadata.serializer;
import java.nio.ByteBuffer;
-import org.apache.kylin.common.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.metadata.model.DataType;
/**
@@ -30,8 +30,8 @@ import org.apache.kylin.metadata.model.DataType;
*/
public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
- // avoid mass object creation
- DoubleWritable current = new DoubleWritable();
+ // be thread-safe and avoid repeated obj creation
+ private static ThreadLocal<DoubleWritable> current = new ThreadLocal<DoubleWritable>();
public DoubleSerializer(DataType type) {
}
@@ -41,10 +41,20 @@ public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
out.putDouble(value.get());
}
+ private DoubleWritable current() { // returns the calling thread's reusable DoubleWritable, creating it on first use
+ DoubleWritable d = current.get();
+ if (d == null) {
+ d = new DoubleWritable();
+ current.set(d);
+ }
+ return d;
+ }
+
@Override
public DoubleWritable deserialize(ByteBuffer in) {
- current.set(in.getDouble());
- return current;
+ DoubleWritable d = current();
+ d.set(in.getDouble());
+ return d;
}
@Override
@@ -59,11 +69,12 @@ public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
@Override
public DoubleWritable valueOf(byte[] value) {
+ DoubleWritable d = current();
if (value == null)
- current.set(0d);
+ d.set(0d);
else
- current.set(Double.parseDouble(Bytes.toString(value)));
- return current;
+ d.set(Double.parseDouble(Bytes.toString(value)));
+ return d;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
index 3f39866..1b9d225 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
@@ -30,10 +30,13 @@ import org.apache.kylin.metadata.model.DataType;
*/
public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
- HyperLogLogPlusCounter current;
+ // be thread-safe and avoid repeated obj creation
+ private static ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
+
+ private int precision;
public HLLCSerializer(DataType type) {
- current = new HyperLogLogPlusCounter(type.getPrecision());
+ this.precision = type.getPrecision();
}
@Override
@@ -45,34 +48,45 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
}
}
+ private HyperLogLogPlusCounter current() { // returns the calling thread's cached counter, creating it on first use
+ HyperLogLogPlusCounter hllc = current.get();
+ if (hllc == null) {
+ hllc = new HyperLogLogPlusCounter(precision); // NOTE(review): 'current' is static but 'precision' is per instance; a counter cached by one serializer would be reused by another serializer with a different precision on the same thread -- confirm this cannot happen
+ current.set(hllc);
+ }
+ return hllc;
+ }
+
@Override
public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+ HyperLogLogPlusCounter hllc = current();
try {
- current.readRegisters(in);
+ hllc.readRegisters(in);
} catch (IOException e) {
throw new RuntimeException(e);
}
- return current;
+ return hllc;
}
@Override
public int peekLength(ByteBuffer in) {
- return current.peekLength(in);
+ return current().peekLength(in);
}
@Override
public int maxLength() {
- return current.maxLength();
+ return current().maxLength();
}
@Override
public HyperLogLogPlusCounter valueOf(byte[] value) {
- current.clear();
+ HyperLogLogPlusCounter hllc = current();
+ hllc.clear();
if (value == null)
- current.add("__nUlL__");
+ hllc.add("__nUlL__");
else
- current.add(value);
- return current;
+ hllc.add(value);
+ return hllc;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
index ba9ff8b..28b009f 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
@@ -20,8 +20,8 @@ package org.apache.kylin.metadata.serializer;
import java.nio.ByteBuffer;
-import org.apache.kylin.common.util.Bytes;
import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.model.DataType;
@@ -31,8 +31,8 @@ import org.apache.kylin.metadata.model.DataType;
*/
public class LongSerializer extends DataTypeSerializer<LongWritable> {
- // avoid mass object creation
- LongWritable current = new LongWritable();
+ // be thread-safe and avoid repeated obj creation
+ private static ThreadLocal<LongWritable> current = new ThreadLocal<LongWritable>();
public LongSerializer(DataType type) {
}
@@ -42,10 +42,20 @@ public class LongSerializer extends DataTypeSerializer<LongWritable> {
BytesUtil.writeVLong(value.get(), out);
}
+ private LongWritable current() { // returns the calling thread's reusable LongWritable, creating it on first use
+ LongWritable l = current.get();
+ if (l == null) {
+ l = new LongWritable();
+ current.set(l);
+ }
+ return l;
+ }
+
@Override
public LongWritable deserialize(ByteBuffer in) {
- current.set(BytesUtil.readVLong(in));
- return current;
+ LongWritable l = current();
+ l.set(BytesUtil.readVLong(in));
+ return l;
}
@Override
@@ -66,11 +76,12 @@ public class LongSerializer extends DataTypeSerializer<LongWritable> {
@Override
public LongWritable valueOf(byte[] value) {
+ LongWritable l = current();
if (value == null)
- current.set(0L);
+ l.set(0L);
else
- current.set(Long.parseLong(Bytes.toString(value)));
- return current;
+ l.set(Long.parseLong(Bytes.toString(value)));
+ return l;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 0b5db67..bbdde7a 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -38,8 +38,9 @@ public class DebugTomcat {
// test_case_data/sandbox/ contains HDP 2.2 site xmls which is dev sandbox
ClasspathUtil.addClasspath(new File("../examples/test_case_data/sandbox").getAbsolutePath());
System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
- System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
System.setProperty("spring.profiles.active", "testing");
+ if (System.getProperty("hdp.version") == null)
+ System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
// workaround for job submission from win to linux -- https://issues.apache.org/jira/browse/MAPREDUCE-4052
if (Shell.WINDOWS) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
index 881e7a1..68b46b5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
@@ -6,8 +6,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
-import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dict.Dictionary;
@@ -196,16 +196,27 @@ public class CubeCodeSystem implements IGTCodeSystem {
static class FixLenSerializer extends DataTypeSerializer {
+ // be thread-safe and avoid repeated obj creation
+ private static ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
+
private int fixLen;
- private byte[] buf;
FixLenSerializer(int fixLen) {
this.fixLen = fixLen;
- this.buf = new byte[fixLen];
+ }
+
+ private byte[] currentBuf() { // returns the calling thread's scratch buffer, always exactly fixLen bytes long
+ byte[] buf = current.get();
+ if (buf == null || buf.length != fixLen) { // 'current' is static, so the cached buffer may have been sized by an instance with a different fixLen
+ buf = new byte[fixLen];
+ current.set(buf);
+ }
+ return buf;
}
@Override
public void serialize(Object value, ByteBuffer out) {
+ byte[] buf = currentBuf();
if (value == null) {
Arrays.fill(buf, Dictionary.NULL);
out.put(buf);
@@ -223,6 +234,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
@Override
public Object deserialize(ByteBuffer in) {
+ byte[] buf = currentBuf();
in.get(buf);
int tail = fixLen;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index c34adc7..6005d2d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -9,6 +9,8 @@ import java.util.SortedMap;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.HLLCAggregator;
+import org.apache.kylin.metadata.measure.LDCAggregator;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,8 +29,7 @@ public class GTAggregateScanner implements IGTScanner {
final ImmutableBitSet metrics;
final String[] metricsAggrFuncs;
final IGTScanner inputScanner;
-
- long estimateSizeOfAggrCache;
+ final AggregationCache aggrCache;
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
if (req.hasAggregation() == false)
@@ -40,6 +41,7 @@ public class GTAggregateScanner implements IGTScanner {
this.metrics = req.getAggrMetrics();
this.metricsAggrFuncs = req.getAggrMetricsFuncs();
this.inputScanner = inputScanner;
+ this.aggrCache = new AggregationCache();
}
@Override
@@ -64,19 +66,19 @@ public class GTAggregateScanner implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
- AggregationCache aggrCache = new AggregationCache();
for (GTRecord r : inputScanner) {
aggrCache.aggregate(r);
}
-
- estimateSizeOfAggrCache = aggrCache.esitmateMemSize();
-
return aggrCache.iterator();
}
-
- /** for last call to iterator(), return the estimate memory size of its aggregation cache */
+
+ /** return the estimated memory size of the aggregation cache */
public long getEstimateSizeOfAggrCache() {
- return estimateSizeOfAggrCache;
+ return aggrCache.esitmateMemSize();
+ }
+
+ public Object[] getTotalSumForSanityCheck() { // delegates to aggrCache.calculateTotalSumSanityCheck()
+ return aggrCache.calculateTotalSumSanityCheck();
}
class AggregationCache {
@@ -149,11 +151,7 @@ public class GTAggregateScanner implements IGTScanner {
final byte[] key = createKey(r);
MeasureAggregator[] aggrs = aggBufMap.get(key);
if (aggrs == null) {
- aggrs = new MeasureAggregator[metricsAggrFuncs.length];
- for (int i = 0; i < aggrs.length; i++) {
- int col = metrics.trueBitAt(i);
- aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
- }
+ aggrs = newAggregators();
aggBufMap.put(key, aggrs);
}
for (int i = 0; i < aggrs.length; i++) {
@@ -162,11 +160,44 @@ public class GTAggregateScanner implements IGTScanner {
aggrs[i].aggregate(metrics);
}
}
-
+
+ private MeasureAggregator[] newAggregators() { // builds one fresh aggregator per entry of metricsAggrFuncs
+ MeasureAggregator[] aggrs;
+ aggrs = new MeasureAggregator[metricsAggrFuncs.length];
+ for (int i = 0; i < aggrs.length; i++) {
+ int col = metrics.trueBitAt(i); // presumably the column index of the i-th aggregated metric -- verify against ImmutableBitSet
+ aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
+ }
+ return aggrs;
+ }
+
+ public Object[] calculateTotalSumSanityCheck() { // folds every entry of aggBufMap into one total state per metric
+ MeasureAggregator[] totalSum = newAggregators();
+
+ // skip expensive aggregation
+ for (int i = 0; i < totalSum.length; i++) {
+ if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator)
+ totalSum[i] = null;
+ }
+
+ for (MeasureAggregator[] entry : aggBufMap.values()) {
+ for (int i = 0; i < totalSum.length; i++) {
+ if (totalSum[i] != null)
+ totalSum[i].aggregate(entry[i].getState());
+ }
+ }
+ Object[] result = new Object[totalSum.length]; // slots for skipped (HLLC/LDC) aggregators stay null
+ for (int i = 0; i < totalSum.length; i++) {
+ if (totalSum[i] != null)
+ result[i] = totalSum[i].getState();
+ }
+ return result;
+ }
+
public long esitmateMemSize() {
if (aggBufMap.isEmpty())
return 0;
-
+
byte[] sampleKey = aggBufMap.firstKey();
MeasureAggregator<?>[] sampleValue = aggBufMap.get(sampleKey);
return estimateSizeOfAggrCache(sampleKey, sampleValue, aggBufMap.size());