You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/25 06:23:37 UTC

incubator-kylin git commit: KYLIN-857 backport the new AggrKey from 0.8 to 0.7

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.7-staging 269d92253 -> cc25a0424


KYLIN-857 backport the new AggrKey from 0.8 to 0.7


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/cc25a042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/cc25a042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/cc25a042

Branch: refs/heads/0.7-staging
Commit: cc25a04243a32bf60cf794e87211661c0dc16080
Parents: 269d922
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 12:22:15 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 12:22:15 2015 +0800

----------------------------------------------------------------------
 .../storage/hbase/coprocessor/AggrKey.java      | 116 +++++++++++++++++++
 .../hbase/coprocessor/AggregationCache.java     |   4 +-
 .../hbase/coprocessor/CoprocessorProjector.java |  85 ++------------
 .../endpoint/EndpointAggregationCache.java      |   4 +-
 .../hbase/coprocessor/endpoint/IIEndpoint.java  |  30 ++---
 .../observer/AggregationScanner.java            |   3 +-
 .../observer/ObserverAggregationCache.java      |  25 ++--
 .../hbase/coprocessor/RowProjectorTest.java     |  14 +--
 .../endpoint/EndpointAggregationTest.java       |  13 ++-
 9 files changed, 166 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggrKey.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggrKey.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggrKey.java
