Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/08 05:53:39 UTC

[14/19] kylin git commit: KYLIN-2255 drop v1 coprocessor impl

KYLIN-2255 drop v1 coprocessor impl


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/545201f6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/545201f6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/545201f6

Branch: refs/heads/master-cdh5.7
Commit: 545201f6c0c2e4da9b3dcabac0f606e2ef880d19
Parents: 21bcd2f
Author: Li Yang <li...@apache.org>
Authored: Thu Dec 8 10:41:18 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Dec 8 10:54:58 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/query/ITCombinationTest.java   |   15 -
 .../org/apache/kylin/query/KylinTestBase.java   |    2 -
 .../kylin/rest/controller/CubeController.java   |   14 -
 .../hbase/cube/v1/CubeSegmentTupleIterator.java |  344 -----
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  796 -----------
 .../hbase/cube/v1/CubeTupleConverter.java       |  271 ----
 .../hbase/cube/v1/RegionScannerAdapter.java     |   97 --
 .../hbase/cube/v1/ResultScannerAdapter.java     |  100 --
 .../cube/v1/SerializedHBaseTupleIterator.java   |  156 ---
 .../observer/AggregateRegionObserver.java       |  112 --
 .../observer/AggregationScanner.java            |  188 ---
 .../observer/ObserverAggregationCache.java      |  166 ---
 .../observer/ObserverAggregators.java           |  265 ----
 .../coprocessor/observer/ObserverEnabler.java   |  191 ---
 .../v1/coprocessor/observer/ObserverTuple.java  |   71 -
 .../hbase/cube/v1/filter/FuzzyRowFilterV2.java  |  574 --------
 .../hbase/cube/v1/filter/UnsafeAccess.java      |  433 ------
 .../v1/filter/generated/FilterProtosExt.java    | 1298 ------------------
 .../cube/v1/filter/protobuf/FilterExt.proto     |   39 -
 .../cube/MeasureTypeOnlyAggrInBaseTest.java     |   21 -
 .../observer/AggregateRegionObserverTest.java   |  339 -----
 .../observer/RowAggregatorsTest.java            |   62 -
 .../cube/v1/filter/TestFuzzyRowFilterV2.java    |  249 ----
 .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java |  346 -----
 24 files changed, 6149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
index 496ac4e..84573b5 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
@@ -27,7 +27,6 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.routing.Candidate;
 import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
 import org.apache.kylin.storage.hbase.HBaseStorage;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
@@ -67,7 +66,6 @@ public class ITCombinationTest extends ITKylinQueryTest {
     public static Collection<Object[]> configs() {
         return Arrays.asList(new Object[][] { //
                 { "inner", "on", "v2", true }, //
-                { "left", "on", "v1", true }, //
                 { "left", "on", "v2", true }, //
                 //{ "inner", "on", "v2", false }, // exclude view to simply model/cube selection
                 //{ "left", "on", "v1", false }, // exclude view to simply model/cube selection
@@ -84,23 +82,10 @@ public class ITCombinationTest extends ITKylinQueryTest {
         ITKylinQueryTest.joinType = joinType;
         ITKylinQueryTest.setupAll();
 
-        if (coprocessorToggle.equals("on")) {
-            ObserverEnabler.forceCoprocessorOn();
-        } else if (coprocessorToggle.equals("off")) {
-            ObserverEnabler.forceCoprocessorOff();
-        } else if (coprocessorToggle.equals("unset")) {
-            // unset
-        }
-
         RemoveBlackoutRealizationsRule.blackList.clear();
         if (excludeViewCubes) {
             RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
             RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
         }
-
-        if ("v1".equalsIgnoreCase(queryEngine))
-            HBaseStorage.overwriteStorageQuery = HBaseStorage.v1CubeStorageQuery;
-        else
-            HBaseStorage.overwriteStorageQuery = null;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 52461c4..bcf55e5 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -52,7 +52,6 @@ import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.dbunit.Assertion;
 import org.dbunit.database.DatabaseConfig;
 import org.dbunit.database.DatabaseConnection;
@@ -628,7 +627,6 @@ public class KylinTestBase {
         if (h2Connection != null)
             closeConnection(h2Connection);
 
-        ObserverEnabler.forceCoprocessorUnset();
         HBaseMetadataTestCase.staticCleanupTestMetadata();
         RemoveBlackoutRealizationsRule.blackList.clear();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index ab32551..aef1ffc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -55,7 +55,6 @@ import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.JobService;
 import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -198,19 +197,6 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/coprocessor", method = { RequestMethod.PUT })
-    @ResponseBody
-    public Map<String, Boolean> updateCubeCoprocessor(@PathVariable String cubeName, @RequestParam(value = "force") String force) {
-        try {
-            ObserverEnabler.updateCubeOverride(cubeName, force);
-            return ObserverEnabler.getCubeOverrides();
-        } catch (Exception e) {
-            String message = "Failed to update cube coprocessor: " + cubeName + " : " + force;
-            logger.error(message, e);
-            throw new InternalErrorException(message + " Caused by: " + e.getMessage(), e);
-        }
-    }
-
     /**
      * Force rebuild a cube's lookup table snapshot
      *

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
deleted file mode 100644
index 8ac3832..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.StorageException;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.hbase.cube.v1.filter.FuzzyRowFilterV2;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-import org.apache.kylin.storage.translate.HBaseKeyRange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author xjiang
- *
- */
-public class CubeSegmentTupleIterator implements ITupleIterator {
-
-    public static final Logger logger = LoggerFactory.getLogger(CubeSegmentTupleIterator.class);
-
-    protected final CubeSegment cubeSeg;
-    private final TupleFilter filter;
-    private final Collection<TblColRef> groupBy;
-    protected final List<RowValueDecoder> rowValueDecoders;
-    private final StorageContext context;
-    private final String tableName;
-    private final HTableInterface table;
-
-    protected CubeTupleConverter tupleConverter;
-    protected final Iterator<HBaseKeyRange> rangeIterator;
-    protected final Tuple oneTuple; // avoid new instance
-
-    private Scan scan;
-    private ResultScanner scanner;
-    protected Iterator<Result> resultIterator;
-    protected int scanCount;
-    protected int scanCountDelta;
-    protected Tuple next;
-    protected final Cuboid cuboid;
-
-    private List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
-    private int advMeasureRowsRemaining;
-    private int advMeasureRowIndex;
-
-    public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
-            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
-            List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
-        this.cubeSeg = cubeSeg;
-        this.filter = filter;
-        this.groupBy = groupBy;
-        this.rowValueDecoders = rowValueDecoders;
-        this.context = context;
-        this.tableName = cubeSeg.getStorageLocationIdentifier();
-
-        cuboid = keyRanges.get(0).getCuboid();
-        for (HBaseKeyRange range : keyRanges) {
-            assert cuboid.equals(range.getCuboid());
-        }
-
-        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo);
-        this.oneTuple = new Tuple(returnTupleInfo);
-        this.rangeIterator = keyRanges.iterator();
-
-        try {
-            this.table = conn.getTable(tableName);
-        } catch (Throwable t) {
-            throw new StorageException("Error when open connection to table " + tableName, t);
-        }
-    }
-
-    @Override
-    public boolean hasNext() {
-
-        if (next != null)
-            return true;
-
-        // consume any remaining rows from the advanced measure filler
-        if (advMeasureRowsRemaining > 0) {
-            for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) {
-                filler.fillTuple(oneTuple, advMeasureRowIndex);
-            }
-            advMeasureRowIndex++;
-            advMeasureRowsRemaining--;
-            next = oneTuple;
-            return true;
-        }
-
-        if (resultIterator == null) {
-            if (rangeIterator.hasNext() == false)
-                return false;
-
-            resultIterator = doScan(rangeIterator.next());
-        }
-
-        if (resultIterator.hasNext() == false) {
-            closeScanner();
-            resultIterator = null;
-            return hasNext();
-        }
-
-        Result result = resultIterator.next();
-        scanCount++;
-        if (++scanCountDelta >= 1000)
-            flushScanCountDelta();
-
-        // translate into tuple
-        advMeasureFillers = tupleConverter.translateResult(result, oneTuple);
-
-        // the simple case
-        if (advMeasureFillers == null) {
-            next = oneTuple;
-            return true;
-        }
-
-        // advanced measure filling, like TopN, will produce multiple tuples out of one record
-        advMeasureRowsRemaining = -1;
-        for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) {
-            if (advMeasureRowsRemaining < 0)
-                advMeasureRowsRemaining = filler.getNumOfRows();
-            if (advMeasureRowsRemaining != filler.getNumOfRows())
-                throw new IllegalStateException();
-        }
-        if (advMeasureRowsRemaining < 0)
-            throw new IllegalStateException();
-
-        advMeasureRowIndex = 0;
-        return hasNext();
-
-    }
-
-    @Override
-    public Tuple next() {
-        if (next == null) {
-            hasNext();
-            if (next == null)
-                throw new NoSuchElementException();
-        }
-        Tuple r = next;
-        next = null;
-        return r;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    protected final Iterator<Result> doScan(HBaseKeyRange keyRange) {
-        Iterator<Result> iter = null;
-        try {
-            scan = buildScan(keyRange);
-            applyFuzzyFilter(scan, keyRange);
-            logScan(keyRange);
-
-            scanner = ObserverEnabler.scanWithCoprocessorIfBeneficial(cubeSeg, keyRange.getCuboid(), filter, groupBy, rowValueDecoders, context, table, scan);
-
-            iter = scanner.iterator();
-        } catch (Throwable t) {
-            String msg = MessageFormat.format("Error when scan from lower key {1} to upper key {2} on table {0}.", tableName, Bytes.toString(keyRange.getStartKey()), Bytes.toString(keyRange.getStopKey()));
-            throw new StorageException(msg, t);
-        }
-        return iter;
-    }
-
-    private void logScan(HBaseKeyRange keyRange) {
-        StringBuilder info = new StringBuilder();
-        info.append("Scan hbase table ").append(tableName).append(": ");
-        if (keyRange.getCuboid().requirePostAggregation()) {
-            info.append(" cuboid require post aggregation, from ");
-        } else {
-            info.append(" cuboid exact match, from ");
-        }
-        info.append(keyRange.getCuboid().getInputID());
-        info.append(" to ");
-        info.append(keyRange.getCuboid().getId());
-        info.append(" Start: ");
-        info.append(keyRange.getStartKeyAsString());
-        info.append(" - ");
-        info.append(Bytes.toStringBinary(keyRange.getStartKey()));
-        info.append(" Stop:  ");
-        info.append(keyRange.getStopKeyAsString());
-        info.append(" - ");
-        info.append(Bytes.toStringBinary(keyRange.getStopKey()));
-        if (this.scan.getFilter() != null) {
-            info.append(" Fuzzy key counts: " + keyRange.getFuzzyKeys().size());
-            info.append(" Fuzzy: ");
-            info.append(keyRange.getFuzzyKeyAsString());
-        }
-        logger.info(info.toString());
-    }
-
-    private Scan buildScan(HBaseKeyRange keyRange) {
-        Scan scan = new Scan();
-        tuneScanParameters(scan);
-        scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
-        for (RowValueDecoder valueDecoder : this.rowValueDecoders) {
-            HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn();
-            byte[] byteFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
-            byte[] byteQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
-            scan.addColumn(byteFamily, byteQualifier);
-        }
-        scan.setStartRow(keyRange.getStartKey());
-        scan.setStopRow(keyRange.getStopKey());
-        return scan;
-    }
-
-    private void tuneScanParameters(Scan scan) {
-        KylinConfig config = cubeSeg.getCubeDesc().getConfig();
-
-        scan.setCaching(config.getHBaseScanCacheRows());
-        scan.setMaxResultSize(config.getHBaseScanMaxResultSize());
-        scan.setCacheBlocks(true);
-
-        // cache less when there are memory hungry measures
-        //        if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
-        //            scan.setCaching(scan.getCaching() / 10);
-        //        }
-    }
-
-    private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) {
-        List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys();
-        if (fuzzyKeys != null && fuzzyKeys.size() > 0) {
-
-            //FuzzyRowFilterV2 is back-ported from https://issues.apache.org/jira/browse/HBASE-13761
-            //However we found a bug in it and fixed it in https://issues.apache.org/jira/browse/HBASE-14269
-            //After the fix it is not much faster than the original, so the original FuzzyRowFilter is used by default.
-            boolean useFuzzyRowFilterV2 = false;
-            Filter fuzzyFilter = null;
-            if (useFuzzyRowFilterV2) {
-                fuzzyFilter = new FuzzyRowFilterV2(convertToHBasePair(fuzzyKeys));
-            } else {
-                fuzzyFilter = new FuzzyRowFilter(convertToHBasePair(fuzzyKeys));
-            }
-
-            Filter filter = scan.getFilter();
-            if (filter != null) {
-                throw new RuntimeException("Scan filter not empty : " + filter);
-            } else {
-                scan.setFilter(fuzzyFilter);
-            }
-        }
-    }
-
-    private List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) {
-        List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList();
-        for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) {
-            org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond());
-            result.add(element);
-        }
-
-        return result;
-    }
-
-    protected void closeScanner() {
-        flushScanCountDelta();
-
-        if (logger.isDebugEnabled() && scan != null) {
-            byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
-            if (metricsBytes != null) {
-                ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes);
-                logger.debug("HBase Metrics when scanning " + this.tableName + " count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", //
-                        new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries });
-            }
-            scan = null;
-        }
-        try {
-            if (scanner != null) {
-                scanner.close();
-                scanner = null;
-            }
-        } catch (Throwable t) {
-            throw new StorageException("Error when close scanner for table " + tableName, t);
-        }
-    }
-
-    private void closeTable() {
-        try {
-            if (table != null) {
-                table.close();
-            }
-        } catch (Throwable t) {
-            throw new StorageException("Error when close table " + tableName, t);
-        }
-    }
-
-    @Override
-    public void close() {
-        logger.info("Closing CubeSegmentTupleIterator");
-        closeScanner();
-        closeTable();
-    }
-
-    protected void flushScanCountDelta() {
-        context.increaseTotalScanCount(scanCountDelta);
-        scanCountDelta = 0;
-    }
-
-}

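Note on the removed iterator above: the subtle part of hasNext() is advanced-measure expansion, where one stored HBase row produces several output tuples (e.g. for a TopN measure) and every filler must agree on the row count. Below is a minimal, self-contained sketch of that pattern only; the Filler interface and the StringBuilder tuple are hypothetical stand-ins for Kylin's IAdvMeasureFiller and Tuple, not the real API.

import java.util.Arrays;
import java.util.List;

public class AdvMeasureExpansion {

    // Hypothetical stand-in for Kylin's MeasureType.IAdvMeasureFiller.
    interface Filler {
        int getNumOfRows();
        void fillTuple(StringBuilder tuple, int row);
    }

    public static void main(String[] args) {
        // Pretend a TopN(3) measure expanded one stored row into 3 output rows.
        List<Filler> fillers = Arrays.asList(new Filler() {
            public int getNumOfRows() { return 3; }
            public void fillTuple(StringBuilder t, int row) { t.append("rank=").append(row); }
        });

        // Every filler must report the same row count, as the removed code asserts.
        int remaining = -1;
        for (Filler f : fillers) {
            if (remaining < 0)
                remaining = f.getNumOfRows();
            if (remaining != f.getNumOfRows())
                throw new IllegalStateException("fillers disagree on row count");
        }

        // Reuse a single tuple instance per row, like the removed oneTuple field,
        // which existed to avoid a new allocation per emitted row.
        StringBuilder tuple = new StringBuilder();
        for (int row = 0; row < remaining; row++) {
            tuple.setLength(0);
            for (Filler f : fillers)
                f.fillTuple(tuple, row);
            System.out.println(tuple); // prints rank=0, rank=1, rank=2
        }
    }
}
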
http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
deleted file mode 100644
index 75c3fd7..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.ShardingHash;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.RawQueryLastHacker;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.IStorageQuery;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-import org.apache.kylin.storage.translate.ColumnValueRange;
-import org.apache.kylin.storage.translate.DerivedFilterTranslator;
-import org.apache.kylin.storage.translate.HBaseKeyRange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-@SuppressWarnings("unused")
-public class CubeStorageQuery implements IStorageQuery {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
-
-    private static final int MERGE_KEYRANGE_THRESHOLD = 100;
-
-    private final CubeInstance cubeInstance;
-    private final CubeDesc cubeDesc;
-    private final String uuid;
-
-    public CubeStorageQuery(CubeInstance cube) {
-        this.cubeInstance = cube;
-        this.cubeDesc = cube.getDescriptor();
-        this.uuid = cube.getUuid();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-
-        //deal with participant columns in subquery join
-        sqlDigest.includeSubqueryJoinParticipants();
-
-        //cope with queries with no aggregations
-        RawQueryLastHacker.hackNoAggregations(sqlDigest, cubeDesc, returnTupleInfo);
-
-        // Customized measure taking effect: e.g. allow custom measures to help raw queries
-        notifyBeforeStorageQuery(sqlDigest);
-
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TupleFilter filter = sqlDigest.filter;
-
-        // build dimension & metrics
-        Collection<TblColRef> dimensions = new HashSet<TblColRef>();
-        Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
-        buildDimensionsAndMetrics(dimensions, metrics, sqlDigest);
-
-        // all dimensions = groups + others
-        Set<TblColRef> others = Sets.newHashSet(dimensions);
-        others.removeAll(groups);
-
-        // expand derived
-        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
-        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> othersD = expandDerived(others, derivedPostAggregation);
-        othersD.removeAll(groupsD);
-
-        // identify cuboid
-        Set<TblColRef> dimensionsD = Sets.newHashSet();
-        dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(othersD);
-        Cuboid cuboid = identifyCuboid(dimensionsD, metrics);
-        context.setCuboid(cuboid);
-
-        // isExactAggregation? meaning: tuples returned from storage require no further aggregation in the query engine
-        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groupsD, othersD, singleValuesD, derivedPostAggregation);
-        context.setExactAggregation(isExactAggregation);
-
-        // translate filter for scan range and compose returning groups for coprocessor, note:
-        // - columns on non-evaluatable filter have to return
-        // - columns on loosened filter (due to derived translation) have to return
-        Set<TblColRef> groupsCopD = Sets.newHashSet(groupsD);
-        collectNonEvaluable(filter, groupsCopD);
-        TupleFilter filterD = translateDerived(filter, groupsCopD);
-
-        // translate filter into segment scan ranges
-        List<HBaseKeyRange> scans = buildScanRanges(flattenToOrAndFilter(filterD), dimensionsD);
-
-        // check involved measures, build value decoder for each family:column
-        List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHbaseMapping(), metrics, context);
-
-        // memory-hungry distinct count is pushed down to coprocessor, no need to set threshold any more
-        setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
-        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
-        setLimit(filter, context);
-
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
-
-        // notice we're passing filterD down to storage instead of flatFilter
-        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
-    }
-
-    private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
-
-        for (FunctionDesc func : sqlDigest.aggregations) {
-            if (!func.isDimensionAsMetric()) {
-                metrics.add(func);
-            }
-        }
-
-        for (TblColRef column : sqlDigest.allColumns) {
-            // skip measure columns
-            if (sqlDigest.metricColumns.contains(column)) {
-                continue;
-            }
-
-            dimensions.add(column);
-        }
-    }
-
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
-        for (FunctionDesc metric : metrics) {
-            if (metric.getMeasureType().onlyAggrInBaseCuboid())
-                return Cuboid.getBaseCuboid(cubeDesc);
-        }
-
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return Cuboid.findById(cubeDesc, cuboidID);
-    }
-
-    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
-        boolean exact = true;
-
-        if (cuboid.requirePostAggregation()) {
-            exact = false;
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
-        }
-
-        // derived aggregation is bad, unless expanded columns are already in group by
-        if (groups.containsAll(derivedPostAggregation) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
-        }
-
-        // other columns (from filter) are bad, unless they are ensured to have a single value
-        if (singleValuesD.containsAll(othersD) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
-        }
-
-        if (exact) {
-            logger.info("exactAggregation is true");
-        }
-        return exact;
-    }
-
-    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
-        Set<TblColRef> expanded = Sets.newHashSet();
-        for (TblColRef col : cols) {
-            if (cubeDesc.hasHostColumn(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                for (TblColRef hostCol : hostInfo.columns) {
-                    expanded.add(hostCol);
-                    if (hostInfo.isOneToOne == false)
-                        derivedPostAggregation.add(hostCol);
-                }
-            } else {
-                expanded.add(col);
-            }
-        }
-        return expanded;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
-        Collection<? extends TupleFilter> toCheck;
-        if (filter instanceof CompareTupleFilter) {
-            toCheck = Collections.singleton(filter);
-        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
-            toCheck = filter.getChildren();
-        } else {
-            return (Set<TblColRef>) Collections.EMPTY_SET;
-        }
-
-        Set<TblColRef> result = Sets.newHashSet();
-        for (TupleFilter f : toCheck) {
-            if (f instanceof CompareTupleFilter) {
-                CompareTupleFilter compFilter = (CompareTupleFilter) f;
-                // is COL=const ?
-                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
-                    result.add(compFilter.getColumn());
-                }
-            }
-        }
-
-        // expand derived
-        Set<TblColRef> resultD = Sets.newHashSet();
-        for (TblColRef col : result) {
-            if (cubeDesc.isExtendedColumn(col)) {
-                throw new CubeDesc.CannotFilterExtendedColumnException(col);
-            }
-
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                if (hostInfo.isOneToOne) {
-                    for (TblColRef hostCol : hostInfo.columns) {
-                        resultD.add(hostCol);
-                    }
-                }
-                //if not one2one, it will be pruned
-            } else {
-                resultD.add(col);
-            }
-        }
-        return resultD;
-    }
-
-    private void collectNonEvaluable(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return;
-
-        if (filter.isEvaluable()) {
-            for (TupleFilter child : filter.getChildren())
-                collectNonEvaluable(child, collector);
-        } else {
-            collectColumnsRecursively(filter, collector);
-        }
-    }
-
-    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return;
-
-        if (filter instanceof ColumnTupleFilter) {
-            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
-        }
-        for (TupleFilter child : filter.getChildren()) {
-            collectColumnsRecursively(child, collector);
-        }
-    }
-
-    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
-        if (cubeDesc.isExtendedColumn(col)) {
-            throw new CubeDesc.CannotFilterExtendedColumnException(col);
-        }
-
-        if (cubeDesc.isDerived(col)) {
-            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-            for (TblColRef h : hostInfo.columns)
-                collector.add(h);
-        } else {
-            collector.add(col);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return filter;
-
-        if (filter instanceof CompareTupleFilter) {
-            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
-        }
-
-        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
-        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
-        boolean modified = false;
-        for (TupleFilter child : children) {
-            TupleFilter translated = translateDerived(child, collector);
-            newChildren.add(translated);
-            if (child != translated)
-                modified = true;
-        }
-        if (modified) {
-            filter = replaceChildren(filter, newChildren);
-        }
-        return filter;
-    }
-
-    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
-        if (filter instanceof LogicalTupleFilter) {
-            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
-            r.addChildren(newChildren);
-            return r;
-        } else
-            throw new IllegalStateException("Cannot replaceChildren on " + filter);
-    }
-
-    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
-        if (compf.getColumn() == null || compf.getValues().isEmpty())
-            return compf;
-
-        TblColRef derived = compf.getColumn();
-        if (cubeDesc.isExtendedColumn(derived)) {
-            throw new CubeDesc.CannotFilterExtendedColumnException(derived);
-        }
-        if (cubeDesc.isDerived(derived) == false) {
-            return compf;
-        }
-
-        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
-        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
-        CubeSegment seg = cubeInstance.getLatestReadySegment();
-        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.join);
-        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
-        TupleFilter translatedFilter = translated.getFirst();
-        boolean loosened = translated.getSecond();
-        if (loosened) {
-            collectColumnsRecursively(translatedFilter, collector);
-        }
-        return translatedFilter;
-    }
-
-    private List<RowValueDecoder> translateAggregation(HBaseMappingDesc hbaseMapping, Collection<FunctionDesc> metrics, //
-            StorageContext context) {
-        Map<HBaseColumnDesc, RowValueDecoder> codecMap = Maps.newHashMap();
-        for (FunctionDesc aggrFunc : metrics) {
-            Collection<HBaseColumnDesc> hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc);
-            if (hbCols.isEmpty()) {
-                throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression());
-            }
-            HBaseColumnDesc bestHBCol = null;
-            int bestIndex = -1;
-            for (HBaseColumnDesc hbCol : hbCols) {
-                bestHBCol = hbCol;
-                bestIndex = hbCol.findMeasure(aggrFunc);
-                // we used to prefer one specific measure over another (holistic distinct count); now that's gone
-                break;
-            }
-
-            RowValueDecoder codec = codecMap.get(bestHBCol);
-            if (codec == null) {
-                codec = new RowValueDecoder(bestHBCol);
-                codecMap.put(bestHBCol, codec);
-            }
-            codec.setProjectIndex(bestIndex);
-        }
-        return new ArrayList<RowValueDecoder>(codecMap.values());
-    }
-
-    //check TupleFilter.flatFilter's comment
-    private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
-        if (filter == null)
-            return null;
-
-        // core
-        TupleFilter flatFilter = filter.flatFilter();
-
-        // normalize to OR-AND filter
-        if (flatFilter.getOperator() == FilterOperatorEnum.AND) {
-            LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR);
-            f.addChild(flatFilter);
-            flatFilter = f;
-        }
-
-        if (flatFilter.getOperator() != FilterOperatorEnum.OR)
-            throw new IllegalStateException();
-
-        return flatFilter;
-    }
-
-    private List<HBaseKeyRange> buildScanRanges(TupleFilter flatFilter, Collection<TblColRef> dimensionColumns) {
-
-        List<HBaseKeyRange> result = Lists.newArrayList();
-
-        logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all");
-        List<CubeSegment> segs = cubeInstance.getSegments(SegmentStatusEnum.READY);
-        logger.info("READY segs count: " + segs.size());
-
-        // build row key range for each cube segment
-        StringBuilder sb = new StringBuilder("hbasekeyrange trace: ");
-        for (CubeSegment cubeSeg : segs) {
-            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-            if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
-                logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
-                continue;
-            }
-            // consider derived (lookup snapshot), filter on dimension may
-            // differ per segment
-            List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);
-            if (orAndDimRanges == null) { // has conflict
-                continue;
-            }
-
-            List<HBaseKeyRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
-            for (Collection<ColumnValueRange> andDimRanges : orAndDimRanges) {
-                HBaseKeyRange rowKeyRange = new HBaseKeyRange(dimensionColumns, andDimRanges, cubeSeg, cubeDesc);
-                scanRanges.add(rowKeyRange);
-            }
-
-            //log
-            sb.append(scanRanges.size() + "=(mergeoverlap)>");
-
-            List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
-
-            //log
-            sb.append(mergedRanges.size() + "=(mergetoomany)>");
-
-            mergedRanges = mergeTooManyRanges(mergedRanges);
-
-            //log
-            sb.append(mergedRanges.size() + ",");
-
-            result.addAll(mergedRanges);
-        }
-        logger.info(sb.toString());
-
-        logger.info("hbasekeyrange count: " + result.size());
-
-        dropUnhitSegments(result);
-        logger.info("hbasekeyrange count after dropping unhit :" + result.size());
-
-        //TODO: should use LazyRowKeyEncoder.getRowKeysDifferentShards like CubeHBaseRPC; we don't, because the v1 query engine is retiring and it is not worth changing
-        if (cubeDesc.isEnableSharding()) {
-            result = duplicateRangeByShard(result);
-        }
-        logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size());
-
-        return result;
-    }
-
-    private List<Collection<ColumnValueRange>> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) {
-        List<Collection<ColumnValueRange>> result = Lists.newArrayList();
-
-        if (flatFilter == null) {
-            result.add(Collections.<ColumnValueRange> emptyList());
-            return result;
-        }
-
-        for (TupleFilter andFilter : flatFilter.getChildren()) {
-            if (andFilter.getOperator() != FilterOperatorEnum.AND) {
-                throw new IllegalStateException("Filter should be AND instead of " + andFilter);
-            }
-
-            Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment);
-
-            if (andRanges != null) {
-                result.add(andRanges);
-            }
-        }
-
-        return preprocessConstantConditions(result);
-    }
-
-    private List<Collection<ColumnValueRange>> preprocessConstantConditions(List<Collection<ColumnValueRange>> orAndRanges) {
-        boolean globalAlwaysTrue = false;
-        Iterator<Collection<ColumnValueRange>> iterator = orAndRanges.iterator();
-        while (iterator.hasNext()) {
-            Collection<ColumnValueRange> andRanges = iterator.next();
-            Iterator<ColumnValueRange> iterator2 = andRanges.iterator();
-            boolean hasAlwaysFalse = false;
-            while (iterator2.hasNext()) {
-                ColumnValueRange range = iterator2.next();
-                if (range.satisfyAll())
-                    iterator2.remove();
-                else if (range.satisfyNone())
-                    hasAlwaysFalse = true;
-            }
-            if (hasAlwaysFalse) {
-                iterator.remove();
-            } else if (andRanges.isEmpty()) {
-                globalAlwaysTrue = true;
-                break;
-            }
-        }
-        if (globalAlwaysTrue) {
-            orAndRanges.clear();
-            orAndRanges.add(Collections.<ColumnValueRange> emptyList());
-        }
-        return orAndRanges;
-    }
-
-    // return empty collection to mean true; return null to mean false
-    @SuppressWarnings("unchecked")
-    private Collection<ColumnValueRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters, CubeSegment cubeSegment) {
-        Map<TblColRef, ColumnValueRange> rangeMap = new HashMap<TblColRef, ColumnValueRange>();
-        for (TupleFilter filter : andFilters) {
-            if ((filter instanceof CompareTupleFilter) == false) {
-                continue;
-            }
-
-            CompareTupleFilter comp = (CompareTupleFilter) filter;
-            if (comp.getColumn() == null) {
-                continue;
-            }
-
-            ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection<String>) comp.getValues(), comp.getOperator());
-            andMerge(range, rangeMap);
-        }
-
-        // a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary
-        RowKeyDesc rowkey = cubeSegment.getCubeDesc().getRowkey();
-        Iterator<ColumnValueRange> it = rangeMap.values().iterator();
-        while (it.hasNext()) {
-            ColumnValueRange range = it.next();
-            if (rowkey.isUseDictionary(range.getColumn())) {
-                range.preEvaluateWithDict((Dictionary<String>) cubeSegment.getDictionary(range.getColumn()));
-            }
-            if (range.satisfyAll())
-                it.remove();
-            else if (range.satisfyNone())
-                return null;
-        }
-
-        return rangeMap.values();
-    }
-
-    private void andMerge(ColumnValueRange range, Map<TblColRef, ColumnValueRange> rangeMap) {
-        ColumnValueRange columnRange = rangeMap.get(range.getColumn());
-        if (columnRange == null) {
-            rangeMap.put(range.getColumn(), range);
-        } else {
-            columnRange.andMerge(range);
-        }
-    }
-
-    private List<HBaseKeyRange> mergeOverlapRanges(List<HBaseKeyRange> keyRanges) {
-        if (keyRanges.size() <= 1) {
-            return keyRanges;
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Merging key range from " + keyRanges.size());
-        }
-
-        // sort ranges by start key
-        Collections.sort(keyRanges);
-
-        // merge the overlap range
-        List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
-        int beginIndex = 0;
-        byte[] maxStopKey = keyRanges.get(0).getStopKey();
-        for (int index = 0; index < keyRanges.size(); index++) {
-            HBaseKeyRange keyRange = keyRanges.get(index);
-            if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) {
-                // merge the current key ranges
-                HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1);
-                mergedRanges.add(mergedRange);
-                // start new merge
-                beginIndex = index;
-            }
-            if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) {
-                // update the stop key
-                maxStopKey = keyRange.getStopKey();
-            }
-        }
-        // merge last range
-        HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1);
-        mergedRanges.add(mergedRange);
-        if (logger.isDebugEnabled()) {
-            logger.debug("Merging key range to " + mergedRanges.size());
-        }
-        return mergedRanges;
-    }
-
-    private HBaseKeyRange mergeKeyRange(List<HBaseKeyRange> keyRanges, int from, int to) {
-        HBaseKeyRange keyRange = keyRanges.get(from);
-        int mergeSize = to - from + 1;
-        if (mergeSize > 1) {
-            // merge range from mergeHeader to i - 1
-            CubeSegment cubeSegment = keyRange.getCubeSegment();
-            Cuboid cuboid = keyRange.getCuboid();
-            byte[] startKey = keyRange.getStartKey();
-            byte[] stopKey = keyRange.getStopKey();
-            long partitionColumnStartDate = Long.MAX_VALUE;
-            long partitionColumnEndDate = 0;
-
-            TreeSet<Pair<byte[], byte[]>> newFuzzyKeys = new TreeSet<>(new Comparator<Pair<byte[], byte[]>>() {
-                @Override
-                public int compare(Pair<byte[], byte[]> o1, Pair<byte[], byte[]> o2) {
-                    int partialResult = Bytes.compareTo(o1.getFirst(), o2.getFirst());
-                    if (partialResult != 0) {
-                        return partialResult;
-                    } else {
-                        return Bytes.compareTo(o1.getSecond(), o2.getSecond());
-                    }
-                }
-            });
-            List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList();
-
-            boolean hasNonFuzzyRange = false;
-            for (int k = from; k <= to; k++) {
-                HBaseKeyRange nextRange = keyRanges.get(k);
-                hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty();
-                newFuzzyKeys.addAll(nextRange.getFuzzyKeys());
-                newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter());
-                if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) {
-                    stopKey = nextRange.getStopKey();
-                }
-                if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) {
-                    partitionColumnStartDate = nextRange.getPartitionColumnStartDate();
-                }
-                if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) {
-                    partitionColumnEndDate = nextRange.getPartitionColumnEndDate();
-                }
-            }
-
-            // if any range is non-fuzzy, then all fuzzy keys must be cleared
-            if (hasNonFuzzyRange) {
-                newFuzzyKeys.clear();
-            }
-
-            partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate;
-            partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate;
-            keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, Lists.newArrayList(newFuzzyKeys), newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
-        }
-        return keyRange;
-    }
-
-    private List<HBaseKeyRange> mergeTooManyRanges(List<HBaseKeyRange> keyRanges) {
-        if (keyRanges.size() < MERGE_KEYRANGE_THRESHOLD) {
-            return keyRanges;
-        }
-        // TODO: check the distance between range. and merge the large distance range
-        List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
-        HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, 0, keyRanges.size() - 1);
-        mergedRanges.add(mergedRange);
-        return mergedRanges;
-    }
-
-    private void dropUnhitSegments(List<HBaseKeyRange> scans) {
-        if (cubeDesc.getModel().getPartitionDesc().isPartitioned()) {
-            Iterator<HBaseKeyRange> iterator = scans.iterator();
-            while (iterator.hasNext()) {
-                HBaseKeyRange scan = iterator.next();
-                if (scan.hitSegment() == false) {
-                    iterator.remove();
-                }
-            }
-        }
-    }
-
-    private List<HBaseKeyRange> duplicateRangeByShard(List<HBaseKeyRange> scans) {
-        List<HBaseKeyRange> ret = Lists.newArrayList();
-
-        for (HBaseKeyRange scan : scans) {
-            CubeSegment segment = scan.getCubeSegment();
-
-            byte[] startKey = scan.getStartKey();
-            byte[] stopKey = scan.getStopKey();
-
-            short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId());
-            short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId());
-            for (short i = 0; i < cuboidShardNum; ++i) {
-                short newShard = ShardingHash.normalize(cuboidShardBase, i, segment.getTotalShards(scan.getCuboid().getId()));
-                byte[] newStartKey = duplicateKeyAndChangeShard(newShard, startKey);
-                byte[] newStopKey = duplicateKeyAndChangeShard(newShard, stopKey);
-                HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, //
-                        scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate());
-                ret.add(newRange);
-            }
-        }
-
-        Collections.sort(ret, new Comparator<HBaseKeyRange>() {
-            @Override
-            public int compare(HBaseKeyRange o1, HBaseKeyRange o2) {
-                return Bytes.compareTo(o1.getStartKey(), o2.getStartKey());
-            }
-        });
-
-        return ret;
-    }
-
-    private byte[] duplicateKeyAndChangeShard(short newShard, byte[] bytes) {
-        byte[] ret = Arrays.copyOf(bytes, bytes.length);
-        BytesUtil.writeShort(newShard, ret, 0, RowConstants.ROWKEY_SHARDID_LEN);
-        return ret;
-    }
-
-    private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
-        if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) {
-            return;
-        }
-
-        int rowSizeEst = dimensions.size() * 3;
-        for (RowValueDecoder decoder : valueDecoders) {
-            MeasureDesc[] measures = decoder.getMeasures();
-            BitSet projectionIndex = decoder.getProjectionIndex();
-            for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
-                FunctionDesc func = measures[i].getFunction();
-                // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage)
-                rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
-            }
-        }
-
-        long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
-        if (rowEst > 0) {
-            logger.info("Memory budget is set to: " + rowEst);
-            context.setThreshold((int) rowEst);
-        } else {
-            logger.info("Memory budget is not set.");
-        }
-    }
-
-    private void setLimit(TupleFilter filter, StorageContext context) {
-        boolean goodAggr = context.isExactAggregation();
-        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
-        boolean goodSort = !context.hasSort();
-        if (goodAggr && goodFilter && goodSort) {
-            logger.info("Enable limit " + context.getLimit());
-            context.enableLimit();
-        }
-    }
-
-    private void setCoprocessor(Set<TblColRef> groupsCopD, List<RowValueDecoder> valueDecoders, StorageContext context) {
-        ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
-    }
-
-    private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
-
-        Map<String, List<MeasureDesc>> map = Maps.newHashMap();
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            MeasureType<?> measureType = measure.getFunction().getMeasureType();
-
-            String key = measureType.getClass().getCanonicalName();
-            List<MeasureDesc> temp = map.get(key);
-            if (temp != null) {
-                temp.add(measure);
-            } else {
-                map.put(key, Lists.<MeasureDesc> newArrayList(measure));
-            }
-        }
-
-        for (List<MeasureDesc> sublist : map.values()) {
-            sublist.get(0).getFunction().getMeasureType().adjustSqlDigest(sublist, sqlDigest);
-        }
-    }
-
-}
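
For reference, the shard-expansion logic deleted above reduces to a small pattern: copy each scan boundary once per shard and overwrite the fixed-length shard prefix of the row key. Below is a minimal, self-contained sketch of that idea, assuming a 2-byte big-endian shard prefix and wrap-around normalization (which is how ShardingHash.normalize is used here); ShardExpansionSketch and its methods are illustrative stand-ins, not Kylin classes.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class ShardExpansionSketch {

        // copy the key and overwrite its leading 2-byte shard id (big-endian)
        static byte[] withShard(short shard, byte[] key) {
            byte[] copy = Arrays.copyOf(key, key.length);
            copy[0] = (byte) (shard >>> 8);
            copy[1] = (byte) shard;
            return copy;
        }

        // expand one [start, stop) range into one range per shard of the cuboid
        static List<byte[][]> expand(byte[] start, byte[] stop, short baseShard, short shardNum, short totalShards) {
            List<byte[][]> ranges = new ArrayList<byte[][]>();
            for (short i = 0; i < shardNum; i++) {
                // assumed wrap-around arithmetic, in the spirit of ShardingHash.normalize
                short shard = (short) ((baseShard + i) % totalShards);
                ranges.add(new byte[][] { withShard(shard, start), withShard(shard, stop) });
            }
            return ranges;
        }
    }

The expanded ranges would then be sorted by start key, as the deleted code does with Bytes.compareTo.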

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
deleted file mode 100644
index 64feff0..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowKeyDecoder;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class CubeTupleConverter {
-
-    final CubeSegment cubeSeg;
-    final Cuboid cuboid;
-    final TupleInfo tupleInfo;
-    final RowKeyDecoder rowKeyDecoder;
-    final List<RowValueDecoder> rowValueDecoders;
-    final List<IDerivedColumnFiller> derivedColFillers;
-    final int[] dimensionTupleIdx;
-    final int[][] metricsMeasureIdx;
-    final int[][] metricsTupleIdx;
-
-    final List<MeasureType<?>> measureTypes;
-
-    final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
-    final List<Pair<Integer, Integer>> advMeasureIndexInRV; // first => index into rowValueDecoders, second => metric index
-
-    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) {
-        this.cubeSeg = cubeSeg;
-        this.cuboid = cuboid;
-        this.tupleInfo = tupleInfo;
-        this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
-        this.rowValueDecoders = rowValueDecoders;
-        this.derivedColFillers = Lists.newArrayList();
-
-        List<TblColRef> dimCols = cuboid.getColumns();
-
-        measureTypes = Lists.newArrayList();
-        advMeasureFillers = Lists.newArrayListWithCapacity(1);
-        advMeasureIndexInRV = Lists.newArrayListWithCapacity(1);
-
-        // pre-calculate dimension index mapping to tuple
-        dimensionTupleIdx = new int[dimCols.size()];
-        for (int i = 0; i < dimCols.size(); i++) {
-            TblColRef col = dimCols.get(i);
-            dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
-        }
-
-        // pre-calculate metrics index mapping to tuple
-        metricsMeasureIdx = new int[rowValueDecoders.size()][];
-        metricsTupleIdx = new int[rowValueDecoders.size()][];
-        for (int i = 0; i < rowValueDecoders.size(); i++) {
-            RowValueDecoder decoder = rowValueDecoders.get(i);
-            MeasureDesc[] measures = decoder.getMeasures();
-            BitSet selectedMeasures = decoder.getProjectionIndex();
-            metricsMeasureIdx[i] = new int[selectedMeasures.cardinality()];
-            metricsTupleIdx[i] = new int[selectedMeasures.cardinality()];
-            for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) {
-                FunctionDesc aggrFunc = measures[mi].getFunction();
-
-                int tupleIdx;
-                if (aggrFunc.needRewrite()) {
-                    // a rewrite metric is identified by its rewrite field name
-                    String rewriteFieldName = aggrFunc.getRewriteFieldName();
-                    tupleIdx = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
-                } else {
-                    // a non-rewrite metric (like sum, or a dimension playing as a metric) is read like a dimension column
-                    TblColRef col = aggrFunc.getParameter().getColRefs().get(0);
-                    tupleIdx = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
-                }
-                metricsMeasureIdx[i][j] = mi;
-                metricsTupleIdx[i][j] = tupleIdx;
-
-                MeasureType<?> measureType = aggrFunc.getMeasureType();
-                if (measureType.needAdvancedTupleFilling()) {
-                    Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(aggrFunc));
-                    advMeasureFillers.add(measureType.getAdvancedTupleFiller(aggrFunc, tupleInfo, dictionaryMap));
-                    advMeasureIndexInRV.add(Pair.newPair(i, mi));
-                    measureTypes.add(null);
-                } else {
-                    measureTypes.add(measureType);
-                }
-            }
-        }
-
-        // prepare derived columns and filler
-        Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(dimCols, null);
-        for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
-            TblColRef[] hostCols = entry.getKey().data;
-            for (DeriveInfo deriveInfo : entry.getValue()) {
-                IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo);
-                if (filler != null) {
-                    derivedColFillers.add(filler);
-                }
-            }
-        }
-    }
-
-    // load only needed dictionaries
-    private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
-        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-        for (TblColRef col : columnsNeedDictionary) {
-            result.put(col, cubeSeg.getDictionary(col));
-        }
-        return result;
-    }
-
-    public List<MeasureType.IAdvMeasureFiller> translateResult(Result hbaseRow, Tuple tuple) {
-        try {
-            byte[] rowkey = hbaseRow.getRow();
-            rowKeyDecoder.decode(rowkey);
-        } catch (IOException ex) {
-            throw new RuntimeException("Cannot translate hbase result " + hbaseRow);
-        }
-
-        // dimensions
-        List<String> dimensionValues = rowKeyDecoder.getValues();
-        for (int i = 0; i < dimensionValues.size(); i++) {
-            int tupleIdx = dimensionTupleIdx[i];
-            if (tupleIdx >= 0) {
-                tuple.setDimensionValue(tupleIdx, dimensionValues.get(i));
-            }
-        }
-
-        // derived
-        for (IDerivedColumnFiller filler : derivedColFillers) {
-            filler.fillDerivedColumns(dimensionValues, tuple);
-        }
-
-        // measures
-        int index = 0;
-        for (int i = 0; i < rowValueDecoders.size(); i++) {
-            RowValueDecoder rowValueDecoder = rowValueDecoders.get(i);
-            rowValueDecoder.decodeAndConvertJavaObj(hbaseRow);
-            Object[] measureValues = rowValueDecoder.getValues();
-
-            int[] measureIdx = metricsMeasureIdx[i];
-            int[] tupleIdx = metricsTupleIdx[i];
-            for (int j = 0; j < measureIdx.length; j++) {
-                if (measureTypes.get(index++) != null) {
-                    tuple.setMeasureValue(tupleIdx[j], measureValues[measureIdx[j]]);
-                }
-            }
-        }
-
-        // advanced measure filling completes at the caller side, due to possible row splits
-        if (advMeasureFillers.isEmpty()) {
-            return null;
-        } else {
-            for (int i = 0; i < advMeasureFillers.size(); i++) {
-                Pair<Integer, Integer> metricLocation = advMeasureIndexInRV.get(i);
-                Object measureValue = rowValueDecoders.get(metricLocation.getFirst()).getValues()[metricLocation.getSecond()];
-                advMeasureFillers.get(i).reload(measureValue);
-            }
-            return advMeasureFillers;
-        }
-    }
-
-    private interface IDerivedColumnFiller {
-        public void fillDerivedColumns(List<String> rowValues, Tuple tuple);
-    }
-
-    private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) {
-        List<TblColRef> rowColumns = cuboid.getColumns();
-
-        final int[] hostColIdx = new int[hostCols.length];
-        for (int i = 0; i < hostCols.length; i++) {
-            hostColIdx[i] = rowColumns.indexOf(hostCols[i]);
-        }
-
-        boolean needCopyDerived = false;
-        final int[] derivedTupleIdx = new int[deriveInfo.columns.length];
-        for (int i = 0; i < deriveInfo.columns.length; i++) {
-            TblColRef col = deriveInfo.columns[i];
-            derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
-            needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0;
-        }
-
-        if (!needCopyDerived)
-            return null;
-
-        switch (deriveInfo.type) {
-        case LOOKUP:
-            return new IDerivedColumnFiller() {
-                CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-                LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.join);
-                int[] derivedColIdx = initDerivedColIdx();
-                Array<String> lookupKey = new Array<String>(new String[hostColIdx.length]);
-
-                private int[] initDerivedColIdx() {
-                    int[] idx = new int[deriveInfo.columns.length];
-                    for (int i = 0; i < idx.length; i++) {
-                        idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
-                    }
-                    return idx;
-                }
-
-                @Override
-                public void fillDerivedColumns(List<String> rowValues, Tuple tuple) {
-                    for (int i = 0; i < hostColIdx.length; i++) {
-                        lookupKey.data[i] = rowValues.get(hostColIdx[i]);
-                    }
-
-                    String[] lookupRow = lookupTable.getRow(lookupKey);
-
-                    if (lookupRow != null) {
-                        for (int i = 0; i < derivedTupleIdx.length; i++) {
-                            if (derivedTupleIdx[i] >= 0) {
-                                String value = lookupRow[derivedColIdx[i]];
-                                tuple.setDimensionValue(derivedTupleIdx[i], value);
-                            }
-                        }
-                    } else {
-                        for (int i = 0; i < derivedTupleIdx.length; i++) {
-                            if (derivedTupleIdx[i] >= 0) {
-                                tuple.setDimensionValue(derivedTupleIdx[i], null);
-                            }
-                        }
-                    }
-                }
-            };
-        case PK_FK:
-            return new IDerivedColumnFiller() {
-                @Override
-                public void fillDerivedColumns(List<String> rowValues, Tuple tuple) {
-                    // composite keys are split, so copying [0] alone is enough; see CubeDesc.initDimensionColumns()
-                    tuple.setDimensionValue(derivedTupleIdx[0], rowValues.get(hostColIdx[0]));
-                }
-            };
-        default:
-            throw new IllegalArgumentException();
-        }
-    }
-}
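
The converter deleted above leans on one pattern throughout: resolve every source column to its target tuple position once in the constructor, then apply the precomputed int[] mapping per row. A stripped-down sketch of that pattern follows, with simplified placeholder types (plain strings and Object[]) instead of Kylin's TblColRef/TupleInfo; IndexMappedFiller is a hypothetical name.

    import java.util.List;

    class IndexMappedFiller {

        private final int[] dimTupleIdx; // source dim position -> tuple position, -1 when not wanted

        IndexMappedFiller(List<String> sourceDims, List<String> tupleColumns) {
            dimTupleIdx = new int[sourceDims.size()];
            for (int i = 0; i < sourceDims.size(); i++) {
                // indexOf returns -1 when the tuple does not need this dimension
                dimTupleIdx[i] = tupleColumns.indexOf(sourceDims.get(i));
            }
        }

        // per row: copy only the wanted dimension values into the output tuple
        void fill(List<String> decodedDims, Object[] tuple) {
            for (int i = 0; i < dimTupleIdx.length; i++) {
                if (dimTupleIdx[i] >= 0)
                    tuple[dimTupleIdx[i]] = decodedDims.get(i);
            }
        }
    }

The derived-column fillers and metric mappings in the real class are the same idea applied to lookup rows and measure arrays.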

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
deleted file mode 100644
index 8a20c65..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
-/**
- * @author yangli9
- * 
- */
-public class RegionScannerAdapter implements RegionScanner {
-
-    private ResultScanner scanner;
-
-    public RegionScannerAdapter(ResultScanner scanner) {
-        this.scanner = scanner;
-    }
-
-    @Override
-    public boolean next(List<Cell> results) throws IOException {
-        Result result = scanner.next();
-        if (result == null) // EOF
-            return false;
-
-        results.addAll(result.listCells());
-        return true;
-    }
-
-    @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
-        return next(result);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result) throws IOException {
-        return next(result);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-        return next(result);
-    }
-
-    @Override
-    public void close() throws IOException {
-        scanner.close();
-    }
-
-    @Override
-    public HRegionInfo getRegionInfo() {
-        return null;
-    }
-
-    @Override
-    public long getMaxResultSize() {
-        return Long.MAX_VALUE;
-    }
-
-    @Override
-    public boolean isFilterDone() throws IOException {
-        return false;
-    }
-
-    @Override
-    public boolean reseek(byte[] row) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getMvccReadPoint() {
-        return Long.MAX_VALUE;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java
deleted file mode 100644
index 99058e7..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- * 
- */
-public class ResultScannerAdapter implements ResultScanner {
-
-    private RegionScanner scanner;
-
-    public ResultScannerAdapter(RegionScanner scanner) {
-        this.scanner = scanner;
-    }
-
-    @Override
-    public Iterator<Result> iterator() {
-        return new Iterator<Result>() {
-
-            Result next = null;
-
-            @Override
-            public boolean hasNext() {
-                if (next == null) {
-                    try {
-                        next = ResultScannerAdapter.this.next();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-                return next != null;
-            }
-
-            @Override
-            public Result next() {
-                if (!hasNext())
-                    throw new java.util.NoSuchElementException(); // honor the Iterator contract instead of returning null
-                Result r = next;
-                next = null;
-                return r;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    @Override
-    public Result next() throws IOException {
-        List<Cell> cells = Lists.newArrayList();
-        scanner.next(cells);
-        if (cells.isEmpty())
-            return null;
-        else
-            return Result.create(cells);
-    }
-
-    @Override
-    public Result[] next(int nbRows) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-        try {
-            scanner.close();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-}
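
Both adapters above (and SerializedHBaseTupleIterator below) rely on the same one-element lookahead trick: hasNext() eagerly fetches and caches the next record, next() hands out the cache and clears it. A generic sketch of the trick, under the assumption that the underlying fetch returns null at end of stream; LookaheadIterator is a hypothetical name, not part of the deleted code.

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Supplier;

    class LookaheadIterator<T> implements Iterator<T> {

        private final Supplier<T> fetch; // returns null at end of stream
        private T next;

        LookaheadIterator(Supplier<T> fetch) {
            this.fetch = fetch;
        }

        @Override
        public boolean hasNext() {
            if (next == null)
                next = fetch.get(); // fetch and cache one record ahead
            return next != null;
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            T r = next;
            next = null;
            return r;
        }
    }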

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
deleted file mode 100644
index e8dd5b9..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-import org.apache.kylin.storage.translate.HBaseKeyRange;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * @author xjiang
- */
-public class SerializedHBaseTupleIterator implements ITupleIterator {
-
-    private static final int PARTIAL_DEFAULT_LIMIT = 10000;
-
-    private final StorageContext context;
-    private final int partialResultLimit;
-    private final List<CubeSegmentTupleIterator> segmentIteratorList;
-    private final Iterator<CubeSegmentTupleIterator> segmentIteratorIterator;
-
-    private ITupleIterator segmentIterator;
-    private int scanCount;
-    private ITuple next;
-
-    public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
-            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
-            StorageContext context, TupleInfo returnTupleInfo) {
-
-        this.context = context;
-        int limit = context.getLimit();
-        this.partialResultLimit = Math.max(limit, PARTIAL_DEFAULT_LIMIT);
-
-        this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size());
-        Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
-
-        for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
-            CubeSegmentTupleIterator it = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
-            this.segmentIteratorList.add(it);
-        }
-
-        this.segmentIteratorIterator = this.segmentIteratorList.iterator();
-        if (this.segmentIteratorIterator.hasNext()) {
-            this.segmentIterator = this.segmentIteratorIterator.next();
-        } else {
-            this.segmentIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
-        }
-    }
-
-    private Map<CubeSegment, List<HBaseKeyRange>> makeRangesMap(List<HBaseKeyRange> segmentKeyRanges) {
-        Map<CubeSegment, List<HBaseKeyRange>> map = Maps.newHashMap();
-        for (HBaseKeyRange range : segmentKeyRanges) {
-            List<HBaseKeyRange> list = map.get(range.getCubeSegment());
-            if (list == null) {
-                list = Lists.newArrayList();
-                map.put(range.getCubeSegment(), list);
-            }
-            list.add(range);
-        }
-        return map;
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (next != null)
-            return true;
-
-        // 1. check limit
-        if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
-            return false;
-        }
-        // 2. check partial result
-        if (context.isAcceptPartialResult() && scanCount > partialResultLimit) {
-            context.setPartialResultReturned(true);
-            return false;
-        }
-        // 3. check threshold
-        if (scanCount >= context.getThreshold()) {
-            throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
-        }
-        // 4. check cube segments
-        if (segmentIterator.hasNext()) {
-            next = segmentIterator.next();
-            scanCount++;
-            return true;
-        } else if (segmentIteratorIterator.hasNext()) {
-            segmentIterator.close();
-            segmentIterator = segmentIteratorIterator.next();
-            return hasNext();
-        }
-        return false;
-    }
-
-    @Override
-    public ITuple next() {
-        if (next == null) {
-            hasNext();
-            if (next == null)
-                throw new NoSuchElementException();
-        }
-        ITuple r = next;
-        next = null;
-        return r;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-        // the hasNext() loop may exit early because of limit, threshold, etc.,
-        // so close all remaining segment iterators
-        segmentIterator.close();
-        while (segmentIteratorIterator.hasNext()) {
-            segmentIterator = segmentIteratorIterator.next();
-            segmentIterator.close();
-        }
-    }
-
-}
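
Stripped of the partial-result and threshold bookkeeping, the iterator deleted above is a capped concatenation of per-segment iterators. Below is a hedged sketch of that control flow; the real class also tracks scan thresholds, partial results, and closes exhausted segment scanners, all of which are omitted here, and CappedConcatIterator is an invented name.

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    class CappedConcatIterator<T> implements Iterator<T> {

        private final Iterator<Iterator<T>> segments;
        private final int cap; // plays the role of limit + offset in the original
        private Iterator<T> current;
        private int count;

        CappedConcatIterator(List<Iterator<T>> parts, int cap) {
            this.segments = parts.iterator();
            this.cap = cap;
            this.current = segments.hasNext() ? segments.next() : Collections.<T>emptyIterator();
        }

        @Override
        public boolean hasNext() {
            if (count >= cap)
                return false; // global row cap reached, stop early
            while (!current.hasNext()) { // advance to the next segment
                if (!segments.hasNext())
                    return false;
                current = segments.next();
            }
            return true;
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            count++;
            return current.next();
        }
    }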

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
deleted file mode 100644
index 7139ca7..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.gridtable.StorageSideBehavior;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-
-/**
- * @author yangli9
- */
-public class AggregateRegionObserver extends BaseRegionObserver {
-
-    // HBase uses commons-logging (whereas Kylin uses slf4j)
-    static final Log LOG = LogFactory.getLog(AggregateRegionObserver.class);
-
-    static final String COPROCESSOR_ENABLE = "_Coprocessor_Enable";
-    static final String TYPE = "_Type";
-    static final String PROJECTOR = "_Projector";
-    static final String AGGREGATORS = "_Aggregators";
-    static final String FILTER = "_Filter";
-    static final String BEHAVIOR = "_Behavior";
-
-    @Override
-    public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
-
-        boolean copAbortOnError = ctxt.getEnvironment().getConfiguration().getBoolean(RegionCoprocessorHost.ABORT_ON_ERROR_KEY, RegionCoprocessorHost.DEFAULT_ABORT_ON_ERROR);
-
-        // never throw an exception that could abort the region server
-        if (copAbortOnError) {
-            try {
-                return doPostScannerObserver(ctxt, scan, innerScanner);
-            } catch (Throwable e) {
-                LOG.error("Kylin Coprocessor Error", e);
-                return innerScanner;
-            }
-        } else {
-            return doPostScannerObserver(ctxt, scan, innerScanner);
-        }
-    }
-
-    private RegionScanner doPostScannerObserver(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
-        byte[] coprocessorEnableBytes = scan.getAttribute(COPROCESSOR_ENABLE);
-        if (coprocessorEnableBytes == null || coprocessorEnableBytes.length == 0 || coprocessorEnableBytes[0] == 0) {
-            return innerScanner;
-        }
-
-        byte[] typeBytes = scan.getAttribute(TYPE);
-        CoprocessorRowType type = CoprocessorRowType.deserialize(typeBytes);
-
-        byte[] projectorBytes = scan.getAttribute(PROJECTOR);
-        CoprocessorProjector projector = CoprocessorProjector.deserialize(projectorBytes);
-
-        byte[] aggregatorBytes = scan.getAttribute(AGGREGATORS);
-        ObserverAggregators aggregators = ObserverAggregators.deserialize(aggregatorBytes);
-
-        byte[] filterBytes = scan.getAttribute(FILTER);
-        CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
-
-        StorageSideBehavior storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM;
-        try {
-            byte[] behavior = scan.getAttribute(BEHAVIOR);
-            if (behavior != null && behavior.length != 0) {
-                storageSideBehavior = StorageSideBehavior.valueOf(new String(behavior));
-            }
-        } catch (Exception e) {
-            LOG.error("failed to parse behavior,using default behavior SCAN_FILTER_AGGR_CHECKMEM", e);
-            storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM;
-        }
-
-        // start/end region operation & sync on scanner is suggested by the
-        // javadoc of RegionScanner.nextRaw()
-        // FIXME: will the lock still work when an iterator is returned? Is it safe? Does the read-only attribute help here? (by mhb)
-        HRegion region = ctxt.getEnvironment().getRegion();
-        region.startRegionOperation();
-        try {
-            synchronized (innerScanner) {
-                return new AggregationScanner(type, filter, projector, aggregators, innerScanner, storageSideBehavior);
-            }
-        } finally {
-            region.closeRegionOperation();
-        }
-    }
-}
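
The error-handling policy in postScannerOpen above generalizes to a simple guard: swallow coprocessor failures only when an escaped exception would abort the region server, and otherwise let them propagate so failures stay visible. A minimal sketch of that guard, using simplified stand-in types rather than the HBase API; DefensiveWrapperSketch and its Scanner interface are hypothetical.

    import java.util.function.Function;

    class DefensiveWrapperSketch {

        interface Scanner { /* stand-in for RegionScanner */ }

        static Scanner wrap(Scanner inner, boolean hostAbortsOnError, Function<Scanner, Scanner> decorate) {
            if (hostAbortsOnError) {
                // an escaped exception would take the host process down, so
                // log, fall back to the undecorated scanner, and keep serving
                try {
                    return decorate.apply(inner);
                } catch (Throwable t) {
                    System.err.println("decoration failed, falling back: " + t);
                    return inner;
                }
            }
            // the host tolerates coprocessor errors; let exceptions propagate
            return decorate.apply(inner);
        }
    }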