You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/12 06:43:11 UTC
[21/50] incubator-kylin git commit: hll on ii on going
hll on ii on going
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0a96d742
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0a96d742
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0a96d742
Branch: refs/heads/inverted-index
Commit: 0a96d742771c8a3cafc2f5c54265cbc6531ecf2e
Parents: 9fb8aaa
Author: honma <ho...@ebay.com>
Authored: Wed Feb 11 09:14:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Feb 11 09:14:20 2015 +0800
----------------------------------------------------------------------
.../common/hll/HyperLogLogPlusCounter.java | 28 ++++++--
.../org/apache/kylin/common/util/BasicTest.java | 6 +-
.../common/util/HBaseMiniclusterHelper.java | 2 -
.../common/util/HyperLogLogCounterTest.java | 4 +-
.../invertedindex/index/TableRecordInfo.java | 3 +-
.../cardinality/ColumnCardinalityMapper.java | 2 +-
.../cardinality/ColumnCardinalityReducer.java | 4 +-
.../job/tools/ColumnCardinalityMapperTest.java | 4 +-
.../job/tools/ColumnCardinalityReducerTest.java | 2 +-
.../kylin/metadata/measure/HLLCSerializer.java | 6 +-
.../measure/fixedlen/FixedHLLCodec.java | 58 ++++++++++++++++
.../measure/fixedlen/FixedLenMeasureCodec.java | 1 +
.../query/sqlfunc/HLLDistinctCountAggFunc.java | 4 +-
.../endpoint/EndpointAggregators.java | 71 +++++++++++++++-----
14 files changed, 149 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index e725e21..c323b90 100644
--- a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -66,7 +66,7 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
/** The larger p is, the more storage (2^p bytes), the better accuracy */
private HyperLogLogPlusCounter(int p, HashFunction hashFunc) {
this.p = p;
- this.m = (int) Math.pow(2, p);
+ this.m = 1 << p;//(int) Math.pow(2, p);
this.hashFunc = hashFunc;
this.registers = new byte[m];
}
@@ -77,7 +77,7 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
}
public void add(String value) {
- add(hashFunc.hashString(value,Charset.defaultCharset()).asLong());
+ add(hashFunc.hashString(value, Charset.defaultCharset()).asLong());
}
public void add(byte[] value) {
@@ -223,7 +223,7 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
// the
// moment
- public void writeRegisters(final ByteBuffer out) throws IOException {
+ public void writeCompactRegisters(final ByteBuffer out) throws IOException {
int startPos = out.position();
final int indexLen = getRegisterIndexSize();
@@ -264,7 +264,7 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
out.put(compressed);
}
- public void readRegisters(ByteBuffer in) throws IOException {
+ public void readCompactRegisters(ByteBuffer in) throws IOException {
byte scheme = in.get();
if ((scheme & COMPRESSION_FLAG) > 0) {
scheme ^= COMPRESSION_FLAG;
@@ -286,12 +286,26 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
registers[key] = in.get();
}
} else { // array scheme
- for (int i = 0; i < m; i++) {
- registers[i] = in.get();
- }
+ in.get(registers);
}
}
+ /**
+ * For compressed output use writeCompactRegisters
+ * @param out
+ */
+ public void writeRegisters(final ByteBuffer out) {
+ out.put(this.registers);
+ }
+
+ /**
+ * For compressed input use readCompactRegisters
+ * @param in
+ */
+ public void readRegisters(ByteBuffer in) {
+ in.get(registers, 0, m);
+ }
+
private int getRegisterIndexSize() {
return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 476b9c1..5952c33 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -64,13 +64,9 @@ public class BasicTest {
public void test1() throws Exception {
}
+
@Test
@Ignore("fix it later")
public void test2() throws IOException, ConfigurationException {
- PropertiesConfiguration a = new PropertiesConfiguration();
- a.setProperty("hi", "dd");
-
- System.out.println("dfads");
- a.save(System.out);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java b/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
index 58058ab..3846302 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
@@ -18,7 +18,6 @@
package org.apache.kylin.common.util;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.persistence.HBaseResourceStore;
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
index 75e84c1..088219f 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
@@ -108,9 +108,9 @@ public class HyperLogLogCounterTest {
private void checkSerialize(HyperLogLogPlusCounter hllc) throws IOException {
long estimate = hllc.getCountEstimate();
buf.clear();
- hllc.writeRegisters(buf);
+ hllc.writeCompactRegisters(buf);
buf.flip();
- hllc.readRegisters(buf);
+ hllc.readCompactRegisters(buf);
Assert.assertEquals(estimate, hllc.getCountEstimate());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index dca1a65..886c649 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -134,7 +134,8 @@ public class TableRecordInfo {
return -1;
for (int i = 0; i < allColumns.size(); ++i) {
TblColRef tblColRef = allColumns.get(i);
- if (measureSerializers[i] != null && tblColRef.isSameAs(desc.getFactTableName(), metricColumnName)) {
+ if (measureSerializers[i] != null // has measureSerializers means it is a metric
+ && tblColRef.isSameAs(desc.getFactTableName(), metricColumnName)) {
return i;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index dd7c720..329914a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -94,7 +94,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWr
HyperLogLogPlusCounter hllc = hllcMap.get(key);
ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
buf.clear();
- hllc.writeRegisters(buf);
+ hllc.writeCompactRegisters(buf);
buf.flip();
context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
index a9ac1a3..b4b55d3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
@@ -50,7 +50,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
for (BytesWritable v : values) {
ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter();
- hll.readRegisters(buffer);
+ hll.readCompactRegisters(buffer);
getHllc(skey).merge(hll);
hll.clear();
}
@@ -77,7 +77,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
HyperLogLogPlusCounter hllc = hllcMap.get(key);
ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
buf.clear();
- hllc.writeRegisters(buf);
+ hllc.writeCompactRegisters(buf);
buf.flip();
context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
// context.write(new Text("ErrorRate_" + key), new
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
index e13289a..08c1611 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
@@ -84,7 +84,7 @@ public class ColumnCardinalityMapperTest {
BytesWritable value1 = result.get(0).getSecond();
byte[] bytes = value1.getBytes();
HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
- hllc.readRegisters(ByteBuffer.wrap(bytes));
+ hllc.readCompactRegisters(ByteBuffer.wrap(bytes));
assertTrue(key1 > 0);
assertEquals(8, hllc.getCountEstimate());
}
@@ -117,7 +117,7 @@ public class ColumnCardinalityMapperTest {
BytesWritable value1 = result.get(0).getSecond();
byte[] bytes = value1.getBytes();
HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
- hllc.readRegisters(ByteBuffer.wrap(bytes));
+ hllc.readCompactRegisters(ByteBuffer.wrap(bytes));
System.out.println("ab\177ab".length());
assertTrue(key1 > 0);
assertEquals(1, hllc.getCountEstimate());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
index 034c90e..6ad27a8 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
@@ -67,7 +67,7 @@ public class ColumnCardinalityReducerTest {
}
ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
buf.clear();
- hllc.writeRegisters(buf);
+ hllc.writeCompactRegisters(buf);
buf.flip();
return buf.array();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
index f63c8d7..20c6c21 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
@@ -31,14 +31,14 @@ public class HLLCSerializer extends MeasureSerializer<HyperLogLogPlusCounter> {
HyperLogLogPlusCounter current;
- HLLCSerializer(int p) {
+ public HLLCSerializer(int p) {
current = new HyperLogLogPlusCounter(p);
}
@Override
public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
try {
- value.writeRegisters(out);
+ value.writeCompactRegisters(out);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -47,7 +47,7 @@ public class HLLCSerializer extends MeasureSerializer<HyperLogLogPlusCounter> {
@Override
public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
try {
- current.readRegisters(in);
+ current.readCompactRegisters(in);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
new file mode 100644
index 0000000..9f6a1ba
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -0,0 +1,58 @@
+package org.apache.kylin.metadata.measure.fixedlen;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.measure.HLLCSerializer;
+import org.apache.kylin.metadata.model.DataType;
+
+import java.util.Map;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 2/10/15.
+ */
+public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter> {
+
+ private DataType type;
+ private int presision;
+ private HyperLogLogPlusCounter current;
+
+ public FixedHLLCodec(DataType type) {
+ this.type = type;
+ this.presision = type.getPrecision();
+ this.current = new HyperLogLogPlusCounter(this.presision);
+ }
+
+ @Override
+ public int getLength() {
+ return 1 << presision;
+ }
+
+ @Override
+ public DataType getDataType() {
+ return type;
+ }
+
+ @Override
+ public HyperLogLogPlusCounter valueOf(String value) {
+ current.clear();
+ if (value == null)
+ current.add("__nUlL__");
+ else
+ current.add(value.getBytes());
+ return current;
+ }
+
+ @Override
+ public String toString(HyperLogLogPlusCounter value) {
+ return String.valueOf(value.getCountEstimate());
+ }
+
+ @Override
+ public HyperLogLogPlusCounter read(byte[] buf, int offset) {
+ return serializer.deserialize();
+ }
+
+ @Override
+ public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index 00de630..41a6356 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -20,6 +20,7 @@ package org.apache.kylin.metadata.measure.fixedlen;
import org.apache.kylin.metadata.model.DataType;
+
abstract public class FixedLenMeasureCodec<T> {
public static FixedLenMeasureCodec<?> get(DataType type) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
index 356c1e3..8d9ace0 100644
--- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
+++ b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
@@ -118,12 +118,12 @@ public class HLLDistinctCountAggFunc {
}
@Override
- public void writeRegisters(ByteBuffer out) throws IOException {
+ public void writeCompactRegisters(ByteBuffer out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- public void readRegisters(ByteBuffer in) throws IOException {
+ public void readCompactRegisters(ByteBuffer in) throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 227ecbe..b199862 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint;
import com.google.common.collect.Lists;
+import com.yammer.metrics.core.Metric;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
@@ -28,6 +29,7 @@ import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
import org.apache.hadoop.io.LongWritable;
@@ -40,10 +42,28 @@ import java.util.List;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class EndpointAggregators {
+ private enum MetricType {
+ Count, DimensionAsMetric, DistinctCount, Normal
+ }
+
+ private static class MetricInfo {
+ private MetricType type;
+ private int refIndex = -1;
+
+ public MetricInfo(MetricType type, int refIndex) {
+ this.type = type;
+ this.refIndex = refIndex;
+ }
+
+ public MetricInfo(MetricType type) {
+ this.type = type;
+ }
+ }
+
public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) {
String[] funcNames = new String[metrics.size()];
String[] dataTypes = new String[metrics.size()];
- int[] refColIndex = new int[metrics.size()];
+ MetricInfo[] metricInfos = new MetricInfo[metrics.size()];
for (int i = 0; i < metrics.size(); i++) {
FunctionDesc functionDesc = metrics.get(i);
@@ -53,34 +73,40 @@ public class EndpointAggregators {
dataTypes[i] = functionDesc.getReturnType();
if (functionDesc.isCount()) {
- refColIndex[i] = -1;//-1 for count, -2 for metricOnDimension
+ metricInfos[i] = new MetricInfo(MetricType.Count);
} else if (functionDesc.isAppliedOnDimension()) {
- refColIndex[i] = -2;
+ metricInfos[i] = new MetricInfo(MetricType.DimensionAsMetric);
} else {
- refColIndex[i] = tableInfo.findMetric(functionDesc.getParameter().getValue());
- if (refColIndex[i] < 0) {
+ int index = tableInfo.findMetric(functionDesc.getParameter().getValue());
+ if (index < 0) {
throw new IllegalStateException("Column " + functionDesc.getParameter().getColRefs().get(0) + " is not found in II");
}
+
+ if (functionDesc.isCountDistinct()) {
+ metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index);
+ } else {
+ metricInfos[i] = new MetricInfo(MetricType.Normal, index);
+ }
}
}
- return new EndpointAggregators(funcNames, dataTypes, refColIndex, tableInfo.getDigest());
+ return new EndpointAggregators(funcNames, dataTypes, metricInfos, tableInfo.getDigest());
}
final String[] funcNames;
final String[] dataTypes;
- final int[] refColIndex;
+ final MetricInfo[] metricInfos;
final TableRecordInfoDigest tableRecordInfo;
final transient FixedLenMeasureCodec[] measureSerializers;
final transient Object[] metricValues;
- final LongWritable one = new LongWritable(1);
+ final LongWritable ONE = new LongWritable(1);
- public EndpointAggregators(String[] funcNames, String[] dataTypes, int[] refColIndex, TableRecordInfoDigest tableInfo) {
+ public EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
this.funcNames = funcNames;
this.dataTypes = dataTypes;
- this.refColIndex = refColIndex;
+ this.metricInfos = metricInfos;
this.tableRecordInfo = tableInfo;
this.metricValues = new Object[funcNames.length];
@@ -111,12 +137,19 @@ public class EndpointAggregators {
int rawIndex = 0;
int columnCount = tableRecordInfo.getColumnCount();
- //normal column values to aggregate
for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
- if (tableRecordInfo.isMetrics(columnIndex)) {
- for (int metricIndex = 0; metricIndex < refColIndex.length; ++metricIndex) {
- if (refColIndex[metricIndex] == columnIndex) {
+ for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
+ if (metricInfos[metricIndex].refIndex == columnIndex) {
+ if (metricInfos[metricIndex].type == MetricType.Normal) {
+ //normal column values to aggregate
measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex));
+ } else if (metricInfos[metricIndex].type == MetricType.DistinctCount) {
+ if (tableRecordInfo.isMetrics(columnCount)) {
+ measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex));
+ } else {
+ //TODO: with a unified dictionary this is okay, but if different data blocks use different dictionaries, we'll have to aggregate the original data
+ measureAggrs[metricIndex].aggregate(tableRecordInfo.);
+ }
}
}
}
@@ -124,9 +157,11 @@ public class EndpointAggregators {
}
//aggregate for "count"
- for (int i = 0; i < refColIndex.length; ++i) {
- if (refColIndex[i] == -1) {
- measureAggrs[i].aggregate(one);
+ for (int i = 0; i < metricInfos.length; ++i) {
+ if (metricInfos[i].type == MetricType.Count) {
+ measureAggrs[i].aggregate(ONE);
+ } else if (metricInfos[i].type == MetricType.DistinctCount) {
+
}
}
}
@@ -180,7 +215,7 @@ public class EndpointAggregators {
public void serialize(EndpointAggregators value, ByteBuffer out) {
BytesUtil.writeAsciiStringArray(value.funcNames, out);
BytesUtil.writeAsciiStringArray(value.dataTypes, out);
- BytesUtil.writeIntArray(value.refColIndex, out);
+ BytesUtil.writeIntArray(value.metricInfos, out);
BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfo), out);
}