You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2018/12/19 06:53:08 UTC

[kylin] branch master updated: KYLIN-3365 Add unit test for the coprocessor code, CubeVisitService

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 91efa0c  KYLIN-3365 Add unit test for the coprocessor code, CubeVisitService
91efa0c is described below

commit 91efa0cff4c23246dd4e9c8eee87b7aeee54837e
Author: kyotoYaho <nj...@apache.org>
AuthorDate: Wed Dec 19 14:49:32 2018 +0800

    KYLIN-3365 Add unit test for the coprocessor code, CubeVisitService
---
 storage-hbase/pom.xml                              |  13 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |   4 +-
 .../coprocessor/endpoint/CubeVisitServiceTest.java | 568 +++++++++++++++++++++
 3 files changed, 581 insertions(+), 4 deletions(-)

diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 190dd8c..167280d 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -17,7 +17,8 @@
  limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>kylin-storage-hbase</artifactId>
@@ -65,6 +66,13 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop2.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-common</artifactId>
             <scope>provided</scope>
@@ -135,7 +143,8 @@
                             <shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>coprocessor</shadedClassifierName>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                             </transformers>
                             <artifactSet>
                                 <includes>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index c9be666..d1aee2f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -427,7 +427,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         }
     }
 
-    private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
+    public static ByteString serializeGTScanReq(GTScanRequest scanRequest) {
         ByteString scanRequestByteString;
         int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
         while (true) {
@@ -445,7 +445,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return scanRequestByteString;
     }
 
-    private ByteString serializeRawScans(List<RawScan> rawScans) {
+    public static ByteString serializeRawScans(List<RawScan> rawScans) {
         ByteString rawScanByteString;
         int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
         while (true) {
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
new file mode 100644
index 0000000..b3e5a93
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.TestRowProcessorEndpoint;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.dimension.DateDimEnc;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.expression.BinaryTupleExpression;
+import org.apache.kylin.metadata.expression.CaseTupleExpression;
+import org.apache.kylin.metadata.expression.ColumnTupleExpression;
+import org.apache.kylin.metadata.expression.NumberTupleExpression;
+import org.apache.kylin.metadata.expression.TupleExpression;
+import org.apache.kylin.metadata.expression.TupleExpression.ExpressionOperatorEnum;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gtrecord.PartitionResultIterator;
+import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC;
+import org.apache.kylin.storage.hbase.cube.v2.RawScan;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.HBaseZeroCopyByteString;
+import com.google.protobuf.RpcCallback;
+
+public class CubeVisitServiceTest extends LocalFileMetadataTestCase {
+
+    // Name of the HBase table created for the test data.
+    private static final TableName TABLE = TableName.valueOf("KYLIN_testtable");
+
+    // Shared HBase testing utility; the mini cluster is started in setupBeforeClass().
+    private static HBaseTestingUtility util = new HBaseTestingUtility();
+
+    // Local region holding the test rows and the grid-table schema they were written with;
+    // both are assigned in prepareTestData().
+    private volatile static HRegion region = null;
+    private volatile static GTInfo gtInfo = null;
+    // Cuboid id written into every row key and into the scan start/end keys.
+    private static final long baseCuboid = 3L;
+
+    // Column family and qualifier under which the measure block of each record is stored.
+    private final static byte[] FAM = Bytes.toBytes("f1");
+    private final static byte[] COL_M = Bytes.toBytes("m");
+
+    // Value domains of the generated test records: one record per (date, user) pair,
+    // each with a price picked from priceList.
+    private static final List<String> dateList = Lists.newArrayList("2018-01-14", "2018-01-15", "2018-01-16");
+    private static final List<String> userList = Lists.newArrayList("Ken", "Lisa", "Gang", "Kalin", "Julian", "John");
+    private static final List<BigDecimal> priceList = Lists.newArrayList(new BigDecimal("10.5"),
+            new BigDecimal("15.5"));
+
+    // NOTE(review): expUserStddevRet is not referenced anywhere else in this class.
+    private static final Map<String, Double> expUserStddevRet = Maps.newHashMap();
+    // Expected sum of prices per user; filled by newTable().
+    private static final Map<String, BigDecimal> expUserRet = Maps.newHashMap();
+    // Number of records generated per user (one for each date).
+    private static final BigDecimal userCnt = new BigDecimal(dateList.size());
+
+    /**
+     * Recreates the HBase test table, opens a local region for it and writes every
+     * record of a freshly generated in-memory grid table into that region.
+     *
+     * <p>Each row key consists of {@code ROWKEY_SHARD_AND_CUBOID_LEN} header bytes
+     * (the shard part is left zeroed, the cuboid part carries {@code baseCuboid})
+     * followed by the record's exported primary-key columns. The exported column
+     * block 1 is stored as the cell value under {@code FAM:COL_M}.
+     *
+     * @throws Exception if the table or region cannot be created or a row cannot be written
+     */
+    public static void prepareTestData() throws Exception {
+        try {
+            util.getHBaseAdmin().disableTable(TABLE);
+            util.getHBaseAdmin().deleteTable(TABLE);
+        } catch (Exception ignored) {
+            // Deliberate best-effort cleanup: ignore table not found
+        }
+
+        // The Table handle is only needed for its name and descriptor, so release it
+        // as soon as the local region has been created.
+        try (Table table = util.createTable(TABLE, FAM)) {
+            HRegionInfo hRegionInfo = new HRegionInfo(table.getName());
+            region = util.createLocalHRegion(hRegionInfo, table.getTableDescriptor());
+        }
+
+        gtInfo = newInfo();
+        GridTable gridTable = newTable(gtInfo);
+        GTScanRequest fullScan = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null)
+                .setDimensions(null).setFilterPushDown(null).createGTScanRequest();
+
+        // Close the scanner even if writing one of the rows fails.
+        try (IGTScanner scanner = gridTable.scan(fullScan)) {
+            for (GTRecord record : scanner) {
+                byte[] value = record.exportColumns(gtInfo.getPrimaryKey()).toBytes();
+                byte[] key = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + value.length];
+                // Cuboid id goes right after the (zeroed) shard id.
+                System.arraycopy(Bytes.toBytes(baseCuboid), 0, key, RowConstants.ROWKEY_SHARDID_LEN,
+                        RowConstants.ROWKEY_CUBOIDID_LEN);
+                // Encoded dimensions follow the header.
+                System.arraycopy(value, 0, key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, value.length);
+                Put put = new Put(key);
+                put.addColumn(FAM, COL_M, record.exportColumns(gtInfo.getColumnBlock(1)).toBytes());
+                region.put(put);
+            }
+        }
+    }
+
+    /**
+     * Configures and starts the HBase mini cluster, creates the local test metadata
+     * and then loads the test data.
+     */
+    @BeforeClass
+    public static void setupBeforeClass() throws Exception {
+        final Configuration hbaseConf = util.getConfiguration();
+
+        // Region coprocessor registered with the mini cluster.
+        final String endpointClassName = TestRowProcessorEndpoint.RowProcessorEndpoint.class.getName();
+        hbaseConf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, endpointClassName);
+
+        // Client retry count and fixed port numbers used by the test cluster.
+        hbaseConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+        hbaseConf.setInt(HConstants.MASTER_PORT, 17000);
+        hbaseConf.setInt(HConstants.MASTER_INFO_PORT, 17010);
+        hbaseConf.setInt(HConstants.REGIONSERVER_PORT, 17020);
+        hbaseConf.setLong("hbase.hregion.row.processor.timeout", 1000L);
+
+        util.startMiniCluster();
+        staticCreateTestMetadata();
+
+        // Needs both the running cluster and the test metadata.
+        prepareTestData();
+    }
+
+    /**
+     * Shuts down the HBase mini cluster and removes the local test metadata.
+     * The metadata cleanup runs even when the cluster shutdown fails.
+     */
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        try {
+            util.shutdownMiniCluster();
+        } finally {
+            // Must not be skipped by a failing shutdown.
+            staticCleanupTestMetadata();
+        }
+    }
+
+    /**
+     * Starting the service with an environment mocked from
+     * {@link RegionServerCoprocessorEnvironment} is expected to fail with a
+     * {@link CoprocessorException}.
+     */
+    @Test(expected = CoprocessorException.class)
+    public void testStart() throws IOException {
+        final CoprocessorEnvironment regionServerEnv = PowerMockito
+                .mock(RegionServerCoprocessorEnvironment.class);
+        new CubeVisitService().start(regionServerEnv);
+    }
+
+    /**
+     * Sends a full-scan request to the service and verifies the reported statistics
+     * (no aggregated or filtered rows, one scanned row per date/user pair) as well as
+     * the number of records contained in the compressed response.
+     */
+    @Test
+    public void testVisitCube() throws Exception {
+        RawScan rawScan = mockFullScan(gtInfo, getTestConfig());
+
+        CoprocessorEnvironment env = PowerMockito.mock(RegionCoprocessorEnvironment.class);
+        PowerMockito.when(env, "getRegion").thenReturn(region);
+
+        final CubeVisitService service = new CubeVisitService();
+        service.start(env);
+
+        CubeVisitProtos.CubeVisitRequest request = mockFullScanRequest(gtInfo, Lists.newArrayList(rawScan));
+
+        final int expectedRows = dateList.size() * userList.size();
+        // Single-element array so the anonymous callback can record that it ran.
+        final boolean[] callbackInvoked = { false };
+
+        RpcCallback<CubeVisitProtos.CubeVisitResponse> done = new RpcCallback<CubeVisitProtos.CubeVisitResponse>() {
+            @Override
+            public void run(CubeVisitProtos.CubeVisitResponse result) {
+                callbackInvoked[0] = true;
+
+                CubeVisitProtos.CubeVisitResponse.Stats stats = result.getStats();
+                Assert.assertEquals(0L, stats.getAggregatedRowCount());
+                Assert.assertEquals(0L, stats.getFilteredRowCount());
+                Assert.assertEquals(expectedRows, stats.getScannedRowCount());
+
+                try {
+                    byte[] rawData = CompressionUtils
+                            .decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                    PartitionResultIterator iterator = new PartitionResultIterator(rawData, gtInfo, setOf(0, 1, 2, 3));
+                    int nReturn = 0;
+                    while (iterator.hasNext()) {
+                        iterator.next();
+                        nReturn++;
+                    }
+                    Assert.assertEquals(expectedRows, nReturn);
+                } catch (Exception e) {
+                    // Keep the original exception as the cause so its stack trace is reported.
+                    throw new AssertionError("Fail due to " + e, e);
+                }
+            }
+        };
+        service.visitCube(null, request, done);
+
+        // All assertions live in the callback; without this check the test would
+        // pass silently if the service never delivered a response.
+        Assert.assertTrue("visitCube did not invoke the response callback", callbackInvoked[0]);
+    }
+
+    /**
+     * Sends a request that groups by the user column and sums the dynamic column 3,
+     * whose value is {@code price * 2 + 1} for user "Ken" and {@code 1} otherwise
+     * (see {@code mockScanRequestWithRuntimeAggregates}).
+     *
+     * <p>The expected result per user is therefore {@code 2 * sum(price) + rowCount}
+     * for "Ken" and {@code rowCount} for everybody else.
+     */
+    @Test
+    public void testVisitCubeWithRuntimeAggregates() throws Exception {
+        RawScan rawScan = mockFullScan(gtInfo, getTestConfig());
+
+        CoprocessorEnvironment env = PowerMockito.mock(RegionCoprocessorEnvironment.class);
+        PowerMockito.when(env, "getRegion").thenReturn(region);
+
+        final CubeVisitService service = new CubeVisitService();
+        service.start(env);
+
+        final CubeVisitProtos.CubeVisitRequest request = mockScanRequestWithRuntimeAggregates(gtInfo,
+                Lists.newArrayList(rawScan));
+
+        // Single-element array so the anonymous callback can record that it ran.
+        final boolean[] callbackInvoked = { false };
+
+        RpcCallback<CubeVisitProtos.CubeVisitResponse> done = new RpcCallback<CubeVisitProtos.CubeVisitResponse>() {
+            @Override
+            public void run(CubeVisitProtos.CubeVisitResponse result) {
+                callbackInvoked[0] = true;
+                try {
+                    byte[] rawData = CompressionUtils
+                            .decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                    PartitionResultIterator iterator = new PartitionResultIterator(rawData, gtInfo, setOf(1, 3));
+                    Map<String, BigDecimal> actRet = Maps.newHashMap();
+                    while (iterator.hasNext()) {
+                        GTRecord record = iterator.next();
+                        String key = (String) record.decodeValue(1);
+                        BigDecimal value = (BigDecimal) record.decodeValue(3);
+                        actRet.put(key, value);
+                    }
+
+                    Map<String, BigDecimal> innerExpUserRet = Maps.newHashMap();
+                    for (Map.Entry<String, BigDecimal> entry : expUserRet.entrySet()) {
+                        BigDecimal value;
+                        if ("Ken".equals(entry.getKey())) {
+                            // sum(price * 2 + 1) == 2 * sum(price) + rowCount
+                            value = entry.getValue().multiply(new BigDecimal(2)).add(userCnt);
+                        } else {
+                            // sum(1) == rowCount
+                            value = userCnt;
+                        }
+                        innerExpUserRet.put(entry.getKey(), value);
+                    }
+                    Assert.assertEquals(innerExpUserRet, actRet);
+                } catch (Exception e) {
+                    // Keep the original exception as the cause so its stack trace is reported.
+                    throw new AssertionError("Fail due to " + e, e);
+                }
+            }
+        };
+        service.visitCube(null, request, done);
+
+        // All assertions live in the callback; without this check the test would
+        // pass silently if the service never delivered a response.
+        Assert.assertTrue("visitCube did not invoke the response callback", callbackInvoked[0]);
+    }
+
+    @Test
+    public void testVisitCubeWithRuntimeDimensions() throws Exception {
+        GTInfo.Builder builder = GTInfo.builder();
+        builder.setColumns(//
+                DataType.getType("date"), //
+                DataType.getType("string"), //
+                DataType.getType("decimal"), //
+                DataType.getType("decimal") // for runtime aggregation
+        );
+        builder.enableDynamicDims(setOf(3));
+
+        final GTInfo gtInfo = newInfo(builder);
+        RawScan rawScan = mockFullScan(gtInfo, getTestConfig());
+
+        CoprocessorEnvironment env = PowerMockito.mock(RegionCoprocessorEnvironment.class);
+        PowerMockito.when(env, "getRegion").thenReturn(region);
+
+        final CubeVisitService service = new CubeVisitService();
+        service.start(env);
+
+        CubeVisitProtos.CubeVisitRequest request = mockScanRequestWithRuntimeDimensions(gtInfo,
+                Lists.newArrayList(rawScan));
+
+        RpcCallback<CubeVisitProtos.CubeVisitResponse> done = new RpcCallback<CubeVisitProtos.CubeVisitResponse>() {
+            @Override
+            public void run(CubeVisitProtos.CubeVisitResponse result) {
+                try {
+                    byte[] rawData = CompressionUtils
+                            .decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                    PartitionResultIterator iterator = new PartitionResultIterator(rawData, gtInfo, setOf(2, 3));
+                    Map<BigDecimal, BigDecimal> actRet = Maps.newHashMap();
+                    while (iterator.hasNext()) {
+                        GTRecord record = iterator.next();
+                        BigDecimal key = (BigDecimal) record.decodeValue(3);
+                        BigDecimal value = (BigDecimal) record.decodeValue(2);
+                        actRet.put(key, value);
+                    }
+
+                    Map<BigDecimal, BigDecimal> innerExpUserRet = Maps.newHashMap();
+                    for (String key : expUserRet.keySet()) {
+                        BigDecimal keyI;
+                        if (key.equals("Ken")) {
+                            keyI = new BigDecimal(1);
+                        } else {
+                            keyI = new BigDecimal(2);
+                        }
+                        BigDecimal value = innerExpUserRet.get(keyI);
+                        if (value == null) {
+                            value = new BigDecimal(0);
+                        }
+                        value = value.add(expUserRet.get(key));
+                        innerExpUserRet.put(keyI, value);
+                    }
+                    Assert.assertEquals(innerExpUserRet, actRet);
+                } catch (Exception e) {
+                    Assert.fail("Fail due to " + e);
+                }
+            }
+        };
+        service.visitCube(null, request, done);
+    }
+
+    /**
+     * Builds a visit request that groups by the dynamic column 3 and sums column 2.
+     * Column 3 is computed by the pushed-down expression
+     *
+     * <pre>
+     * case
+     *  when user = 'Ken' then 1
+     *  else 2
+     * end
+     * </pre>
+     *
+     * @param gtInfo   grid-table schema the scan request is built for
+     * @param rawScans raw HBase scans to embed in the request
+     * @return the assembled request
+     * @throws IOException declared by {@code mockScanRequest}
+     */
+    public static CubeVisitProtos.CubeVisitRequest mockScanRequestWithRuntimeDimensions(GTInfo gtInfo,
+            List<RawScan> rawScans) throws IOException {
+        // "when user = 'Ken' then 1"
+        TupleFilter isKen = getCompareTupleFilter(1, "Ken");
+        TupleExpression one = new NumberTupleExpression(1);
+        List<Pair<TupleFilter, TupleExpression>> whenBranches = Lists.newArrayList();
+        whenBranches.add(new Pair<>(isKen, one));
+
+        // "else 2"
+        TupleExpression two = new NumberTupleExpression(2);
+        TupleExpression caseExpr = new CaseTupleExpression(whenBranches, two);
+
+        // The case expression supplies the value of dynamic column 3.
+        Map<Integer, TupleExpression> pushedDownExprs = Maps.newHashMap();
+        pushedDownExprs.put(3, caseExpr);
+
+        GTScanRequestBuilder requestBuilder = new GTScanRequestBuilder();
+        requestBuilder.setInfo(gtInfo).setRanges(null);
+        requestBuilder.setDimensions(setOf()).setAggrGroupBy(setOf(3));
+        requestBuilder.setAggrMetrics(setOf(2)).setAggrMetricsFuncs(new String[] { "SUM" });
+        requestBuilder.setDynamicColumns(setOf(3)).setExprsPushDown(pushedDownExprs);
+        requestBuilder.setStartTime(System.currentTimeMillis());
+        GTScanRequest scanRequest = requestBuilder.createGTScanRequest();
+
+        return mockScanRequest(rawScans, scanRequest, mockIntList(setOf(2)));
+    }
+
+    /**
+     * Builds a visit request that groups by column 1 and sums the dynamic column 3.
+     * Column 3 is computed by the pushed-down expression
+     *
+     * <pre>
+     * case
+     *  when user = 'Ken' then price * 2 + 1
+     *  else 1
+     * end
+     * </pre>
+     *
+     * @param gtInfo   grid-table schema the scan request is built for
+     * @param rawScans raw HBase scans to embed in the request
+     * @return the assembled request
+     * @throws IOException declared by {@code mockScanRequest}
+     */
+    public static CubeVisitProtos.CubeVisitRequest mockScanRequestWithRuntimeAggregates(GTInfo gtInfo,
+            List<RawScan> rawScans) throws IOException {
+        // Building blocks of "price * 2 + 1".
+        TupleExpression price = new ColumnTupleExpression(gtInfo.colRef(2));
+        TupleExpression one = new NumberTupleExpression(1);
+        TupleExpression two = new NumberTupleExpression(2);
+        TupleExpression priceTimesTwo = new BinaryTupleExpression(ExpressionOperatorEnum.MULTIPLE,
+                Lists.newArrayList(price, two));
+        TupleExpression kenValue = new BinaryTupleExpression(ExpressionOperatorEnum.PLUS,
+                Lists.newArrayList(priceTimesTwo, one));
+
+        // "when user = 'Ken' then price * 2 + 1"
+        TupleFilter isKen = getCompareTupleFilter(1, "Ken");
+        List<Pair<TupleFilter, TupleExpression>> whenBranches = Lists.newArrayList();
+        whenBranches.add(new Pair<>(isKen, kenValue));
+
+        // "else 1"
+        TupleExpression otherValue = new NumberTupleExpression(1);
+        TupleExpression caseExpr = new CaseTupleExpression(whenBranches, otherValue);
+
+        // The case expression supplies the value of dynamic column 3.
+        Map<Integer, TupleExpression> pushedDownExprs = Maps.newHashMap();
+        pushedDownExprs.put(3, caseExpr);
+
+        GTScanRequestBuilder requestBuilder = new GTScanRequestBuilder();
+        requestBuilder.setInfo(gtInfo).setRanges(null);
+        requestBuilder.setDimensions(setOf(1)).setAggrGroupBy(setOf(1));
+        requestBuilder.setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "SUM" });
+        requestBuilder.setRtAggrMetrics(setOf(2));
+        requestBuilder.setDynamicColumns(setOf(3)).setExprsPushDown(pushedDownExprs);
+        requestBuilder.setStartTime(System.currentTimeMillis());
+        GTScanRequest scanRequest = requestBuilder.createGTScanRequest();
+
+        return mockScanRequest(rawScans, scanRequest, mockIntList(setOf(2)));
+    }
+
+    /**
+     * Creates an equality filter {@code column == value} for the given column.
+     * The constant is encoded with the code system of the class-level {@code gtInfo}.
+     *
+     * @param col   index of the column to compare
+     * @param value value the column is compared against
+     * @return the comparison filter with the column and the encoded constant as children
+     */
+    public static CompareTupleFilter getCompareTupleFilter(int col, Object value) {
+        // Encode the constant into a buffer sized for the column's maximum code length.
+        ByteArray encodedValue = new ByteArray(gtInfo.getCodeSystem().maxCodeLength(col));
+        gtInfo.getCodeSystem().encodeColumnValue(col, value, encodedValue.asBuffer());
+
+        CompareTupleFilter equalsFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+        equalsFilter.addChild(new ColumnTupleFilter(gtInfo.colRef(col)));
+        equalsFilter.addChild(new ConstantTupleFilter(encodedValue));
+        return equalsFilter;
+    }
+
+    /**
+     * Builds a visit request for a scan without ranges or dimension restrictions,
+     * mapping the HBase column to grid-table columns 2 and 3.
+     *
+     * @param gtInfo   grid-table schema the scan request is built for
+     * @param rawScans raw HBase scans to embed in the request
+     * @return the assembled request
+     * @throws IOException declared by {@code mockScanRequest}
+     */
+    public static CubeVisitProtos.CubeVisitRequest mockFullScanRequest(GTInfo gtInfo, List<RawScan> rawScans)
+            throws IOException {
+        GTScanRequestBuilder requestBuilder = new GTScanRequestBuilder();
+        requestBuilder.setInfo(gtInfo).setRanges(null).setDimensions(null);
+        requestBuilder.setStartTime(System.currentTimeMillis());
+        GTScanRequest scanRequest = requestBuilder.createGTScanRequest();
+
+        return mockScanRequest(rawScans, scanRequest, mockIntList(setOf(2, 3)));
+    }
+
+    public static CubeVisitProtos.CubeVisitRequest mockScanRequest(List<RawScan> rawScans, GTScanRequest scanRequest,
+            List<CubeVisitProtos.CubeVisitRequest.IntList> intListList) throws IOException {
+        final CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
+        builder.setGtScanRequest(CubeHBaseEndpointRPC.serializeGTScanReq(scanRequest))
+                .setHbaseRawScan(CubeHBaseEndpointRPC.serializeRawScans(rawScans));
+        for (CubeVisitProtos.CubeVisitRequest.IntList intList : intListList) {
+            builder.addHbaseColumnsToGT(intList);
+        }
+        builder.setRowkeyPreambleSize(RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN);
+        builder.setKylinProperties(getTestConfig().exportAllToString());
+        builder.setQueryId(UUID.randomUUID().toString());
+        builder.setSpillEnabled(getTestConfig().getQueryCoprocessorSpillEnabled());
+        builder.setMaxScanBytes(getTestConfig().getPartitionMaxScanBytes());
+
+        return builder.build();
+    }
+
+    /**
+     * Wraps the selected grid-table column indexes into a single {@code IntList}
+     * and returns it as a one-element list.
+     *
+     * @param selectedCols grid-table columns stored in the one HBase column
+     * @return a mutable list containing exactly one {@code IntList}
+     */
+    private static List<CubeVisitProtos.CubeVisitRequest.IntList> mockIntList(ImmutableBitSet selectedCols) {
+        List<Integer> columnIndexes = Lists.newArrayList(selectedCols.iterator());
+        CubeVisitProtos.CubeVisitRequest.IntList mapping = CubeVisitProtos.CubeVisitRequest.IntList.newBuilder()
+                .addAllInts(columnIndexes).build();
+
+        List<CubeVisitProtos.CubeVisitRequest.IntList> mappings = Lists.newArrayList();
+        mappings.add(mapping);
+        return mappings;
+    }
+
+    /**
+     * Builds a raw HBase scan over the {@code FAM:COL_M} column whose start and end
+     * keys both carry shard 0 and {@code baseCuboid} in the header.
+     *
+     * <p>The start key has an all-zero body; the end key has its body filled with
+     * {@code ROWKEY_UPPER_BYTE} plus one extra trailing zero byte.
+     *
+     * @param gtInfo      schema used to determine the encoded length of the primary key
+     * @param kylinConfig supplies the scan cache rows and maximum result size
+     * @return the raw scan without fuzzy keys
+     */
+    private static RawScan mockFullScan(GTInfo gtInfo, KylinConfig kylinConfig) {
+        final List<Pair<byte[], byte[]>> columns = Lists.newArrayList();
+        columns.add(new Pair<>(FAM, COL_M));
+
+        // Row-key length = header + sum of the encoded lengths of all primary-key dimensions.
+        final ImmutableBitSet primaryKey = gtInfo.getPrimaryKey();
+        int keyLength = RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN;
+        for (int idx = 0; idx < primaryKey.trueBitCount(); idx++) {
+            int dimColumn = primaryKey.trueBitAt(idx);
+            keyLength += gtInfo.getCodeSystem().getDimEnc(dimColumn).getLengthOfEncoding();
+        }
+
+        // Start key: header followed by an all-zero body.
+        final byte[] startKey = new byte[keyLength];
+        BytesUtil.writeShort((short) 0, startKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        System.arraycopy(Bytes.toBytes(baseCuboid), 0, startKey, RowConstants.ROWKEY_SHARDID_LEN,
+                RowConstants.ROWKEY_CUBOIDID_LEN);
+
+        // End key: one byte longer; everything but the last byte is filled with the
+        // upper byte first, then the header is overwritten with shard and cuboid.
+        final byte[] endKey = new byte[keyLength + 1];
+        int pos = 0;
+        while (pos < keyLength) {
+            endKey[pos++] = RowConstants.ROWKEY_UPPER_BYTE;
+        }
+        BytesUtil.writeShort((short) 0, endKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        System.arraycopy(Bytes.toBytes(baseCuboid), 0, endKey, RowConstants.ROWKEY_SHARDID_LEN,
+                RowConstants.ROWKEY_CUBOIDID_LEN);
+
+        // No fuzzy keys are used.
+        final List<Pair<byte[], byte[]>> noFuzzyKeys = Collections.emptyList();
+
+        return new RawScan(startKey, endKey, columns, noFuzzyKeys, kylinConfig.getHBaseScanCacheRows(),
+                kylinConfig.getHBaseScanMaxResultSize());
+    }
+
+    /**
+     * Creates an in-memory grid table with one record per (date, user) pair. Each
+     * record gets a price picked pseudo-randomly from {@code priceList} and a zero
+     * in the fourth column.
+     *
+     * <p>As a side effect, {@code expUserRet} is reset and filled with the sum of
+     * the generated prices per user.
+     *
+     * @param info schema of the table to create
+     * @return the populated grid table
+     * @throws IOException if writing a record or closing the builder fails
+     */
+    private static GridTable newTable(GTInfo info) throws IOException {
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+        GTRecord record = new GTRecord(info);
+
+        // Fixed seed so the generated data, and with it any failure, is reproducible.
+        Random rand = new Random(0L);
+        expUserRet.clear();
+
+        // Close the builder even if one of the writes fails.
+        try (GTBuilder builder = table.rebuild()) {
+            for (String date : dateList) {
+                for (String user : userList) {
+                    BigDecimal value = priceList.get(rand.nextInt(priceList.size()));
+
+                    // Keep a running price sum per user as the expected aggregate.
+                    BigDecimal sumSoFar = expUserRet.get(user);
+                    expUserRet.put(user, sumSoFar == null ? value : sumSoFar.add(value));
+
+                    builder.write(record.setValues(date, user, value, new BigDecimal(0)));
+                }
+            }
+        }
+
+        return table;
+    }
+
+    private static GTInfo newInfo() {
+        GTInfo.Builder builder = GTInfo.builder();
+        builder.setColumns(//
+                DataType.getType("date"), //
+                DataType.getType("string"), //
+                DataType.getType("decimal"), //
+                DataType.getType("decimal") // for runtime aggregation
+        );
+        return newInfo(builder);
+    }
+
+    private static GTInfo newInfo(GTInfo.Builder builder) {
+        //Dimension
+        ImmutableBitSet dimensionColumns = setOf(0, 1);
+        DimensionEncoding[] dimEncs = new DimensionEncoding[2];
+        dimEncs[0] = new DateDimEnc();
+        dimEncs[1] = new DictionaryDimEnc(strsToDict(userList));
+        builder.setCodeSystem(new CubeCodeSystem(dimEncs));
+        builder.setPrimaryKey(dimensionColumns);
+
+        //Measure
+        ImmutableBitSet measureColumns = setOf(2, 3);
+
+        builder.enableColumnBlock(new ImmutableBitSet[] { dimensionColumns, measureColumns });
+        GTInfo info = builder.build();
+        return info;
+    }
+
+    /**
+     * Builds a trie dictionary containing the given strings.
+     *
+     * @param strs values to add to the dictionary
+     * @return the dictionary built with base id 0
+     */
+    private static Dictionary<String> strsToDict(Collection<String> strs) {
+        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+        for (String str : strs) {
+            builder.addValue(str);
+        }
+        return builder.build(0);
+    }
+
+    /**
+     * Creates an {@link ImmutableBitSet} with exactly the given bit positions set.
+     *
+     * @param values bit positions to set; may be empty
+     * @return the immutable bit set
+     */
+    public static ImmutableBitSet setOf(int... values) {
+        final BitSet bits = new BitSet();
+        for (int idx = 0; idx < values.length; idx++) {
+            bits.set(values[idx]);
+        }
+        return new ImmutableBitSet(bits);
+    }
+}