You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/12 06:43:33 UTC

[43/50] incubator-kylin git commit: KYLIN-608 support HLL at ii storage

KYLIN-608 support HLL at ii storage


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c5d329fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c5d329fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c5d329fe

Branch: refs/heads/inverted-index
Commit: c5d329fe098a0d4886f3e73a6ce0e99a621c8e67
Parents: 0a96d74
Author: honma <ho...@ebay.com>
Authored: Thu Feb 12 10:36:37 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Feb 12 10:36:37 2015 +0800

----------------------------------------------------------------------
 .../common/hll/HyperLogLogPlusCounter.java      |  14 +-
 .../org/apache/kylin/common/util/BasicTest.java |  19 +-
 .../common/util/HyperLogLogCounterTest.java     |  12 +
 .../invertedindex/index/RawTableRecord.java     |  20 +-
 .../apache/kylin/invertedindex/index/Slice.java | 330 +++++++++----------
 .../kylin/invertedindex/index/TableRecord.java  |  12 +-
 .../measure/fixedlen/FixedHLLCodec.java         |  14 +-
 .../measure/fixedlen/FixedLenMeasureCodec.java  |   3 +-
 .../measure/fixedlen/FixedPointLongCodec.java   |   6 +-
 .../endpoint/EndpointAggregators.java           | 117 ++++---
 .../endpoint/EndpointTupleIterator.java         |  18 +-
 .../hbase/coprocessor/endpoint/IIEndpoint.java  |   2 +-
 .../org/apache/kylin/storage/tuple/Tuple.java   |  12 +-
 13 files changed, 325 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index c323b90..49a6756 100644
--- a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -28,10 +28,10 @@ import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.compress.utils.IOUtils;
+import org.apache.kylin.common.util.BytesUtil;
 
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
-import org.apache.kylin.common.util.BytesUtil;
 import com.ning.compress.lzf.LZFDecoder;
 import com.ning.compress.lzf.LZFEncoder;
 
@@ -72,8 +72,12 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
     }
 
     public void clear() {
-        for (int i = 0; i < m; i++)
-            registers[i] = 0;
+        byte zero = (byte) 0;
+        Arrays.fill(registers, zero);
+    }
+
+    public void add(int value) {
+        add(hashFunc.hashInt(value).asLong());
     }
 
     public void add(String value) {
@@ -84,6 +88,10 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
         add(hashFunc.hashBytes(value).asLong());
     }
 