new file mode 100644
index 0000000..7309f61
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggrKey.java
@@ -0,0 +1,116 @@
+package org.apache.kylin.storage.hbase.coprocessor;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.cube.kv.RowConstants;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+/**
+ * Aggregation key: a view over the masked (group-by) bytes of a row.
+ * Only positions whose groupByMask byte is non-zero take part in hashing and
+ * ordering. set() re-points the key at a new buffer and refreshes the cached
+ * hash; copy() detaches the key into its own array.
+ * Created by qianzhou on 4/20/15.
+ */
+public class AggrKey implements Comparable<AggrKey> {
+
+    final byte[] groupByMask;
+    final transient int[] groupByMaskSet; // positions of non-zero mask bytes, ascending
+    transient int hashcode; // cached by set(), carried over by copy()
+    byte[] data;
+    int offset;
+
+    AggrKey(byte[] groupByMask) {
+        this.groupByMask = groupByMask;
+        // Collect into a primitive array: no boxing, no O(n^2) LinkedList.get(i) walk.
+        int[] positions = new int[groupByMask.length];
+        int count = 0;
+        for (int i = 0; i < groupByMask.length; i++) {
+            if (groupByMask[i] != 0) {
+                positions[count++] = i;
+            }
+        }
+        groupByMaskSet = Arrays.copyOf(positions, count);
+    }
+
+    private AggrKey(byte[] groupByMask, int[] groupByMaskSet, byte[] data, int offset, int hashcode) {
+        this.groupByMask = groupByMask;
+        this.groupByMaskSet = groupByMaskSet;
+        this.data = data;
+        this.offset = offset;
+        this.hashcode = hashcode;
+    }
+
+    /** Hashes the masked bytes, skipping ROWKEY_PLACE_HOLDER_BYTE positions. */
+    private int calculateHash() {
+        int hash = 1;
+        for (int i = 0; i < groupByMaskSet.length; i++) {
+            byte t = data[offset + groupByMaskSet[i]];
+            if (t != RowConstants.ROWKEY_PLACE_HOLDER_BYTE) {
+                hash = (31 * hash) + t;
+            }
+        }
+        return hash;
+    }
+
+    public byte[] get() {
+        return data;
+    }
+
+    public int offset() {
+        return offset;
+    }
+
+    public int length() {
+        return groupByMask.length;
+    }
+
+    void set(byte[] data, int offset) {
+        this.data = data;
+        this.offset = offset;
+        this.hashcode = calculateHash(); // hash must follow the buffer the key now points at
+    }
+
+    public byte[] getGroupByMask() {
+        return this.groupByMask;
+    }
+
+    public byte[] copyBytes() {
+        return Arrays.copyOfRange(data, offset, offset + length());
+    }
+
+    AggrKey copy() {
+        return new AggrKey(this.groupByMask, this.groupByMaskSet, copyBytes(), 0, this.hashcode);
+    }
+
+    @Override
+    public int hashCode() {
+        return hashcode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+        return compareTo((AggrKey) obj) == 0;
+    }
+
+    /** Orders by length, then by unsigned comparison of the bytes at this key's mask positions. */
+    @Override
+    public int compareTo(AggrKey o) {
+        int comp = this.length() - o.length();
+        if (comp != 0)
+            return comp;
+
+        for (int i = 0; i < groupByMaskSet.length; i++) {
+            comp = BytesUtil.compareByteUnsigned(this.data[this.offset + groupByMaskSet[i]], o.data[o.offset + groupByMaskSet[i]]);
+            if (comp != 0)
+                return comp;
+        }
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
index 0c48487..483c72c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
@@ -30,7 +30,7 @@ import java.util.SortedMap;
 public abstract class AggregationCache {
     transient int rowMemBytes;
     static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
-    protected final SortedMap<CoprocessorProjector.AggrKey, MeasureAggregator[]> aggBufMap;
+    protected final SortedMap<AggrKey, MeasureAggregator[]> aggBufMap;
 
     public AggregationCache() {
         this.aggBufMap = Maps.newTreeMap();
@@ -38,7 +38,7 @@ public abstract class AggregationCache {
 
     public abstract MeasureAggregator[] createBuffer();
 
-    public MeasureAggregator[] getBuffer(CoprocessorProjector.AggrKey aggkey) {
+    public MeasureAggregator[] getBuffer(AggrKey aggkey) {
         MeasureAggregator[] aggBuf = aggBufMap.get(aggkey);
         if (aggBuf == null) {
             aggBuf = createBuffer();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java
index b959383..1f0a920 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java
@@ -18,11 +18,6 @@
 
 package org.apache.kylin.storage.hbase.coprocessor;
 
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -32,6 +27,11 @@ import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
 /**
  * @author yangli9
  */
@@ -102,10 +102,11 @@ public class CoprocessorProjector {
     // ============================================================================
 
     final byte[] groupByMask; // mask out columns that are not needed (by group by)
-    final AggrKey aggrKey = new AggrKey();
+    final AggrKey aggrKey;
 
     public CoprocessorProjector(byte[] groupByMask) {
         this.groupByMask = groupByMask;
+        this.aggrKey = new AggrKey(this.groupByMask);
     }
 
     public AggrKey getAggrKey(List<Cell> rowCells) {
@@ -123,76 +124,4 @@ public class CoprocessorProjector {
         aggrKey.set(row, 0);
         return aggrKey;
     }
-
-    public class AggrKey implements Comparable<AggrKey> {
-        byte[] data;
-        int offset;
-
-        public byte[] get() {
-            return data;
-        }
-
-        public int offset() {
-            return offset;
-        }
-
-        public int length() {
-            return groupByMask.length;
-        }
-
-        void set(byte[] data, int offset) {
-            this.data = data;
-            this.offset = offset;
-        }
-
-        public AggrKey copy() {
-            AggrKey copy = new AggrKey();
-            copy.set(new byte[length()], 0);
-            System.arraycopy(this.data, this.offset, copy.data, copy.offset, length());
-            return copy;
-        }
-
-        @Override
-        public int hashCode() {
-            int hash = 1;
-            for (int i = 0, j = offset, n = length(); i < n; i++, j++) {
-                if (groupByMask[i] != 0)
-                    hash = (31 * hash) + (int) data[j];
-            }
-            return hash;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            AggrKey other = (AggrKey) obj;
-            if (this.length() != other.length())
-                return false;
-
-            return compareTo(other) == 0;
-        }
-
-        @Override
-        public int compareTo(AggrKey o) {
-            int comp = this.length() - o.length();
-            if (comp != 0)
-                return comp;
-
-            int n = this.length();
-            for (int i = 0, j = offset, k = o.offset; i < n; i++, j++, k++) {
-                if (groupByMask[i] != 0) {
-                    comp = BytesUtil.compareByteUnsigned(this.data[j], o.data[k]);
-                    if (comp != 0)
-                        return comp;
-                }
-            }
-            return 0;
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
index b6722dc..ac6fa70 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
@@ -19,8 +19,8 @@
 package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 
 import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
 
 import java.util.Map;
 import java.util.Set;
@@ -41,7 +41,7 @@ public class EndpointAggregationCache extends AggregationCache {
         return this.aggregators.createBuffer();
     }
 
-    public Set<Map.Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]>> getAllEntries() {
+    public Set<Map.Entry<AggrKey, MeasureAggregator[]>> getAllEntries() {
         return aggBufMap.entrySet();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index 77bed3b..2e85eac 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -22,22 +22,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
-
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.storage.filter.BitMapFilterEvaluator;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
-
 import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -49,6 +34,15 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.filter.BitMapFilterEvaluator;
+import org.apache.kylin.storage.hbase.coprocessor.*;
+import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -137,7 +131,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
             Iterator<RawTableRecord> iterator = slice.iterateWithBitmap(result);
             while (iterator.hasNext()) {
                 byte[] data = iterator.next().getBytes();
-                CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(data);
+                AggrKey aggKey = projector.getAggrKey(data);
                 MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
                 aggregators.aggregate(bufs, data);
                 aggCache.checkMemoryUsage();
@@ -145,8 +139,8 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         }
 
         byte[] metricBuffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
-        for (Map.Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
-            CoprocessorProjector.AggrKey aggrKey = entry.getKey();
+        for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
+            AggrKey aggrKey = entry.getKey();
             IIProtos.IIResponse.IIRow.Builder rowBuilder = IIProtos.IIResponse.IIRow.newBuilder().setColumns(ByteString.copyFrom(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
             int length = aggregators.serializeMetricValues(entry.getValue(), metricBuffer);
             rowBuilder.setMeasures(ByteString.copyFrom(metricBuffer, 0, length));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index 85f3f6e..158b08f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
@@ -80,7 +81,7 @@ public class AggregationScanner implements RegionScanner {
                     continue;
 
                 if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR.ordinal()) {
-                    CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(results);
+                    AggrKey aggKey = projector.getAggrKey(results);
                     MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
                     aggregators.aggregate(bufs, results);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
index 6dce289..28feda7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
@@ -18,22 +18,21 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
 import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
+import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 
 /**
  * @author yangli9
@@ -59,7 +58,7 @@ public class ObserverAggregationCache extends AggregationCache {
     private class AggregationRegionScanner implements RegionScanner {
 
         private final RegionScanner innerScanner;
-        private final Iterator<Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]>> iterator;
+        private final Iterator<Entry<AggrKey, MeasureAggregator[]>> iterator;
 
         public AggregationRegionScanner(RegionScanner innerScanner) {
             this.innerScanner = innerScanner;
@@ -71,7 +70,7 @@ public class ObserverAggregationCache extends AggregationCache {
             // AggregateRegionObserver.LOG.info("Kylin Scanner next()");
             boolean hasMore = false;
             if (iterator.hasNext()) {
-                Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry = iterator.next();
+                Entry<AggrKey, MeasureAggregator[]> entry = iterator.next();
                 makeCells(entry, results);
                 hasMore = iterator.hasNext();
             }
@@ -79,12 +78,12 @@ public class ObserverAggregationCache extends AggregationCache {
             return hasMore;
         }
 
-        private void makeCells(Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry, List<Cell> results) {
+        private void makeCells(Entry<AggrKey, MeasureAggregator[]> entry, List<Cell> results) {
             byte[][] families = aggregators.getHColFamilies();
             byte[][] qualifiers = aggregators.getHColQualifiers();
             int nHCols = aggregators.getHColsNum();
 
-            CoprocessorProjector.AggrKey rowKey = entry.getKey();
+            AggrKey rowKey = entry.getKey();
             MeasureAggregator[] aggBuf = entry.getValue();
             ByteBuffer[] rowValues = aggregators.getHColValues(aggBuf);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java
index 6a22d74..f597c7e 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java
@@ -18,19 +18,17 @@
 
 package org.apache.kylin.storage.hbase.coprocessor;
 
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.kylin.common.util.Bytes;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector.AggrKey;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
index 24291ac..87c6445 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,23 +18,22 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 
-import static org.junit.Assert.assertEquals;
-
+import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.hadoop.io.LongWritable;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,6 +41,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.*;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Created by Hongbin Ma(Binmahone) on 11/27/14.
  *
@@ -173,7 +174,7 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
 
         for (int i = 0; i < tableData.size(); ++i) {
             byte[] data = tableData.get(i).getBytes();
-            CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(data);
+            AggrKey aggKey = projector.getAggrKey(data);
             MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
             aggregators.aggregate(bufs, data);
             aggCache.checkMemoryUsage();
@@ -183,7 +184,7 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
 
         long sumTotal = 0;
         long minTotal = 0;
-        for (Map.Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
+        for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
             sumTotal += ((LongWritable) entry.getValue()[0].getState()).get();
             minTotal += ((LongWritable) entry.getValue()[1].getState()).get();