You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/27 07:25:27 UTC
[14/41] incubator-kylin git commit: KYLIN-608 hll works
KYLIN-608 hll works
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/579e793a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/579e793a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/579e793a
Branch: refs/heads/inverted-index
Commit: 579e793a16bff1c0a3cb5063051d8c0c8cbc0c19
Parents: 16b184d
Author: honma <ho...@ebay.com>
Authored: Thu Feb 12 17:09:51 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Feb 12 17:39:23 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BasicTest.java | 2 +
.../invertedindex/index/TableRecordInfo.java | 7 ++--
.../kylin/invertedindex/model/IIDesc.java | 40 ++++++++++++++------
.../measure/fixedlen/FixedHLLCodec.java | 2 +-
.../measure/fixedlen/FixedLenMeasureCodec.java | 8 ++--
.../apache/kylin/metadata/model/DataType.java | 1 +
.../kylin/metadata/model/FunctionDesc.java | 12 +++---
.../AdjustForWeeklyMatchedRealization.java | 8 ++--
.../apache/kylin/query/test/IIQueryTest.java | 9 ++++-
.../resources/query/sql_fast_common/query00.sql | 5 +++
.../kylin/storage/hbase/CubeStorageEngine.java | 2 +-
.../hbase/coprocessor/CoprocessorConstants.java | 2 +-
.../endpoint/EndpointAggregators.java | 25 +++++++-----
.../endpoint/EndpointTupleIterator.java | 22 ++++++++---
14 files changed, 97 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 0a33f9f..59ed5f3 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -67,5 +67,7 @@ public class BasicTest {
@Test
@Ignore("fix it later")
public void test2() throws IOException, ConfigurationException {
+ int m = 1 << 15;
+ System.out.println(m);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 886c649..7af3dcb 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -129,13 +129,12 @@ public class TableRecordInfo {
return desc.findColumn(col);
}
- public int findMetric(String metricColumnName) {
- if (metricColumnName == null)
+ public int findFactTableColumn(String columnName) {
+ if (columnName == null)
return -1;
for (int i = 0; i < allColumns.size(); ++i) {
TblColRef tblColRef = allColumns.get(i);
- if (measureSerializers[i] != null // has measureSerializers means it is a metric
- && tblColRef.isSameAs(desc.getFactTableName(), metricColumnName)) {
+ if (tblColRef.isSameAs(desc.getFactTableName(), columnName)) {
return i;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index 5bc611e..6e1224a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -109,14 +110,18 @@ public class IIDesc extends RootPersistentEntity {
IIDimension.capicalizeStrings(valueDimensions);
StringUtil.toUpperCaseArray(metricNames, metricNames);
- // retrieve all columns and all tables
+ // retrieve all columns and all tables, and build the measures available to the II
HashSet<String> allTableNames = Sets.newHashSet();
+ measureDescs = Lists.newArrayList();
+ measureDescs.add(makeCountMeasure());
for (IIDimension iiDimension : Iterables.concat(bitmapDimensions, valueDimensions)) {
TableDesc tableDesc = this.getTableDesc(iiDimension.getTable());
for (String column : iiDimension.getColumns()) {
ColumnDesc columnDesc = tableDesc.findColumnByName(column);
allColumns.add(new TblColRef(columnDesc));
+ measureDescs.add(makeHLLMeasure(columnDesc, null));
}
+
if (!allTableNames.contains(tableDesc.getIdentity())) {
allTableNames.add(tableDesc.getIdentity());
allTables.add(tableDesc);
@@ -126,6 +131,9 @@ public class IIDesc extends RootPersistentEntity {
TableDesc tableDesc = this.getTableDesc(this.getFactTableName());
ColumnDesc columnDesc = tableDesc.findColumnByName(column);
allColumns.add(new TblColRef(columnDesc));
+ measureDescs.add(makeNormalMeasure("SUM", columnDesc));
+ measureDescs.add(makeNormalMeasure("MIN", columnDesc));
+ measureDescs.add(makeNormalMeasure("MAX", columnDesc));
if (!allTableNames.contains(tableDesc.getIdentity())) {
allTableNames.add(tableDesc.getIdentity());
allTables.add(tableDesc);
@@ -136,9 +144,7 @@ public class IIDesc extends RootPersistentEntity {
bitmapCols = new int[IIDimension.getColumnCount(bitmapDimensions)];
valueCols = new int[IIDimension.getColumnCount(valueDimensions)];
metricsCols = new int[metricNames.length];
-
metricsColSet = new BitSet(this.getTableDesc(this.getFactTableName()).getColumnCount());
- measureDescs = Lists.newArrayList();
int totalIndex = 0;
for (int i = 0; i < bitmapCols.length; ++i, ++totalIndex) {
@@ -150,14 +156,7 @@ public class IIDesc extends RootPersistentEntity {
for (int i = 0; i < metricsCols.length; ++i, ++totalIndex) {
metricsCols[i] = totalIndex;
metricsColSet.set(totalIndex);
-
- ColumnDesc col = this.getTableDesc(this.getFactTableName()).findColumnByName(metricNames[i]);
- measureDescs.add(makeMeasureDescs("SUM", col));
- measureDescs.add(makeMeasureDescs("MIN", col));
- measureDescs.add(makeMeasureDescs("MAX", col));
- // TODO support for HLL
}
- measureDescs.add(makeCountMeasure());
// partitioning column
tsCol = -1;
@@ -197,7 +196,7 @@ public class IIDesc extends RootPersistentEntity {
return functions;
}
- private MeasureDesc makeMeasureDescs(String func, ColumnDesc columnDesc) {
+ private MeasureDesc makeNormalMeasure(String func, ColumnDesc columnDesc) {
String columnName = columnDesc.getName();
String returnType = columnDesc.getTypeName();
MeasureDesc measureDesc = new MeasureDesc();
@@ -213,6 +212,25 @@ public class IIDesc extends RootPersistentEntity {
return measureDesc;
}
+ /**
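+ * Builds a COUNT_DISTINCT (HyperLogLog) measure on the given column.
+ * @param hllType return type of the measure, representing the HLL precision; may be null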
+ */
+ private MeasureDesc makeHLLMeasure(ColumnDesc columnDesc, String hllType) {
+ String columnName = columnDesc.getName();
+ MeasureDesc measureDesc = new MeasureDesc();
+ FunctionDesc f1 = new FunctionDesc();
+ f1.setExpression("COUNT_DISTINCT");
+ ParameterDesc p1 = new ParameterDesc();
+ p1.setType("column");
+ p1.setValue(columnName);
+ p1.setColRefs(ImmutableList.of(new TblColRef(columnDesc)));
+ f1.setParameter(p1);
+ f1.setReturnType(hllType);
+ measureDesc.setFunction(f1);
+ return measureDesc;
+ }
+
private MeasureDesc makeCountMeasure() {
MeasureDesc measureDesc = new MeasureDesc();
FunctionDesc f1 = new FunctionDesc();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
index c6d4dc9..138940f 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -53,6 +53,6 @@ public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter>
@Override
public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
- current.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
+ v.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index 650432a..ad8c483 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -20,11 +20,14 @@ package org.apache.kylin.metadata.measure.fixedlen;
import org.apache.kylin.metadata.model.DataType;
-
abstract public class FixedLenMeasureCodec<T> {
public static FixedLenMeasureCodec<?> get(DataType type) {
- return new FixedPointLongCodec(type);
+ if (type.isHLLC()) {
+ return new FixedHLLCodec(type);
+ } else {
+ return new FixedPointLongCodec(type);
+ }
}
abstract public int getLength();
@@ -33,7 +36,6 @@ abstract public class FixedLenMeasureCodec<T> {
abstract public T valueOf(String value);
-
abstract public Object getValue();
abstract public T read(byte[] buf, int offset);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
index 70c24c9..a4e8db6 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
@@ -89,6 +89,7 @@ public class DataType {
private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>();
+
public static DataType getInstance(String type) {
if (type == null)
return null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index e80532c..eda31a1 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -48,7 +48,7 @@ public class FunctionDesc {
private String returnType;
private DataType returnDataType;
- private boolean isAppliedOnDimension = false;
+ private boolean isDimensionAsMetric = false;
public String getRewriteFieldName() {
if (isSum()) {
@@ -62,7 +62,7 @@ public class FunctionDesc {
}
public boolean needRewrite() {
- return !isSum() && !isHolisticCountDistinct() && !isAppliedOnDimension();
+ return !isSum() && !isHolisticCountDistinct() && !isDimensionAsMetric();
}
public boolean isMin() {
@@ -106,12 +106,12 @@ public class FunctionDesc {
return sb.toString();
}
- public boolean isAppliedOnDimension() {
- return isAppliedOnDimension;
+ public boolean isDimensionAsMetric() {
+ return isDimensionAsMetric;
}
- public void setAppliedOnDimension(boolean isAppliedOnDimension) {
- this.isAppliedOnDimension = isAppliedOnDimension;
+ public void setDimensionAsMetric(boolean isDimensionAsMetric) {
+ this.isDimensionAsMetric = isDimensionAsMetric;
}
public String getExpression() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
index 5f49ba4..7a36cf1 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
@@ -61,7 +61,7 @@ public class AdjustForWeeklyMatchedRealization extends RoutingRule {
private static void adjustOLAPContextIfNecessary(IIInstance ii, OLAPContext olapContext) {
IIDesc iiDesc = ii.getDescriptor();
Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions();
- convertAggreationToDimension(olapContext, iiFuncs, iiDesc.getFactTableName());
+ convertAggregationToDimension(olapContext, iiFuncs, iiDesc.getFactTableName());
}
private static void adjustOLAPContextIfNecessary(CubeInstance cube, OLAPContext olapContext) {
@@ -70,17 +70,17 @@ public class AdjustForWeeklyMatchedRealization extends RoutingRule {
CubeDesc cubeDesc = cube.getDescriptor();
Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
- convertAggreationToDimension(olapContext, cubeFuncs, cubeDesc.getFactTable());
+ convertAggregationToDimension(olapContext, cubeFuncs, cubeDesc.getFactTable());
}
- private static void convertAggreationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations, String factTableName) {
+ private static void convertAggregationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations, String factTableName) {
Iterator<FunctionDesc> it = olapContext.aggregations.iterator();
while (it.hasNext()) {
FunctionDesc functionDesc = it.next();
if (!availableAggregations.contains(functionDesc)) {
// try to convert the metric to dimension to see if it works
TblColRef col = functionDesc.selectTblColRef(olapContext.metricsColumns, factTableName);
- functionDesc.setAppliedOnDimension(true);
+ functionDesc.setDimensionAsMetric(true);
olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
if (col != null) {
olapContext.metricsColumns.remove(col);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
index d37eac0..d36ffb9 100644
--- a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
@@ -37,7 +37,7 @@ public class IIQueryTest extends KylinQueryTest {
public static void setUp() throws Exception {
KylinQueryTest.setUp();//invoke super class
- distinctCountSupported = false;
+ distinctCountSupported = true;
Map<RealizationType, Integer> priorities = Maps.newHashMap();
priorities.put(RealizationType.INVERTED_INDEX, 0);
@@ -64,10 +64,15 @@ public class IIQueryTest extends KylinQueryTest {
@Test
public void testSingleRunQuery() throws Exception {
- String queryFileName = "src/test/resources/query/sql_ii/query04.sql";
+ String queryFileName = "src/test/resources/query/sql_distinct/query00.sql";
File sqlFile = new File(queryFileName);
runSQL(sqlFile, true, true);
runSQL(sqlFile, true, false);
}
+
+ @Test
+ public void testDistinctCountQuery() throws Exception {
+ super.testDistinctCountQuery();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/query/src/test/resources/query/sql_fast_common/query00.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_fast_common/query00.sql b/query/src/test/resources/query/sql_fast_common/query00.sql
new file mode 100644
index 0000000..198aea1
--- /dev/null
+++ b/query/src/test/resources/query/sql_fast_common/query00.sql
@@ -0,0 +1,5 @@
+select lstg_format_name, cal_dt,
+ sum(price) as GMV,
+ count(1) as TRANS_CNT
+ from test_kylin_fact
+ group by lstg_format_name, cal_dt
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index 6eddd8d..a4ecc2a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -136,7 +136,7 @@ public class CubeStorageEngine implements IStorageEngine {
private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
for (FunctionDesc func : sqlDigest.aggregations) {
- if (!func.isAppliedOnDimension()) {
+ if (!func.isDimensionAsMetric()) {
metrics.add(func);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java
index 40ba64a..7efb283 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java
@@ -23,5 +23,5 @@ package org.apache.kylin.storage.hbase.coprocessor;
*/
public class CoprocessorConstants {
public static final int SERIALIZE_BUFFER_SIZE = 65536;
- public static final int METRIC_SERIALIZE_BUFFER_SIZE = 1024;
+ public static final int METRIC_SERIALIZE_BUFFER_SIZE = 65536;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 516c160..f8bf182 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -50,12 +50,12 @@ public class EndpointAggregators {
private static class MetricInfo {
private MetricType type;
private int refIndex = -1;
- private int presision = -1;
+ private int precision = -1;
public MetricInfo(MetricType type, int refIndex, int presision) {
this.type = type;
this.refIndex = refIndex;
- this.presision = presision;
+ this.precision = presision;
}
public MetricInfo(MetricType type, int refIndex) {
@@ -83,12 +83,12 @@ public class EndpointAggregators {
if (functionDesc.isCount()) {
metricInfos[i] = new MetricInfo(MetricType.Count);
- } else if (functionDesc.isAppliedOnDimension()) {
+ } else if (functionDesc.isDimensionAsMetric()) {
metricInfos[i] = new MetricInfo(MetricType.DimensionAsMetric);
} else {
- int index = tableInfo.findMetric(functionDesc.getParameter().getValue());
+ int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
if (index < 0) {
- throw new IllegalStateException("Column " + functionDesc.getParameter().getColRefs().get(0) + " is not found in II");
+ throw new IllegalStateException("Column " + functionDesc.getParameter().getValue() + " is not found in II");
}
if (functionDesc.isCountDistinct()) {
@@ -141,9 +141,13 @@ public class EndpointAggregators {
public MeasureAggregator[] createBuffer() {
MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length];
- for (int j = 0; j < aggrs.length; j++) {
- //all fixed length measures can be aggregated as long
- aggrs[j] = MeasureAggregator.create(funcNames[j], "long");
+ for (int i = 0; i < aggrs.length; i++) {
+ if (metricInfos[i].type == MetricType.DistinctCount) {
+ aggrs[i] = MeasureAggregator.create(funcNames[i], dataTypes[i]);
+ } else {
+ //all other fixed length measures can be aggregated as long
+ aggrs[i] = MeasureAggregator.create(funcNames[i], "long");
+ }
}
return aggrs;
}
@@ -179,7 +183,8 @@ public class EndpointAggregators {
//TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data
HyperLogLogPlusCounter hllc = hllcs[metricIndex];
if (hllc == null) {
- hllc = new HyperLogLogPlusCounter(metricInfo.presision);
+ int precision = metricInfo.precision;
+ hllc = new HyperLogLogPlusCounter(precision);
}
hllc.clear();
hllc.add(byteBuffer.get(), byteBuffer.getOffset(), byteBuffer.getLength());
@@ -244,7 +249,7 @@ public class EndpointAggregators {
MetricInfo metricInfo = value.metricInfos[i];
BytesUtil.writeAsciiString(metricInfo.type.toString(), out);
BytesUtil.writeVInt(metricInfo.refIndex, out);
- BytesUtil.writeVInt(metricInfo.presision, out);
+ BytesUtil.writeVInt(metricInfo.precision, out);
}
BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfoDigest), out);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 465f7f3..5bf22e7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -54,6 +54,8 @@ import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
+import javax.xml.datatype.DatatypeConfigurationException;
+
/**
* Created by Hongbin Ma(Binmahone) on 12/2/14.
*/
@@ -99,7 +101,9 @@ public class EndpointTupleIterator implements ITupleIterator {
if (measures == null) {
measures = Lists.newArrayList();
}
- initMeaureParameters(measures, segment.getColumns());
+
+ //this method modifies the given measures in place
+ rewriteMeasureParameters(measures, segment.getColumns());
this.seg = segment;
this.context = context;
@@ -137,7 +141,7 @@ public class EndpointTupleIterator implements ITupleIterator {
* @param measures
* @param columns
*/
- private void initMeaureParameters(List<FunctionDesc> measures, List<TblColRef> columns) {
+ private void rewriteMeasureParameters(List<FunctionDesc> measures, List<TblColRef> columns) {
for (FunctionDesc functionDesc : measures) {
if (functionDesc.isCount()) {
functionDesc.setReturnType("bigint");
@@ -146,8 +150,15 @@ public class EndpointTupleIterator implements ITupleIterator {
boolean updated = false;
for (TblColRef column : columns) {
if (column.isSameAs(factTableName, functionDesc.getParameter().getValue())) {
- functionDesc.setReturnType(column.getColumn().getType().toString());
- functionDesc.setReturnDataType(DataType.getInstance(functionDesc.getReturnType()));
+ if (functionDesc.isCountDistinct()) {
+ //TODO: the default precision might need to be configurable
+ String iiDefaultHLLC = "hllc10";
+ functionDesc.setReturnType(iiDefaultHLLC);
+ functionDesc.setReturnDataType(DataType.getInstance(iiDefaultHLLC));
+ } else {
+ functionDesc.setReturnType(column.getColumn().getType().toString());
+ functionDesc.setReturnDataType(DataType.getInstance(functionDesc.getReturnType()));
+ }
functionDesc.getParameter().setColRefs(ImmutableList.of(column));
updated = true;
break;
@@ -292,6 +303,7 @@ public class EndpointTupleIterator implements ITupleIterator {
this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
if (currentRow.hasMeasures()) {
byte[] measuresBytes = currentRow.getMeasures().toByteArray();
+
this.measureValues = pushedDownAggregators.deserializeMetricValues(measuresBytes, 0);
}
@@ -318,7 +330,7 @@ public class EndpointTupleIterator implements ITupleIterator {
if (measureValues != null) {
for (int i = 0; i < measures.size(); ++i) {
- if (!measures.get(i).isAppliedOnDimension()) {
+ if (!measures.get(i).isDimensionAsMetric()) {
String fieldName = measures.get(i).getRewriteFieldName();
Object value = measureValues.get(i);
String dataType = tuple.getDataType(fieldName);