+    public void add(byte[] value, int offset, int length) {
+        add(hashFunc.hashBytes(value, offset, length).asLong());
+    }
+
     protected void add(long hash) {
         int bucketMask = m - 1;
         int bucket = (int) (hash & bucketMask);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 5952c33..0a33f9f 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -22,17 +22,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpException;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.slf4j.*;
-import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
 * Created by honma on 10/17/14.
@@ -59,12 +51,19 @@ public class BasicTest {
         System.out.printf("b");
     }
 
+    private enum MetricType {
+        Count, DimensionAsMetric, DistinctCount, Normal
+    }
+
     @Test
     @Ignore("convenient trial tool for dev")
     public void test1() throws Exception {
+        String x =  MetricType.DimensionAsMetric.toString();
+        System.out.println(x);
+        MetricType y = MetricType.valueOf(x);
+        System.out.println(y == MetricType.DimensionAsMetric);
     }
 
-
     @Test
     @Ignore("fix it later")
     public void test2() throws IOException, ConfigurationException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
index 088219f..a7d275a 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
@@ -204,6 +204,18 @@ public class HyperLogLogCounterTest {
         System.out.println("Perf test result: " + duration / 1000 + " seconds");
     }
 
+    @Test
+    public void testEquivalence() {
+        byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 };
+        byte[] b = new byte[] { 3, 4, 42 };
+        HyperLogLogPlusCounter ha = new HyperLogLogPlusCounter();
+        HyperLogLogPlusCounter hb = new HyperLogLogPlusCounter();
+        ha.add(a, 1, 3);
+        hb.add(b);
+
+        Assert.assertTrue(ha.getCountEstimate()==hb.getCountEstimate());
+    }
+
     private HyperLogLogPlusCounter newHLLC() {
         return new HyperLogLogPlusCounter(16);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
index 14ea62b..895fd4f 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
@@ -49,42 +49,44 @@ public class RawTableRecord implements Cloneable {
         Arrays.fill(buf, Dictionary.NULL);
     }
 
-    protected boolean isMetric(int col) {
+    public boolean isMetric(int col) {
         return digest.isMetrics(col);
     }
 
-    protected FixedLenMeasureCodec<LongWritable> codec(int col) {
+    public FixedLenMeasureCodec<LongWritable> codec(int col) {
         return digest.codec(col);
     }
 
-    protected int length(int col) {
+    public int length(int col) {
         return digest.length(col);
     }
 
-    protected int getColumnCount() {
+    public int getColumnCount() {
         return digest.getColumnCount();
     }
 
-    protected void setValueID(int col, int id) {
+    public void setValueID(int col, int id) {
         BytesUtil.writeUnsigned(id, buf, digest.offset(col), digest.length(col));
     }
 
-    protected int getValueID(int col) {
+    public int getValueID(int col) {
         return BytesUtil.readUnsigned(buf, digest.offset(col), digest.length(col));
     }
 
-    protected void setValueMetrics(int col, LongWritable value) {
+    public void setValueMetrics(int col, LongWritable value) {
         digest.codec(col).write(value, buf, digest.offset(col));
     }
 
-    protected LongWritable getValueMetrics(int col) {
-        return digest.codec(col).read(buf, digest.offset(col));
+    public String getValueMetric(int col) {
+        digest.codec(col).read(buf, digest.offset(col));
+        return (String) digest.codec(col).getValue();
     }
 
     public byte[] getBytes() {
         return buf;
     }
 
+    //TODO is it possible to avoid copying?
     public void setBytes(byte[] bytes, int offset, int length) {
         assert buf.length == length;
         System.arraycopy(bytes, offset, buf, 0, length);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
index fef9892..59dd9cd 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
@@ -31,171 +31,169 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  */
 public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
 
-	TableRecordInfoDigest info;
-	int nColumns;
-
-	short shard;
-	long timestamp;
-	int nRecords;
-	ColumnValueContainer[] containers;
-
-	public Slice(TableRecordInfoDigest digest, short shard, long timestamp,
-			ColumnValueContainer[] containers) {
-		this.info = digest;
-		this.nColumns = digest.getColumnCount();
-
-		this.shard = shard;
-		this.timestamp = timestamp;
-		this.nRecords = containers[0].getSize();
-		this.containers = containers;
-
-		assert nColumns == containers.length;
-		for (int i = 0; i < nColumns; i++) {
-			assert nRecords == containers[i].getSize();
-		}
-	}
-
-	public int getRecordCount() {
-		return this.nRecords;
-	}
-
-	public short getShard() {
-		return shard;
-	}
-
-	public long getTimestamp() {
-		return timestamp;
-	}
-
-	public ColumnValueContainer[] getColumnValueContainers() {
-		return containers;
-	}
-
-	public ColumnValueContainer getColumnValueContainer(int col) {
-		return containers[col];
-	}
-
-	public Iterator<RawTableRecord> iterateWithBitmap(
-			final ConciseSet resultBitMap) {
-		if (resultBitMap == null) {
-			return this.iterator();
-		} else {
-			return new Iterator<RawTableRecord>() {
-				int i = 0;
-				int iteratedCount = 0;
-				int resultSize = resultBitMap.size();
-
-				RawTableRecord rec = info.createTableRecordBytes();
-				ImmutableBytesWritable temp = new ImmutableBytesWritable();
-
-				@Override
-				public boolean hasNext() {
-					return iteratedCount < resultSize;
-				}
-
-				@Override
-				public RawTableRecord next() {
-					while (!resultBitMap.contains(i)) {
-						i++;
-					}
-					for (int col = 0; col < nColumns; col++) {
-						containers[col].getValueAt(i, temp);
-						rec.setValueBytes(col, temp);
-					}
-					iteratedCount++;
-					i++;
-
-					return rec;
-				}
-
-				@Override
-				public void remove() {
-					throw new UnsupportedOperationException();
-				}
-
-			};
-		}
-	}
-
-	@Override
-	public Iterator<RawTableRecord> iterator() {
-		return new Iterator<RawTableRecord>() {
-			int i = 0;
-			RawTableRecord rec = info.createTableRecordBytes();
-			ImmutableBytesWritable temp = new ImmutableBytesWritable();
-
-			@Override
-			public boolean hasNext() {
-				return i < nRecords;
-			}
-
-			@Override
-			public RawTableRecord next() {
-				for (int col = 0; col < nColumns; col++) {
-					containers[col].getValueAt(i, temp);
-					rec.setValueBytes(col, temp);
-				}
-				i++;
-				return rec;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-
-		};
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see java.lang.Object#hashCode()
-	 */
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + ((info == null) ? 0 : info.hashCode());
-		result = prime * result + shard;
-		result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
-		return result;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see java.lang.Object#equals(java.lang.Object)
-	 */
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (obj == null)
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		Slice other = (Slice) obj;
-		if (info == null) {
-			if (other.info != null)
-				return false;
-		} else if (!info.equals(other.info))
-			return false;
-		if (shard != other.shard)
-			return false;
-		if (timestamp != other.timestamp)
-			return false;
-		return true;
-	}
-
-	@Override
-	public int compareTo(Slice o) {
-		int comp = this.shard - o.shard;
-		if (comp != 0)
-			return comp;
-
-		comp = (int) (this.timestamp - o.timestamp);
-		return comp;
-	}
+    TableRecordInfoDigest info;
+    int nColumns;
+
+    short shard;
+    long timestamp;
+    int nRecords;
+    ColumnValueContainer[] containers;
+
+    public Slice(TableRecordInfoDigest digest, short shard, long timestamp, ColumnValueContainer[] containers) {
+        this.info = digest;
+        this.nColumns = digest.getColumnCount();
+
+        this.shard = shard;
+        this.timestamp = timestamp;
+        this.nRecords = containers[0].getSize();
+        this.containers = containers;
+
+        assert nColumns == containers.length;
+        for (int i = 0; i < nColumns; i++) {
+            assert nRecords == containers[i].getSize();
+        }
+    }
+
+    public int getRecordCount() {
+        return this.nRecords;
+    }
+
+    public short getShard() {
+        return shard;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public ColumnValueContainer[] getColumnValueContainers() {
+        return containers;
+    }
+
+    public ColumnValueContainer getColumnValueContainer(int col) {
+        return containers[col];
+    }
+
+    public Iterator<RawTableRecord> iterateWithBitmap(final ConciseSet resultBitMap) {
+        if (resultBitMap == null) {
+            return this.iterator();
+        } else {
+            final RawTableRecord rec = info.createTableRecordBytes();
+            final ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+            return new Iterator<RawTableRecord>() {
+                int i = 0;
+                int iteratedCount = 0;
+                int resultSize = resultBitMap.size();
+
+                @Override
+                public boolean hasNext() {
+                    return iteratedCount < resultSize;
+                }
+
+                @Override
+                public RawTableRecord next() {
+                    while (!resultBitMap.contains(i)) {
+                        i++;
+                    }
+                    for (int col = 0; col < nColumns; col++) {
+                        containers[col].getValueAt(i, temp);
+                        rec.setValueBytes(col, temp);
+                    }
+                    iteratedCount++;
+                    i++;
+
+                    return rec;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+
+            };
+        }
+    }
+
+    @Override
+    public Iterator<RawTableRecord> iterator() {
+        return new Iterator<RawTableRecord>() {
+            int i = 0;
+            RawTableRecord rec = info.createTableRecordBytes();
+            ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+            @Override
+            public boolean hasNext() {
+                return i < nRecords;
+            }
+
+            @Override
+            public RawTableRecord next() {
+                for (int col = 0; col < nColumns; col++) {
+                    containers[col].getValueAt(i, temp);
+                    rec.setValueBytes(col, temp);
+                }
+                i++;
+                return rec;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+        };
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((info == null) ? 0 : info.hashCode());
+        result = prime * result + shard;
+        result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Slice other = (Slice) obj;
+        if (info == null) {
+            if (other.info != null)
+                return false;
+        } else if (!info.equals(other.info))
+            return false;
+        if (shard != other.shard)
+            return false;
+        if (timestamp != other.timestamp)
+            return false;
+        return true;
+    }
+
+    @Override
+    public int compareTo(Slice o) {
+        int comp = this.shard - o.shard;
+        if (comp != 0)
+            return comp;
+
+        comp = (int) (this.timestamp - o.timestamp);
+        return comp;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index 1abbe18..3b8d969 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -73,7 +73,7 @@ public class TableRecord implements Cloneable {
         return rawRecord.length(col);
     }
 
-    public List<String> getValueList() {
+    public List<String> getOriginTableColumnValues() {
         List<String> ret = Lists.newArrayList();
         for (int i = 0; i < info.nColumns; ++i) {
             ret.add(getValueString(i));
@@ -91,9 +91,13 @@ public class TableRecord implements Cloneable {
         }
     }
 
+    /**
+     * Gets the value of a column that belongs to the original table's columns,
+     * i.e. columns like min_xx, max_yy will never appear.
+     */
     public String getValueString(int col) {
         if (rawRecord.isMetric(col))
-            return rawRecord.codec(col).toString(getValueMetrics(col));
+            return getValueMetric(col);
         else
             return info.dict(col).getValueFromId(rawRecord.getValueID(col));
     }
@@ -106,8 +110,8 @@ public class TableRecord implements Cloneable {
         rawRecord.setValueMetrics(col, value);
     }
 
-    private LongWritable getValueMetrics(int col) {
-        return rawRecord.getValueMetrics(col);
+    private String getValueMetric(int col) {
+        return rawRecord.getValueMetric(col);
     }
 
     public short getShard() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
index 9f6a1ba..d787cbc 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -1,11 +1,10 @@
 package org.apache.kylin.metadata.measure.fixedlen;
 
+import java.nio.ByteBuffer;
+
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.measure.HLLCSerializer;
 import org.apache.kylin.metadata.model.DataType;
 
-import java.util.Map;
-
 /**
  * Created by Hongbin Ma(Binmahone) on 2/10/15.
  */
@@ -42,17 +41,18 @@ public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter>
     }
 
     @Override
-    public String toString(HyperLogLogPlusCounter value) {
-        return String.valueOf(value.getCountEstimate());
+    public Object getValue() {
+        return current;
     }
 
     @Override
     public HyperLogLogPlusCounter read(byte[] buf, int offset) {
-        return serializer.deserialize();
+        current.readRegisters(ByteBuffer.wrap(buf, offset, buf.length - offset));
+        return current;
     }
 
     @Override
     public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
-
+        current.writeRegisters(ByteBuffer.wrap(buf, offset, buf.length - offset));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index 41a6356..650432a 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -33,7 +33,8 @@ abstract public class FixedLenMeasureCodec<T> {
 
     abstract public T valueOf(String value);
 
-    abstract public String toString(T value);
+
+    abstract public Object getValue();
 
     abstract public T read(byte[] buf, int offset);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
index f27e446..9ccb479 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
@@ -61,11 +61,11 @@ public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> {
     }
 
     @Override
-    public String toString(LongWritable value) {
+    public String getValue() {
         if (scale == 0)
-            return value.toString();
+            return current.toString();
         else
-            return "" + (new BigDecimal(value.get()).divide(scalePower));
+            return "" + (new BigDecimal(current.get()).divide(scalePower));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index b199862..c6d8c49 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -18,23 +18,24 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 
-import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.List;
 
-import com.yammer.metrics.core.Metric;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
-import org.apache.hadoop.io.LongWritable;
 
-import java.nio.ByteBuffer;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 /**
  * @author honma
@@ -49,6 +50,13 @@ public class EndpointAggregators {
     private static class MetricInfo {
         private MetricType type;
         private int refIndex = -1;
+        private int presision = -1;
+
+        public MetricInfo(MetricType type, int refIndex, int presision) {
+            this.type = type;
+            this.refIndex = refIndex;
+            this.presision = presision;
+        }
 
         public MetricInfo(MetricType type, int refIndex) {
             this.type = type;
@@ -58,6 +66,7 @@ public class EndpointAggregators {
         public MetricInfo(MetricType type) {
             this.type = type;
         }
+
     }
 
     public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) {
@@ -83,7 +92,7 @@ public class EndpointAggregators {
                 }
 
                 if (functionDesc.isCountDistinct()) {
-                    metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index);
+                    metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
                 } else {
                     metricInfos[i] = new MetricInfo(MetricType.Normal, index);
                 }
@@ -96,8 +105,11 @@ public class EndpointAggregators {
     final String[] funcNames;
     final String[] dataTypes;
     final MetricInfo[] metricInfos;
-    final TableRecordInfoDigest tableRecordInfo;
 
+    final transient TableRecordInfoDigest tableRecordInfoDigest;
+    final transient RawTableRecord rawTableRecord;
+    final transient ImmutableBytesWritable byteBuffer;
+    final transient HyperLogLogPlusCounter[] hllcs;
     final transient FixedLenMeasureCodec[] measureSerializers;
     final transient Object[] metricValues;
 
@@ -107,8 +119,11 @@ public class EndpointAggregators {
         this.funcNames = funcNames;
         this.dataTypes = dataTypes;
         this.metricInfos = metricInfos;
-        this.tableRecordInfo = tableInfo;
+        this.tableRecordInfoDigest = tableInfo;
+        this.rawTableRecord = tableInfo.createTableRecordBytes();
+        this.byteBuffer = new ImmutableBytesWritable();
 
+        this.hllcs = new HyperLogLogPlusCounter[this.metricInfos.length];
         this.metricValues = new Object[funcNames.length];
         this.measureSerializers = new FixedLenMeasureCodec[funcNames.length];
         for (int i = 0; i < this.measureSerializers.length; ++i) {
@@ -116,8 +131,8 @@ public class EndpointAggregators {
         }
     }
 
-    public TableRecordInfoDigest getTableRecordInfo() {
-        return tableRecordInfo;
+    public TableRecordInfoDigest getTableRecordInfoDigest() {
+        return tableRecordInfoDigest;
     }
 
     public boolean isEmpty() {
@@ -133,35 +148,41 @@ public class EndpointAggregators {
         return aggrs;
     }
 
+    /**
+     * This method is called heavily on the coprocessor side,
+     * so make sure to create as few objects as possible.
+     */
     public void aggregate(MeasureAggregator[] measureAggrs, byte[] row) {
-        int rawIndex = 0;
-        int columnCount = tableRecordInfo.getColumnCount();
-
-        for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
-            for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
-                if (metricInfos[metricIndex].refIndex == columnIndex) {
-                    if (metricInfos[metricIndex].type == MetricType.Normal) {
-                        //normal column values to aggregate
-                        measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex));
-                    } else if (metricInfos[metricIndex].type == MetricType.DistinctCount) {
-                        if (tableRecordInfo.isMetrics(columnCount)) {
-                            measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex));
-                        } else {
-                            //TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data
-                            measureAggrs[metricIndex].aggregate(tableRecordInfo.);
-                        }
-                    }
+
+        rawTableRecord.setBytes(row, 0, row.length);
+
+        for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
+
+            MetricInfo metricInfo = metricInfos[metricIndex];
+            MeasureAggregator aggregator = measureAggrs[metricIndex];
+            FixedLenMeasureCodec measureSerializer = measureSerializers[metricIndex];
+
+            //get the raw bytes
+            rawTableRecord.getValueBytes(metricInfo.refIndex, byteBuffer);
+
+            if (metricInfo.type == MetricType.Normal) {
+                aggregator.aggregate(measureSerializer.read(byteBuffer.get(), byteBuffer.getOffset()));
+            } else if (metricInfo.type == MetricType.DistinctCount) {
+                //TODO: for a unified dictionary, this is okay. But if different data blocks use different dictionaries, we'll have to aggregate the original data
+                HyperLogLogPlusCounter hllc = hllcs[metricIndex];
+                if (hllc == null) {
+                    hllc = new HyperLogLogPlusCounter(metricInfo.presision);
                 }
+                hllc.clear();
+                hllc.add(byteBuffer.get(), byteBuffer.getOffset(), byteBuffer.getLength());
+                aggregator.aggregate(hllc);
             }
-            rawIndex += tableRecordInfo.length(columnIndex);
         }
 
         //aggregate for "count"
         for (int i = 0; i < metricInfos.length; ++i) {
             if (metricInfos[i].type == MetricType.Count) {
                 measureAggrs[i].aggregate(ONE);
-            } else if (metricInfos[i].type == MetricType.DistinctCount) {
-
             }
         }
     }
@@ -184,11 +205,12 @@ public class EndpointAggregators {
         return metricBytesOffset;
     }
 
-    public List<String> deserializeMetricValues(byte[] metricBytes, int offset) {
-        List<String> ret = Lists.newArrayList();
+    public List<Object> deserializeMetricValues(byte[] metricBytes, int offset) {
+        List<Object> ret = Lists.newArrayList();
         int metricBytesOffset = offset;
         for (int i = 0; i < measureSerializers.length; i++) {
-            String valueString = measureSerializers[i].toString(measureSerializers[i].read(metricBytes, metricBytesOffset));
+            measureSerializers[i].read(metricBytes, metricBytesOffset);
+            Object valueString = measureSerializers[i].getValue();
             metricBytesOffset += measureSerializers[i].getLength();
             ret.add(valueString);
         }
@@ -215,18 +237,37 @@ public class EndpointAggregators {
         public void serialize(EndpointAggregators value, ByteBuffer out) {
             BytesUtil.writeAsciiStringArray(value.funcNames, out);
             BytesUtil.writeAsciiStringArray(value.dataTypes, out);
-            BytesUtil.writeIntArray(value.metricInfos, out);
-            BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfo), out);
+
+            BytesUtil.writeVInt(value.metricInfos.length, out);
+            for (int i = 0; i < value.metricInfos.length; ++i) {
+                MetricInfo metricInfo = value.metricInfos[i];
+                BytesUtil.writeAsciiString(metricInfo.type.toString(), out);
+                BytesUtil.writeVInt(metricInfo.refIndex, out);
+                BytesUtil.writeVInt(metricInfo.presision, out);
+            }
+
+            BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfoDigest), out);
         }
 
         @Override
         public EndpointAggregators deserialize(ByteBuffer in) {
+
             String[] funcNames = BytesUtil.readAsciiStringArray(in);
             String[] dataTypes = BytesUtil.readAsciiStringArray(in);
-            int[] refColIndex = BytesUtil.readIntArray(in);
+
+            int metricInfoLength = BytesUtil.readVInt(in);
+            MetricInfo[] infos = new MetricInfo[metricInfoLength];
+            for (int i = 0; i < infos.length; ++i) {
+                MetricType type = MetricType.valueOf(BytesUtil.readAsciiString(in));
+                int refIndex = BytesUtil.readVInt(in);
+                int presision = BytesUtil.readVInt(in);
+                infos[i] = new MetricInfo(type, refIndex, presision);
+            }
+
             byte[] temp = BytesUtil.readByteArray(in);
             TableRecordInfoDigest tableInfo = TableRecordInfoDigest.deserialize(temp);
-            return new EndpointAggregators(funcNames, dataTypes, refColIndex, tableInfo);
+
+            return new EndpointAggregators(funcNames, dataTypes, infos, tableInfo);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index d63bc0d..465f7f3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -266,7 +266,7 @@ public class EndpointTupleIterator implements ITupleIterator {
 
         //not thread safe!
         private TableRecord tableRecord;
-        private List<String> measureValues;
+        private List<Object> measureValues;
         private Tuple tuple;
 
         public SingleRegionTupleIterator(List<IIProtos.IIResponse.IIRow> rows) {
@@ -305,26 +305,32 @@ public class EndpointTupleIterator implements ITupleIterator {
 
         }
 
-        private ITuple makeTuple(TableRecord tableRecord, List<String> measureValues) {
+        private ITuple makeTuple(TableRecord tableRecord, List<Object> measureValues) {
             // groups
-            List<String> columnValues = tableRecord.getValueList();
+            List<String> columnValues = tableRecord.getOriginTableColumnValues();
             for (int i = 0; i < columnNames.size(); i++) {
                 TblColRef column = columns.get(i);
                 if (!tuple.hasColumn(column)) {
                     continue;
                 }
-                tuple.setValue(columnNames.get(i), columnValues.get(i));
+                tuple.setDimensionValue(columnNames.get(i), columnValues.get(i));
             }
 
             if (measureValues != null) {
                 for (int i = 0; i < measures.size(); ++i) {
                     if (!measures.get(i).isAppliedOnDimension()) {
-                        tuple.setValue(measures.get(i).getRewriteFieldName(), measureValues.get(i));
+                        String fieldName = measures.get(i).getRewriteFieldName();
+                        Object value = measureValues.get(i);
+                        String dataType = tuple.getDataType(fieldName);
+                        //TODO: currently in II all metrics except HLLC are returned as String
+                        if (dataType.toLowerCase().equalsIgnoreCase("hllc")) {
+                            value = Tuple.convertOptiqCellValue((String) value, dataType);
+                        }
+                        tuple.setMeasureValue(fieldName, value);
                     }
                 }
             }
             return tuple;
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index db68803..4852e3b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -85,7 +85,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         aggregators = EndpointAggregators.deserialize(request.getAggregator().toByteArray());
         filter = CoprocessorFilter.deserialize(request.getFilter().toByteArray());
 
-        TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfo();
+        TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfoDigest();
 
         IIProtos.IIResponse response = null;
         RegionScanner innerScanner = null;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 7b5fe1f..dd19e0c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -78,18 +78,17 @@ public class Tuple implements ITuple {
         return values[index];
     }
 
+    public String getDataType(String fieldName) {
+        return info.getDataType(fieldName);
+    }
+
     private void setFieldObjectValue(String fieldName, Object fieldValue) {
         int index = info.getFieldIndex(fieldName);
         values[index] = fieldValue;
     }
 
-    public void setValue(String fieldName, String fieldValue) {
-        this.setDimensionValue(fieldName, fieldValue);
-    }
-
     public void setDimensionValue(String fieldName, String fieldValue) {
-        String dataType = info.getDataType(fieldName);
-        Object objectValue = convertOptiqCellValue(fieldValue, dataType);
+        Object objectValue = convertOptiqCellValue(fieldValue, getDataType(fieldName));
         setFieldObjectValue(fieldName, objectValue);
     }
 
@@ -121,6 +120,7 @@ public class Tuple implements ITuple {
         return sb.toString();
     }
 
+
     public static Object convertOptiqCellValue(String strValue, String dataType) {
         if (strValue == null)
             return null;