You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/25 06:23:37 UTC
incubator-kylin git commit: KYLIN-857 backport the new AggrKey from 0.8 to 0.7
Repository: incubator-kylin
Updated Branches:
refs/heads/0.7-staging 269d92253 -> cc25a0424
KYLIN-857 backport the new AggrKey from 0.8 to 0.7
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/cc25a042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/cc25a042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/cc25a042
Branch: refs/heads/0.7-staging
Commit: cc25a04243a32bf60cf794e87211661c0dc16080
Parents: 269d922
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 12:22:15 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 12:22:15 2015 +0800
----------------------------------------------------------------------
.../storage/hbase/coprocessor/AggrKey.java | 116 +++++++++++++++++++
.../hbase/coprocessor/AggregationCache.java | 4 +-
.../hbase/coprocessor/CoprocessorProjector.java | 85 ++------------
.../endpoint/EndpointAggregationCache.java | 4 +-
.../hbase/coprocessor/endpoint/IIEndpoint.java | 30 ++---
.../observer/AggregationScanner.java | 3 +-
.../observer/ObserverAggregationCache.java | 25 ++--
.../hbase/coprocessor/RowProjectorTest.java | 14 +--
.../endpoint/EndpointAggregationTest.java | 13 ++-
9 files changed, 166 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggrKey.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggrKey.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggrKey.java
new file mode 100644
index 0000000..7309f61
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggrKey.java
@@ -0,0 +1,116 @@
+package org.apache.kylin.storage.hbase.coprocessor;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.cube.kv.RowConstants;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+/**
+ * Key over the group-by positions of a row: hashing, equality and ordering read only the bytes whose groupByMask entry is non-zero. Created by qianzhou on 4/20/15.
+ */
+public class AggrKey implements Comparable<AggrKey> {
+
+ final byte[] groupByMask; // per-position mask; a non-zero entry marks a position that takes part in the key
+ final transient int[] groupByMaskSet; // indices i with groupByMask[i] != 0, in ascending order
+ transient int hashcode; // cached hash; refreshed by set(), carried over unchanged by copy()
+ byte[] data; // backing array; set() stores the reference without copying
+ int offset; // position in data where this key starts
+
+ AggrKey(byte[] groupByMask) { // derives groupByMaskSet from the mask; data and offset are not assigned here
+ this.groupByMask = groupByMask;
+ LinkedList<Integer> list = Lists.newLinkedList();
+ for (int i = 0; i < groupByMask.length; i++) {
+ if (groupByMask[i] != 0) {
+ list.add(i);
+ }
+ }
+ groupByMaskSet = new int[list.size()];
+ for (int i = 0; i < list.size(); i++) {
+ groupByMaskSet[i] = list.get(i); // NOTE(review): indexed get on a LinkedList walks the list on every call
+ }
+ }
+
+ private AggrKey(byte[] groupByMask, int[] groupByMaskSet, byte[] data, int offset, int hashcode) { // field-by-field constructor used by copy()
+ this.groupByMask = groupByMask;
+ this.groupByMaskSet = groupByMaskSet;
+ this.data = data;
+ this.offset = offset;
+ this.hashcode = hashcode;
+ }
+
+ private int calculateHash() // hashes only the bytes at the masked positions of the current data/offset
+ {
+ int hash = 1;
+ for (int i = 0; i < groupByMaskSet.length; i++) {
+ byte t = data[offset + groupByMaskSet[i]];
+ if(t!= RowConstants.ROWKEY_PLACE_HOLDER_BYTE) { // bytes equal to the placeholder constant are left out of the hash
+ hash = (31 * hash) + t;
+ }
+ }
+ return hash;
+ }
+
+ public byte[] get() { // returns the backing array itself, not a copy
+ return data;
+ }
+
+ public int offset() {
+ return offset;
+ }
+
+ public int length() { // key length is the mask length, independent of data.length
+ return groupByMask.length;
+ }
+
+ void set(byte[] data, int offset) { // re-points this key at data/offset and recomputes the cached hash
+ this.data = data;
+ this.offset = offset;
+ this.hashcode = calculateHash();
+ }
+
+ public byte[] getGroupByMask() { // returns the internal mask array, not a copy
+ return this.groupByMask;
+ }
+
+ public byte[] copyBytes() { // copies length() bytes of data starting at offset into a new array
+ return Arrays.copyOfRange(data, offset, offset + length());
+ }
+
+ AggrKey copy() { // new key with its own byte copy at offset 0; mask arrays are shared and the cached hash is reused
+ AggrKey copy = new AggrKey(this.groupByMask, this.groupByMaskSet, copyBytes(), 0, this.hashcode);
+ return copy;
+ }
+
+ @Override
+ public int hashCode() { // returns the cached value; nothing is computed here
+ return hashcode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AggrKey other = (AggrKey) obj;
+ return compareTo(other) == 0; // equality is delegated to compareTo, so only masked bytes matter
+ }
+
+ @Override
+ public int compareTo(AggrKey o) {
+ int comp = this.length() - o.length(); // keys with different mask lengths are ordered by length first
+ if (comp != 0)
+ return comp;
+
+ for (int i = 0; i < groupByMaskSet.length; i++) { // NOTE(review): both keys are indexed with this key's groupByMaskSet; presumably both share the same mask - confirm
+ comp = BytesUtil.compareByteUnsigned(this.data[this.offset + groupByMaskSet[i]], o.data[o.offset + groupByMaskSet[i]]);
+ if (comp != 0)
+ return comp;
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
index 0c48487..483c72c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
@@ -30,7 +30,7 @@ import java.util.SortedMap;
public abstract class AggregationCache {
transient int rowMemBytes;
static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
- protected final SortedMap<CoprocessorProjector.AggrKey, MeasureAggregator[]> aggBufMap;
+ protected final SortedMap<AggrKey, MeasureAggregator[]> aggBufMap;
public AggregationCache() {
this.aggBufMap = Maps.newTreeMap();
@@ -38,7 +38,7 @@ public abstract class AggregationCache {
public abstract MeasureAggregator[] createBuffer();
- public MeasureAggregator[] getBuffer(CoprocessorProjector.AggrKey aggkey) {
+ public MeasureAggregator[] getBuffer(AggrKey aggkey) {
MeasureAggregator[] aggBuf = aggBufMap.get(aggkey);
if (aggBuf == null) {
aggBuf = createBuffer();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java
index b959383..1f0a920 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorProjector.java
@@ -18,11 +18,6 @@
package org.apache.kylin.storage.hbase.coprocessor;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
@@ -32,6 +27,11 @@ import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.model.TblColRef;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
/**
* @author yangli9
*/
@@ -102,10 +102,11 @@ public class CoprocessorProjector {
// ============================================================================
final byte[] groupByMask; // mask out columns that are not needed (by group by)
- final AggrKey aggrKey = new AggrKey();
+ final AggrKey aggrKey;
public CoprocessorProjector(byte[] groupByMask) {
this.groupByMask = groupByMask;
+ this.aggrKey = new AggrKey(this.groupByMask);
}
public AggrKey getAggrKey(List<Cell> rowCells) {
@@ -123,76 +124,4 @@ public class CoprocessorProjector {
aggrKey.set(row, 0);
return aggrKey;
}
-
- public class AggrKey implements Comparable<AggrKey> {
- byte[] data;
- int offset;
-
- public byte[] get() {
- return data;
- }
-
- public int offset() {
- return offset;
- }
-
- public int length() {
- return groupByMask.length;
- }
-
- void set(byte[] data, int offset) {
- this.data = data;
- this.offset = offset;
- }
-
- public AggrKey copy() {
- AggrKey copy = new AggrKey();
- copy.set(new byte[length()], 0);
- System.arraycopy(this.data, this.offset, copy.data, copy.offset, length());
- return copy;
- }
-
- @Override
- public int hashCode() {
- int hash = 1;
- for (int i = 0, j = offset, n = length(); i < n; i++, j++) {
- if (groupByMask[i] != 0)
- hash = (31 * hash) + (int) data[j];
- }
- return hash;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- AggrKey other = (AggrKey) obj;
- if (this.length() != other.length())
- return false;
-
- return compareTo(other) == 0;
- }
-
- @Override
- public int compareTo(AggrKey o) {
- int comp = this.length() - o.length();
- if (comp != 0)
- return comp;
-
- int n = this.length();
- for (int i = 0, j = offset, k = o.offset; i < n; i++, j++, k++) {
- if (groupByMask[i] != 0) {
- comp = BytesUtil.compareByteUnsigned(this.data[j], o.data[k]);
- if (comp != 0)
- return comp;
- }
- }
- return 0;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
index b6722dc..ac6fa70 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
@@ -19,8 +19,8 @@
package org.apache.kylin.storage.hbase.coprocessor.endpoint;
import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
import java.util.Map;
import java.util.Set;
@@ -41,7 +41,7 @@ public class EndpointAggregationCache extends AggregationCache {
return this.aggregators.createBuffer();
}
- public Set<Map.Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]>> getAllEntries() {
+ public Set<Map.Entry<AggrKey, MeasureAggregator[]>> getAllEntries() {
return aggBufMap.entrySet();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index 77bed3b..2e85eac 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -22,22 +22,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
-
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.storage.filter.BitMapFilterEvaluator;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
-
import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -49,6 +34,15 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.filter.BitMapFilterEvaluator;
+import org.apache.kylin.storage.hbase.coprocessor.*;
+import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
import java.io.IOException;
import java.util.Iterator;
@@ -137,7 +131,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
Iterator<RawTableRecord> iterator = slice.iterateWithBitmap(result);
while (iterator.hasNext()) {
byte[] data = iterator.next().getBytes();
- CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(data);
+ AggrKey aggKey = projector.getAggrKey(data);
MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
aggregators.aggregate(bufs, data);
aggCache.checkMemoryUsage();
@@ -145,8 +139,8 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
}
byte[] metricBuffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
- for (Map.Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
- CoprocessorProjector.AggrKey aggrKey = entry.getKey();
+ for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
+ AggrKey aggrKey = entry.getKey();
IIProtos.IIResponse.IIRow.Builder rowBuilder = IIProtos.IIResponse.IIRow.newBuilder().setColumns(ByteString.copyFrom(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
int length = aggregators.serializeMetricValues(entry.getValue(), metricBuffer);
rowBuilder.setMeasures(ByteString.copyFrom(metricBuffer, 0, length));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index 85f3f6e..158b08f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
@@ -80,7 +81,7 @@ public class AggregationScanner implements RegionScanner {
continue;
if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR.ordinal()) {
- CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(results);
+ AggrKey aggKey = projector.getAggrKey(results);
MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
aggregators.aggregate(bufs, results);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
index 6dce289..28feda7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
@@ -18,22 +18,21 @@
package org.apache.kylin.storage.hbase.coprocessor.observer;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
+import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
/**
* @author yangli9
@@ -59,7 +58,7 @@ public class ObserverAggregationCache extends AggregationCache {
private class AggregationRegionScanner implements RegionScanner {
private final RegionScanner innerScanner;
- private final Iterator<Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]>> iterator;
+ private final Iterator<Entry<AggrKey, MeasureAggregator[]>> iterator;
public AggregationRegionScanner(RegionScanner innerScanner) {
this.innerScanner = innerScanner;
@@ -71,7 +70,7 @@ public class ObserverAggregationCache extends AggregationCache {
// AggregateRegionObserver.LOG.info("Kylin Scanner next()");
boolean hasMore = false;
if (iterator.hasNext()) {
- Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry = iterator.next();
+ Entry<AggrKey, MeasureAggregator[]> entry = iterator.next();
makeCells(entry, results);
hasMore = iterator.hasNext();
}
@@ -79,12 +78,12 @@ public class ObserverAggregationCache extends AggregationCache {
return hasMore;
}
- private void makeCells(Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry, List<Cell> results) {
+ private void makeCells(Entry<AggrKey, MeasureAggregator[]> entry, List<Cell> results) {
byte[][] families = aggregators.getHColFamilies();
byte[][] qualifiers = aggregators.getHColQualifiers();
int nHCols = aggregators.getHColsNum();
- CoprocessorProjector.AggrKey rowKey = entry.getKey();
+ AggrKey rowKey = entry.getKey();
MeasureAggregator[] aggBuf = entry.getValue();
ByteBuffer[] rowValues = aggregators.getHColValues(aggBuf);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java
index 6a22d74..f597c7e 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/RowProjectorTest.java
@@ -18,19 +18,17 @@
package org.apache.kylin.storage.hbase.coprocessor;
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
+import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.kylin.common.util.Bytes;
import org.junit.Test;
-import com.google.common.collect.Lists;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector.AggrKey;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cc25a042/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
index 24291ac..87c6445 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,23 +18,22 @@
package org.apache.kylin.storage.hbase.coprocessor.endpoint;
-import static org.junit.Assert.assertEquals;
-
+import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.hadoop.io.LongWritable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,6 +41,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.*;
+import static org.junit.Assert.assertEquals;
+
/**
* Created by Hongbin Ma(Binmahone) on 11/27/14.
*
@@ -173,7 +174,7 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
for (int i = 0; i < tableData.size(); ++i) {
byte[] data = tableData.get(i).getBytes();
- CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(data);
+ AggrKey aggKey = projector.getAggrKey(data);
MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
aggregators.aggregate(bufs, data);
aggCache.checkMemoryUsage();
@@ -183,7 +184,7 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
long sumTotal = 0;
long minTotal = 0;
- for (Map.Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
+ for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
sumTotal += ((LongWritable) entry.getValue()[0].getState()).get();
minTotal += ((LongWritable) entry.getValue()[1].getState()).get();