You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/12 06:43:33 UTC
[43/50] incubator-kylin git commit: KYLIN-608 support HLL at ii storage
KYLIN-608 support HLL at ii storage
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c5d329fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c5d329fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c5d329fe
Branch: refs/heads/inverted-index
Commit: c5d329fe098a0d4886f3e73a6ce0e99a621c8e67
Parents: 0a96d74
Author: honma <ho...@ebay.com>
Authored: Thu Feb 12 10:36:37 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Feb 12 10:36:37 2015 +0800
----------------------------------------------------------------------
.../common/hll/HyperLogLogPlusCounter.java | 14 +-
.../org/apache/kylin/common/util/BasicTest.java | 19 +-
.../common/util/HyperLogLogCounterTest.java | 12 +
.../invertedindex/index/RawTableRecord.java | 20 +-
.../apache/kylin/invertedindex/index/Slice.java | 330 +++++++++----------
.../kylin/invertedindex/index/TableRecord.java | 12 +-
.../measure/fixedlen/FixedHLLCodec.java | 14 +-
.../measure/fixedlen/FixedLenMeasureCodec.java | 3 +-
.../measure/fixedlen/FixedPointLongCodec.java | 6 +-
.../endpoint/EndpointAggregators.java | 117 ++++---
.../endpoint/EndpointTupleIterator.java | 18 +-
.../hbase/coprocessor/endpoint/IIEndpoint.java | 2 +-
.../org/apache/kylin/storage/tuple/Tuple.java | 12 +-
13 files changed, 325 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index c323b90..49a6756 100644
--- a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -28,10 +28,10 @@ import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.utils.IOUtils;
+import org.apache.kylin.common.util.BytesUtil;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
-import org.apache.kylin.common.util.BytesUtil;
import com.ning.compress.lzf.LZFDecoder;
import com.ning.compress.lzf.LZFEncoder;
@@ -72,8 +72,12 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
}
public void clear() {
- for (int i = 0; i < m; i++)
- registers[i] = 0;
+ byte zero = (byte) 0;
+ Arrays.fill(registers, zero);
+ }
+
+ public void add(int value) {
+ add(hashFunc.hashInt(value).asLong());
}
public void add(String value) {
@@ -84,6 +88,10 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
add(hashFunc.hashBytes(value).asLong());
}
+ public void add(byte[] value, int offset, int length) {
+ add(hashFunc.hashBytes(value, offset, length).asLong());
+ }
+
protected void add(long hash) {
int bucketMask = m - 1;
int bucket = (int) (hash & bucketMask);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 5952c33..0a33f9f 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -22,17 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpException;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.HttpMethodParams;
import org.junit.Ignore;
import org.junit.Test;
-import org.slf4j.*;
-import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Created by honma on 10/17/14.
@@ -59,12 +51,19 @@ public class BasicTest {
System.out.printf("b");
}
+ private enum MetricType {
+ Count, DimensionAsMetric, DistinctCount, Normal
+ }
+
@Test
@Ignore("convenient trial tool for dev")
public void test1() throws Exception {
+ String x = MetricType.DimensionAsMetric.toString();
+ System.out.println(x);
+ MetricType y = MetricType.valueOf(x);
+ System.out.println(y == MetricType.DimensionAsMetric);
}
-
@Test
@Ignore("fix it later")
public void test2() throws IOException, ConfigurationException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
index 088219f..a7d275a 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
@@ -204,6 +204,18 @@ public class HyperLogLogCounterTest {
System.out.println("Perf test result: " + duration / 1000 + " seconds");
}
+ @Test
+ public void testEquivalence() {
+ byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 };
+ byte[] b = new byte[] { 3, 4, 42 };
+ HyperLogLogPlusCounter ha = new HyperLogLogPlusCounter();
+ HyperLogLogPlusCounter hb = new HyperLogLogPlusCounter();
+ ha.add(a, 1, 3);
+ hb.add(b);
+
+ Assert.assertTrue(ha.getCountEstimate()==hb.getCountEstimate());
+ }
+
private HyperLogLogPlusCounter newHLLC() {
return new HyperLogLogPlusCounter(16);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
index 14ea62b..895fd4f 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
@@ -49,42 +49,44 @@ public class RawTableRecord implements Cloneable {
Arrays.fill(buf, Dictionary.NULL);
}
- protected boolean isMetric(int col) {
+ public boolean isMetric(int col) {
return digest.isMetrics(col);
}
- protected FixedLenMeasureCodec<LongWritable> codec(int col) {
+ public FixedLenMeasureCodec<LongWritable> codec(int col) {
return digest.codec(col);
}
- protected int length(int col) {
+ public int length(int col) {
return digest.length(col);
}
- protected int getColumnCount() {
+ public int getColumnCount() {
return digest.getColumnCount();
}
- protected void setValueID(int col, int id) {
+ public void setValueID(int col, int id) {
BytesUtil.writeUnsigned(id, buf, digest.offset(col), digest.length(col));
}
- protected int getValueID(int col) {
+ public int getValueID(int col) {
return BytesUtil.readUnsigned(buf, digest.offset(col), digest.length(col));
}
- protected void setValueMetrics(int col, LongWritable value) {
+ public void setValueMetrics(int col, LongWritable value) {
digest.codec(col).write(value, buf, digest.offset(col));
}
- protected LongWritable getValueMetrics(int col) {
- return digest.codec(col).read(buf, digest.offset(col));
+ public String getValueMetric(int col) {
+ digest.codec(col).read(buf, digest.offset(col));
+ return (String) digest.codec(col).getValue();
}
public byte[] getBytes() {
return buf;
}
+ //TODO is it possible to avoid copying?
public void setBytes(byte[] bytes, int offset, int length) {
assert buf.length == length;
System.arraycopy(bytes, offset, buf, 0, length);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
index fef9892..59dd9cd 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
@@ -31,171 +31,169 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
*/
public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
- TableRecordInfoDigest info;
- int nColumns;
-
- short shard;
- long timestamp;
- int nRecords;
- ColumnValueContainer[] containers;
-
- public Slice(TableRecordInfoDigest digest, short shard, long timestamp,
- ColumnValueContainer[] containers) {
- this.info = digest;
- this.nColumns = digest.getColumnCount();
-
- this.shard = shard;
- this.timestamp = timestamp;
- this.nRecords = containers[0].getSize();
- this.containers = containers;
-
- assert nColumns == containers.length;
- for (int i = 0; i < nColumns; i++) {
- assert nRecords == containers[i].getSize();
- }
- }
-
- public int getRecordCount() {
- return this.nRecords;
- }
-
- public short getShard() {
- return shard;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public ColumnValueContainer[] getColumnValueContainers() {
- return containers;
- }
-
- public ColumnValueContainer getColumnValueContainer(int col) {
- return containers[col];
- }
-
- public Iterator<RawTableRecord> iterateWithBitmap(
- final ConciseSet resultBitMap) {
- if (resultBitMap == null) {
- return this.iterator();
- } else {
- return new Iterator<RawTableRecord>() {
- int i = 0;
- int iteratedCount = 0;
- int resultSize = resultBitMap.size();
-
- RawTableRecord rec = info.createTableRecordBytes();
- ImmutableBytesWritable temp = new ImmutableBytesWritable();
-
- @Override
- public boolean hasNext() {
- return iteratedCount < resultSize;
- }
-
- @Override
- public RawTableRecord next() {
- while (!resultBitMap.contains(i)) {
- i++;
- }
- for (int col = 0; col < nColumns; col++) {
- containers[col].getValueAt(i, temp);
- rec.setValueBytes(col, temp);
- }
- iteratedCount++;
- i++;
-
- return rec;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- };
- }
- }
-
- @Override
- public Iterator<RawTableRecord> iterator() {
- return new Iterator<RawTableRecord>() {
- int i = 0;
- RawTableRecord rec = info.createTableRecordBytes();
- ImmutableBytesWritable temp = new ImmutableBytesWritable();
-
- @Override
- public boolean hasNext() {
- return i < nRecords;
- }
-
- @Override
- public RawTableRecord next() {
- for (int col = 0; col < nColumns; col++) {
- containers[col].getValueAt(i, temp);
- rec.setValueBytes(col, temp);
- }
- i++;
- return rec;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- };
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((info == null) ? 0 : info.hashCode());
- result = prime * result + shard;
- result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
- return result;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Slice other = (Slice) obj;
- if (info == null) {
- if (other.info != null)
- return false;
- } else if (!info.equals(other.info))
- return false;
- if (shard != other.shard)
- return false;
- if (timestamp != other.timestamp)
- return false;
- return true;
- }
-
- @Override
- public int compareTo(Slice o) {
- int comp = this.shard - o.shard;
- if (comp != 0)
- return comp;
-
- comp = (int) (this.timestamp - o.timestamp);
- return comp;
- }
+ TableRecordInfoDigest info;
+ int nColumns;
+
+ short shard;
+ long timestamp;
+ int nRecords;
+ ColumnValueContainer[] containers;
+
+ public Slice(TableRecordInfoDigest digest, short shard, long timestamp, ColumnValueContainer[] containers) {
+ this.info = digest;
+ this.nColumns = digest.getColumnCount();
+
+ this.shard = shard;
+ this.timestamp = timestamp;
+ this.nRecords = containers[0].getSize();
+ this.containers = containers;
+
+ assert nColumns == containers.length;
+ for (int i = 0; i < nColumns; i++) {
+ assert nRecords == containers[i].getSize();
+ }
+ }
+
+ public int getRecordCount() {
+ return this.nRecords;
+ }
+
+ public short getShard() {
+ return shard;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public ColumnValueContainer[] getColumnValueContainers() {
+ return containers;
+ }
+
+ public ColumnValueContainer getColumnValueContainer(int col) {
+ return containers[col];
+ }
+
+ public Iterator<RawTableRecord> iterateWithBitmap(final ConciseSet resultBitMap) {
+ if (resultBitMap == null) {
+ return this.iterator();
+ } else {
+ final RawTableRecord rec = info.createTableRecordBytes();
+ final ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+ return new Iterator<RawTableRecord>() {
+ int i = 0;
+ int iteratedCount = 0;
+ int resultSize = resultBitMap.size();
+
+ @Override
+ public boolean hasNext() {
+ return iteratedCount < resultSize;
+ }
+
+ @Override
+ public RawTableRecord next() {
+ while (!resultBitMap.contains(i)) {
+ i++;
+ }
+ for (int col = 0; col < nColumns; col++) {
+ containers[col].getValueAt(i, temp);
+ rec.setValueBytes(col, temp);
+ }
+ iteratedCount++;
+ i++;
+
+ return rec;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+ }
+
+ @Override
+ public Iterator<RawTableRecord> iterator() {
+ return new Iterator<RawTableRecord>() {
+ int i = 0;
+ RawTableRecord rec = info.createTableRecordBytes();
+ ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+ @Override
+ public boolean hasNext() {
+ return i < nRecords;
+ }
+
+ @Override
+ public RawTableRecord next() {
+ for (int col = 0; col < nColumns; col++) {
+ containers[col].getValueAt(i, temp);
+ rec.setValueBytes(col, temp);
+ }
+ i++;
+ return rec;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((info == null) ? 0 : info.hashCode());
+ result = prime * result + shard;
+ result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
+ return result;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Slice other = (Slice) obj;
+ if (info == null) {
+ if (other.info != null)
+ return false;
+ } else if (!info.equals(other.info))
+ return false;
+ if (shard != other.shard)
+ return false;
+ if (timestamp != other.timestamp)
+ return false;
+ return true;
+ }
+
+ @Override
+ public int compareTo(Slice o) {
+ int comp = this.shard - o.shard;
+ if (comp != 0)
+ return comp;
+
+ comp = (int) (this.timestamp - o.timestamp);
+ return comp;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index 1abbe18..3b8d969 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -73,7 +73,7 @@ public class TableRecord implements Cloneable {
return rawRecord.length(col);
}
- public List<String> getValueList() {
+ public List<String> getOriginTableColumnValues() {
List<String> ret = Lists.newArrayList();
for (int i = 0; i < info.nColumns; ++i) {
ret.add(getValueString(i));
@@ -91,9 +91,13 @@ public class TableRecord implements Cloneable {
}
}
+ /**
+ * get value of columns which belongs to the original table columns.
+ * i.e. columns like min_xx, max_yy will never appear
+ */
public String getValueString(int col) {
if (rawRecord.isMetric(col))
- return rawRecord.codec(col).toString(getValueMetrics(col));
+ return getValueMetric(col);
else
return info.dict(col).getValueFromId(rawRecord.getValueID(col));
}
@@ -106,8 +110,8 @@ public class TableRecord implements Cloneable {
rawRecord.setValueMetrics(col, value);
}
- private LongWritable getValueMetrics(int col) {
- return rawRecord.getValueMetrics(col);
+ private String getValueMetric(int col) {
+ return rawRecord.getValueMetric(col);
}
public short getShard() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
index 9f6a1ba..d787cbc 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -1,11 +1,10 @@
package org.apache.kylin.metadata.measure.fixedlen;
+import java.nio.ByteBuffer;
+
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.measure.HLLCSerializer;
import org.apache.kylin.metadata.model.DataType;
-import java.util.Map;
-
/**
* Created by Hongbin Ma(Binmahone) on 2/10/15.
*/
@@ -42,17 +41,18 @@ public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter>
}
@Override
- public String toString(HyperLogLogPlusCounter value) {
- return String.valueOf(value.getCountEstimate());
+ public Object getValue() {
+ return current;
}
@Override
public HyperLogLogPlusCounter read(byte[] buf, int offset) {
- return serializer.deserialize();
+ current.readRegisters(ByteBuffer.wrap(buf, offset, buf.length - offset));
+ return current;
}
@Override
public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
-
+ current.writeRegisters(ByteBuffer.wrap(buf, offset, buf.length - offset));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index 41a6356..650432a 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -33,7 +33,8 @@ abstract public class FixedLenMeasureCodec<T> {
abstract public T valueOf(String value);
- abstract public String toString(T value);
+
+ abstract public Object getValue();
abstract public T read(byte[] buf, int offset);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
index f27e446..9ccb479 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
@@ -61,11 +61,11 @@ public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> {
}
@Override
- public String toString(LongWritable value) {
+ public String getValue() {
if (scale == 0)
- return value.toString();
+ return current.toString();
else
- return "" + (new BigDecimal(value.get()).divide(scalePower));
+ return "" + (new BigDecimal(current.get()).divide(scalePower));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index b199862..c6d8c49 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -18,23 +18,24 @@
package org.apache.kylin.storage.hbase.coprocessor.endpoint;
-import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.List;
-import com.yammer.metrics.core.Metric;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
-import org.apache.hadoop.io.LongWritable;
-import java.nio.ByteBuffer;
-import java.util.List;
+import com.google.common.collect.Lists;
/**
* @author honma
@@ -49,6 +50,13 @@ public class EndpointAggregators {
private static class MetricInfo {
private MetricType type;
private int refIndex = -1;
+ private int presision = -1;
+
+ public MetricInfo(MetricType type, int refIndex, int presision) {
+ this.type = type;
+ this.refIndex = refIndex;
+ this.presision = presision;
+ }
public MetricInfo(MetricType type, int refIndex) {
this.type = type;
@@ -58,6 +66,7 @@ public class EndpointAggregators {
public MetricInfo(MetricType type) {
this.type = type;
}
+
}
public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) {
@@ -83,7 +92,7 @@ public class EndpointAggregators {
}
if (functionDesc.isCountDistinct()) {
- metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index);
+ metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
} else {
metricInfos[i] = new MetricInfo(MetricType.Normal, index);
}
@@ -96,8 +105,11 @@ public class EndpointAggregators {
final String[] funcNames;
final String[] dataTypes;
final MetricInfo[] metricInfos;
- final TableRecordInfoDigest tableRecordInfo;
+ final transient TableRecordInfoDigest tableRecordInfoDigest;
+ final transient RawTableRecord rawTableRecord;
+ final transient ImmutableBytesWritable byteBuffer;
+ final transient HyperLogLogPlusCounter[] hllcs;
final transient FixedLenMeasureCodec[] measureSerializers;
final transient Object[] metricValues;
@@ -107,8 +119,11 @@ public class EndpointAggregators {
this.funcNames = funcNames;
this.dataTypes = dataTypes;
this.metricInfos = metricInfos;
- this.tableRecordInfo = tableInfo;
+ this.tableRecordInfoDigest = tableInfo;
+ this.rawTableRecord = tableInfo.createTableRecordBytes();
+ this.byteBuffer = new ImmutableBytesWritable();
+ this.hllcs = new HyperLogLogPlusCounter[this.metricInfos.length];
this.metricValues = new Object[funcNames.length];
this.measureSerializers = new FixedLenMeasureCodec[funcNames.length];
for (int i = 0; i < this.measureSerializers.length; ++i) {
@@ -116,8 +131,8 @@ public class EndpointAggregators {
}
}
- public TableRecordInfoDigest getTableRecordInfo() {
- return tableRecordInfo;
+ public TableRecordInfoDigest getTableRecordInfoDigest() {
+ return tableRecordInfoDigest;
}
public boolean isEmpty() {
@@ -133,35 +148,41 @@ public class EndpointAggregators {
return aggrs;
}
+ /**
+ * this method is heavily called at coprocessor side,
+ * Make sure as little object creation as possible
+ */
public void aggregate(MeasureAggregator[] measureAggrs, byte[] row) {
- int rawIndex = 0;
- int columnCount = tableRecordInfo.getColumnCount();
-
- for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
- for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
- if (metricInfos[metricIndex].refIndex == columnIndex) {
- if (metricInfos[metricIndex].type == MetricType.Normal) {
- //normal column values to aggregate
- measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex));
- } else if (metricInfos[metricIndex].type == MetricType.DistinctCount) {
- if (tableRecordInfo.isMetrics(columnCount)) {
- measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex));
- } else {
- //TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data
- measureAggrs[metricIndex].aggregate(tableRecordInfo.);
- }
- }
+
+ rawTableRecord.setBytes(row, 0, row.length);
+
+ for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
+
+ MetricInfo metricInfo = metricInfos[metricIndex];
+ MeasureAggregator aggregator = measureAggrs[metricIndex];
+ FixedLenMeasureCodec measureSerializer = measureSerializers[metricIndex];
+
+ //get the raw bytes
+ rawTableRecord.getValueBytes(metricInfo.refIndex, byteBuffer);
+
+ if (metricInfo.type == MetricType.Normal) {
+ aggregator.aggregate(measureSerializer.read(byteBuffer.get(), byteBuffer.getOffset()));
+ } else if (metricInfo.type == MetricType.DistinctCount) {
+ //TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data
+ HyperLogLogPlusCounter hllc = hllcs[metricIndex];
+ if (hllc == null) {
+ hllc = new HyperLogLogPlusCounter(metricInfo.presision);
}
+ hllc.clear();
+ hllc.add(byteBuffer.get(), byteBuffer.getOffset(), byteBuffer.getLength());
+ aggregator.aggregate(hllc);
}
- rawIndex += tableRecordInfo.length(columnIndex);
}
//aggregate for "count"
for (int i = 0; i < metricInfos.length; ++i) {
if (metricInfos[i].type == MetricType.Count) {
measureAggrs[i].aggregate(ONE);
- } else if (metricInfos[i].type == MetricType.DistinctCount) {
-
}
}
}
@@ -184,11 +205,12 @@ public class EndpointAggregators {
return metricBytesOffset;
}
- public List<String> deserializeMetricValues(byte[] metricBytes, int offset) {
- List<String> ret = Lists.newArrayList();
+ public List<Object> deserializeMetricValues(byte[] metricBytes, int offset) {
+ List<Object> ret = Lists.newArrayList();
int metricBytesOffset = offset;
for (int i = 0; i < measureSerializers.length; i++) {
- String valueString = measureSerializers[i].toString(measureSerializers[i].read(metricBytes, metricBytesOffset));
+ measureSerializers[i].read(metricBytes, metricBytesOffset);
+ Object valueString = measureSerializers[i].getValue();
metricBytesOffset += measureSerializers[i].getLength();
ret.add(valueString);
}
@@ -215,18 +237,37 @@ public class EndpointAggregators {
public void serialize(EndpointAggregators value, ByteBuffer out) {
BytesUtil.writeAsciiStringArray(value.funcNames, out);
BytesUtil.writeAsciiStringArray(value.dataTypes, out);
- BytesUtil.writeIntArray(value.metricInfos, out);
- BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfo), out);
+
+ BytesUtil.writeVInt(value.metricInfos.length, out);
+ for (int i = 0; i < value.metricInfos.length; ++i) {
+ MetricInfo metricInfo = value.metricInfos[i];
+ BytesUtil.writeAsciiString(metricInfo.type.toString(), out);
+ BytesUtil.writeVInt(metricInfo.refIndex, out);
+ BytesUtil.writeVInt(metricInfo.presision, out);
+ }
+
+ BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfoDigest), out);
}
@Override
public EndpointAggregators deserialize(ByteBuffer in) {
+
String[] funcNames = BytesUtil.readAsciiStringArray(in);
String[] dataTypes = BytesUtil.readAsciiStringArray(in);
- int[] refColIndex = BytesUtil.readIntArray(in);
+
+ int metricInfoLength = BytesUtil.readVInt(in);
+ MetricInfo[] infos = new MetricInfo[metricInfoLength];
+ for (int i = 0; i < infos.length; ++i) {
+ MetricType type = MetricType.valueOf(BytesUtil.readAsciiString(in));
+ int refIndex = BytesUtil.readVInt(in);
+ int presision = BytesUtil.readVInt(in);
+ infos[i] = new MetricInfo(type, refIndex, presision);
+ }
+
byte[] temp = BytesUtil.readByteArray(in);
TableRecordInfoDigest tableInfo = TableRecordInfoDigest.deserialize(temp);
- return new EndpointAggregators(funcNames, dataTypes, refColIndex, tableInfo);
+
+ return new EndpointAggregators(funcNames, dataTypes, infos, tableInfo);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index d63bc0d..465f7f3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -266,7 +266,7 @@ public class EndpointTupleIterator implements ITupleIterator {
//not thread safe!
private TableRecord tableRecord;
- private List<String> measureValues;
+ private List<Object> measureValues;
private Tuple tuple;
public SingleRegionTupleIterator(List<IIProtos.IIResponse.IIRow> rows) {
@@ -305,26 +305,32 @@ public class EndpointTupleIterator implements ITupleIterator {
}
- private ITuple makeTuple(TableRecord tableRecord, List<String> measureValues) {
+ private ITuple makeTuple(TableRecord tableRecord, List<Object> measureValues) {
// groups
- List<String> columnValues = tableRecord.getValueList();
+ List<String> columnValues = tableRecord.getOriginTableColumnValues();
for (int i = 0; i < columnNames.size(); i++) {
TblColRef column = columns.get(i);
if (!tuple.hasColumn(column)) {
continue;
}
- tuple.setValue(columnNames.get(i), columnValues.get(i));
+ tuple.setDimensionValue(columnNames.get(i), columnValues.get(i));
}
if (measureValues != null) {
for (int i = 0; i < measures.size(); ++i) {
if (!measures.get(i).isAppliedOnDimension()) {
- tuple.setValue(measures.get(i).getRewriteFieldName(), measureValues.get(i));
+ String fieldName = measures.get(i).getRewriteFieldName();
+ Object value = measureValues.get(i);
+ String dataType = tuple.getDataType(fieldName);
+ //TODO: currently in II all metrics except HLLC is returned as String
+ if (dataType.toLowerCase().equalsIgnoreCase("hllc")) {
+ value = Tuple.convertOptiqCellValue((String) value, dataType);
+ }
+ tuple.setMeasureValue(fieldName, value);
}
}
}
return tuple;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index db68803..4852e3b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -85,7 +85,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
aggregators = EndpointAggregators.deserialize(request.getAggregator().toByteArray());
filter = CoprocessorFilter.deserialize(request.getFilter().toByteArray());
- TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfo();
+ TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfoDigest();
IIProtos.IIResponse response = null;
RegionScanner innerScanner = null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 7b5fe1f..dd19e0c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -78,18 +78,17 @@ public class Tuple implements ITuple {
return values[index];
}
+ public String getDataType(String fieldName) {
+ return info.getDataType(fieldName);
+ }
+
private void setFieldObjectValue(String fieldName, Object fieldValue) {
int index = info.getFieldIndex(fieldName);
values[index] = fieldValue;
}
- public void setValue(String fieldName, String fieldValue) {
- this.setDimensionValue(fieldName, fieldValue);
- }
-
public void setDimensionValue(String fieldName, String fieldValue) {
- String dataType = info.getDataType(fieldName);
- Object objectValue = convertOptiqCellValue(fieldValue, dataType);
+ Object objectValue = convertOptiqCellValue(fieldValue, getDataType(fieldName));
setFieldObjectValue(fieldName, objectValue);
}
@@ -121,6 +120,7 @@ public class Tuple implements ITuple {
return sb.toString();
}
+
public static Object convertOptiqCellValue(String strValue, String dataType) {
if (strValue == null)
return null;