Posted to commits@kylin.apache.org by ma...@apache.org on 2016/07/08 07:15:17 UTC

[4/8] kylin git commit: KYLIN-1858 remove all ii related code

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
deleted file mode 100644
index 545d058..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.invertedindex.model.IIDesc;
-
-/**
- * @author yangli9
- */
-public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
-
-    long timestamp;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        timestamp = System.currentTimeMillis();
-    }
-
-    @Override
-    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
-
-        ByteBuffer buffer = ByteBuffer.wrap(value.get(), value.getOffset(), value.getLength());
-        int totalLength = value.getLength();
-        int valueLength = buffer.getInt();
-        int dictionaryLength = totalLength - valueLength - 4;
-        KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
-                IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
-                IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, //
-                timestamp, Type.Put, //
-                buffer.array(), buffer.position(), valueLength);
-
-        // write value
-        context.write(key, kv);
-
-        kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
-                IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
-                IIDesc.HBASE_DICTIONARY_BYTES, 0, IIDesc.HBASE_DICTIONARY_BYTES.length, //
-                timestamp, Type.Put, //
-                buffer.array(), buffer.position() + valueLength, dictionaryLength);
-
-        // write dictionary
-        context.write(key, kv);
-
-    }
-
-}
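
The mapper above depends on a packed value layout of [4-byte valueLength][value bytes][dictionary bytes]: it reads the length header, emits the value part as one KeyValue under HBASE_QUALIFIER_BYTES, then the remaining dictionary part as a second KeyValue under HBASE_DICTIONARY_BYTES. A minimal, Hadoop-free sketch of that layout follows (class name and sample bytes are illustrative, not Kylin code):

import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative sketch: packs a value and a dictionary into the single byte
// layout the deleted mapper expects, then splits it apart the way map() does.
public class IIValueLayoutDemo {

    // Assumed layout: [4-byte valueLength][value bytes][dictionary bytes]
    static byte[] pack(byte[] value, byte[] dictionary) {
        ByteBuffer buf = ByteBuffer.allocate(4 + value.length + dictionary.length);
        buf.putInt(value.length); // header: length of the value part
        buf.put(value);
        buf.put(dictionary);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] packed = pack(new byte[] { 1, 2, 3 }, new byte[] { 9, 8 });

        ByteBuffer buffer = ByteBuffer.wrap(packed);
        int totalLength = packed.length;
        int valueLength = buffer.getInt();               // consumes the 4-byte header
        int dictionaryLength = totalLength - valueLength - 4;

        byte[] value = new byte[valueLength];
        buffer.get(value);                               // value part
        byte[] dictionary = new byte[dictionaryLength];
        buffer.get(dictionary);                          // dictionary part

        System.out.println(Arrays.toString(value));      // [1, 2, 3]
        System.out.println(Arrays.toString(dictionary)); // [9, 8]
    }
}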

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
deleted file mode 100644
index 8099276..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.util.IIDeployCoprocessorCLI;
-
-/**
- * @author George Song (ysong1)
- */
-public class IICreateHTableJob extends AbstractHadoopJob {
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-        HBaseAdmin admin = null;
-        try {
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_HTABLE_NAME);
-            parseOptions(options, args);
-
-            String tableName = getOptionValue(OPTION_HTABLE_NAME);
-            String iiName = getOptionValue(OPTION_II_NAME);
-
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            IIManager iiManager = IIManager.getInstance(config);
-            IIInstance ii = iiManager.getII(iiName);
-            int sharding = ii.getDescriptor().getSharding();
-
-            Configuration conf = HBaseConfiguration.create(getConf());
-            // check if the table already exists
-            admin = new HBaseAdmin(conf);
-            if (admin.tableExists(tableName)) {
-                if (admin.isTableEnabled(tableName)) {
-                    logger.info("Table " + tableName + " already exists and is enabled, no need to create.");
-                    return 0;
-                } else {
-                    logger.error("Table " + tableName + " is disabled, couldn't append data");
-                    return 1;
-                }
-            }
-
-            // table doesn't exist, need to create
-
-            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
-            HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
-            cf.setMaxVersions(1);
-
-            String hbaseDefaultCC = config.getHbaseDefaultCompressionCodec().toLowerCase();
-
-            switch (hbaseDefaultCC) {
-            case "snappy": {
-                logger.info("hbase will use snappy to compress data");
-                cf.setCompressionType(Compression.Algorithm.SNAPPY);
-                break;
-            }
-            case "lzo": {
-                logger.info("hbase will use lzo to compress data");
-                cf.setCompressionType(Compression.Algorithm.LZO);
-                break;
-            }
-            case "gz":
-            case "gzip": {
-                logger.info("hbase will use gzip to compress data");
-                cf.setCompressionType(Compression.Algorithm.GZ);
-                break;
-            }
-            case "lz4": {
-                logger.info("hbase will use lz4 to compress data");
-                cf.setCompressionType(Compression.Algorithm.LZ4);
-                break;
-            }
-            default: {
-                logger.info("hbase will not user any compression codec to compress data");
-            }
-            }
-
-            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
-            tableDesc.addFamily(cf);
-            tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
-            tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
-            tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
-
-            if (User.isHBaseSecurityEnabled(conf)) {
-                // add coprocessor for bulk load
-                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
-            }
-
-            IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
-
-            // create table
-            byte[][] splitKeys = getSplits(sharding);
-            if (splitKeys.length == 0)
-                splitKeys = null;
-            admin.createTable(tableDesc, splitKeys);
-            if (splitKeys != null) {
-                for (int i = 0; i < splitKeys.length; i++) {
-                    logger.info("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
-                }
-            }
-            logger.info("create hbase table " + tableName + " done.");
-
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        } finally {
-            if (admin != null)
-                admin.close();
-        }
-    }
-
-    //one region for one shard
-    private byte[][] getSplits(int shard) {
-        byte[][] result = new byte[shard - 1][];
-        for (int i = 1; i < shard; ++i) {
-            byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
-            BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
-            result[i - 1] = split;
-        }
-        return result;
-    }
-
-}
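
The getSplits() helper above pre-splits the HTable so that each shard maps to exactly one region, and DisabledRegionSplitPolicy keeps it that way afterwards. Below is a standalone sketch of the split-key computation, with BytesUtil.writeUnsigned inlined and SHARD_LEN assumed to be 2 bytes for illustration:

// Illustrative sketch of the "one region per shard" pre-split logic.
public class ShardSplitDemo {

    static final int SHARD_LEN = 2; // stand-in for IIKeyValueCodec.SHARD_LEN (assumed)

    // big-endian unsigned write, mirroring BytesUtil.writeUnsigned
    static void writeUnsigned(int value, byte[] dst, int offset, int size) {
        for (int i = offset + size - 1; i >= offset; i--) {
            dst[i] = (byte) value;
            value >>>= 8;
        }
    }

    // n shards need n-1 split keys: shard ids 1 .. n-1
    static byte[][] getSplits(int shard) {
        byte[][] result = new byte[shard - 1][];
        for (int i = 1; i < shard; ++i) {
            byte[] split = new byte[SHARD_LEN];
            writeUnsigned(i, split, 0, SHARD_LEN);
            result[i - 1] = split;
        }
        return result;
    }

    public static void main(String[] args) {
        for (byte[] key : getSplits(4)) {
            System.out.printf("%02x%02x%n", key[0], key[1]); // 0001, 0002, 0003
        }
    }
}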

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
deleted file mode 100644
index fef9662..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii;
-
-import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.IStorageQuery;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointTupleIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexStorageQuery implements IStorageQuery {
-
-    private static Logger logger = LoggerFactory.getLogger(InvertedIndexStorageQuery.class);
-
-    private IISegment seg;
-    private String uuid;
-    private EndpointTupleIterator dataIterator;
-
-    public InvertedIndexStorageQuery(IIInstance ii) {
-        this.seg = ii.getFirstSegment();
-        this.uuid = ii.getUuid();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        String tableName = seg.getStorageLocationIdentifier();
-
-        //HConnection is cached, so it need not be closed
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
-        try {
-            dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
-            return dataIterator;
-        } catch (Throwable e) {
-            logger.error("Error when connecting to II htable " + tableName, e);
-            throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java
deleted file mode 100644
index 1f024fe..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
-/**
- * @author yangli9
- *
- * Evaluate a group of records against a filter in batch.
- */
-public class BitMapFilterEvaluator {
-
-    /** Provides bitmaps for a record group ranging [0..N-1], where N is the size of the group */
-    public static interface BitMapProvider {
-
-        /** return records whose specified column has the specified value */
-        ConciseSet getBitMap(TblColRef col, Integer startId, Integer endId);
-
-        /** return the size of the group */
-        int getRecordCount();
-
-        /** return the max value ID of a column according to dictionary */
-        int getMaxValueId(TblColRef col);
-    }
-
-    BitMapProvider provider;
-
-    public BitMapFilterEvaluator(BitMapProvider bitMapProvider) {
-        this.provider = bitMapProvider;
-    }
-
-    /**
-     * @param filter the filter to evaluate; may be null
-     * @return a set of records that match the filter; or null if filter is null or unable to evaluate
-     */
-    public ConciseSet evaluate(TupleFilter filter) {
-        if (filter == null)
-            return null;
-
-        if (filter instanceof LogicalTupleFilter)
-            return evalLogical((LogicalTupleFilter) filter);
-
-        if (filter instanceof CompareTupleFilter)
-            return evalCompare((CompareTupleFilter) filter);
-
-        if (filter instanceof ConstantTupleFilter) {
-            if (!filter.evaluate(null, null)) {
-                return new ConciseSet();
-            }
-        }
-
-        return null; // unable to evaluate
-    }
-
-    private ConciseSet evalCompare(CompareTupleFilter filter) {
-        switch (filter.getOperator()) {
-        case ISNULL:
-            return evalCompareIsNull(filter);
-        case ISNOTNULL:
-            return evalCompareIsNotNull(filter);
-        case EQ:
-            return evalCompareEqual(filter);
-        case NEQ:
-            return evalCompareNotEqual(filter);
-        case IN:
-            return evalCompareIn(filter);
-        case NOTIN:
-            return evalCompareNotIn(filter);
-        case LT:
-            return evalCompareLT(filter);
-        case LTE:
-            return evalCompareLTE(filter);
-        case GT:
-            return evalCompareGT(filter);
-        case GTE:
-            return evalCompareGTE(filter);
-        default:
-            throw new IllegalStateException("Unsupported operator " + filter.getOperator());
-        }
-    }
-
-    private ConciseSet evalCompareLT(CompareTupleFilter filter) {
-        int id = Dictionary.stringToDictId((String) filter.getFirstValue());
-        return collectRange(filter.getColumn(), null, id - 1);
-    }
-
-    private ConciseSet evalCompareLTE(CompareTupleFilter filter) {
-        int id = Dictionary.stringToDictId((String) filter.getFirstValue());
-        return collectRange(filter.getColumn(), null, id);
-    }
-
-    private ConciseSet evalCompareGT(CompareTupleFilter filter) {
-        int id = Dictionary.stringToDictId((String) filter.getFirstValue());
-        return collectRange(filter.getColumn(), id + 1, null);
-    }
-
-    private ConciseSet evalCompareGTE(CompareTupleFilter filter) {
-        int id = Dictionary.stringToDictId((String) filter.getFirstValue());
-        return collectRange(filter.getColumn(), id, null);
-    }
-
-    private ConciseSet collectRange(TblColRef column, Integer startId, Integer endId) {
-        return provider.getBitMap(column, startId, endId);
-    }
-
-    private ConciseSet evalCompareEqual(CompareTupleFilter filter) {
-        int id = Dictionary.stringToDictId((String) filter.getFirstValue());
-        ConciseSet bitMap = provider.getBitMap(filter.getColumn(), id, id);
-        if (bitMap == null)
-            return null;
-        return bitMap.clone(); // NOTE the clone() to avoid messing up the provider's cache
-    }
-
-    private ConciseSet evalCompareNotEqual(CompareTupleFilter filter) {
-        ConciseSet set = evalCompareEqual(filter);
-        not(set);
-        dropNull(set, filter);
-        return set;
-    }
-
-    private ConciseSet evalCompareIn(CompareTupleFilter filter) {
-        ConciseSet set = new ConciseSet();
-        for (Object value : filter.getValues()) {
-            int id = Dictionary.stringToDictId((String) value);
-            ConciseSet bitMap = provider.getBitMap(filter.getColumn(), id, id);
-            if (bitMap == null)
-                return null;
-            set.addAll(bitMap);
-        }
-        return set;
-    }
-
-    private ConciseSet evalCompareNotIn(CompareTupleFilter filter) {
-        ConciseSet set = evalCompareIn(filter);
-        not(set);
-        dropNull(set, filter);
-        return set;
-    }
-
-    private void dropNull(ConciseSet set, CompareTupleFilter filter) {
-        if (set == null)
-            return;
-
-        ConciseSet nullSet = evalCompareIsNull(filter);
-        set.removeAll(nullSet);
-    }
-
-    private ConciseSet evalCompareIsNull(CompareTupleFilter filter) {
-        ConciseSet bitMap = provider.getBitMap(filter.getColumn(), null, null);
-        if (bitMap == null)
-            return null;
-        return bitMap.clone(); // NOTE the clone() to avoid messing up the provider's cache
-    }
-
-    private ConciseSet evalCompareIsNotNull(CompareTupleFilter filter) {
-        ConciseSet set = evalCompareIsNull(filter);
-        not(set);
-        return set;
-    }
-
-    private ConciseSet evalLogical(LogicalTupleFilter filter) {
-        List<? extends TupleFilter> children = filter.getChildren();
-
-        switch (filter.getOperator()) {
-        case AND:
-            return evalLogicalAnd(children);
-        case OR:
-            return evalLogicalOr(children);
-        case NOT:
-            return evalLogicalNot(children);
-        default:
-            throw new IllegalStateException("Unsupported operator " + filter.getOperator());
-        }
-    }
-
-    private ConciseSet evalLogicalAnd(List<? extends TupleFilter> children) {
-        ConciseSet set = new ConciseSet();
-        not(set);
-
-        for (TupleFilter c : children) {
-            ConciseSet t = evaluate(c);
-            if (t == null)
-                continue; // because it's AND
-
-            set.retainAll(t);
-        }
-        return set;
-    }
-
-    private ConciseSet evalLogicalOr(List<? extends TupleFilter> children) {
-        ConciseSet set = new ConciseSet();
-
-        for (TupleFilter c : children) {
-            ConciseSet t = evaluate(c);
-            if (t == null)
-                return null; // because it's OR
-
-            set.addAll(t);
-        }
-        return set;
-    }
-
-    private ConciseSet evalLogicalNot(List<? extends TupleFilter> children) {
-        ConciseSet set = evaluate(children.get(0));
-        not(set);
-        return set;
-    }
-
-    private void not(ConciseSet set) {
-        if (set == null)
-            return;
-
-        set.add(provider.getRecordCount());
-        set.complement();
-    }
-
-    public static void main(String[] args) {
-        ConciseSet s = new ConciseSet();
-        s.add(5);
-        s.complement();
-        System.out.println(s);
-    }
-}
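
One subtlety in the evaluator above is its null contract: evaluate() returns null for any sub-filter it cannot evaluate, evalLogicalAnd() simply skips such children (dropping a conjunct can only widen the result, which is safe for a pre-filter), while evalLogicalOr() must give up entirely. A dependency-free sketch of that contract, using java.util.BitSet in place of ConciseSet:

import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

// Illustrative sketch; null stands for "unable to evaluate this sub-filter".
public class NullHandlingDemo {

    static final int RECORD_COUNT = 8;

    // AND: an unevaluable child is skipped, as if it matched everything
    static BitSet and(List<BitSet> children) {
        BitSet result = new BitSet();
        result.set(0, RECORD_COUNT); // start from "all records"
        for (BitSet child : children) {
            if (child == null)
                continue; // because it's AND
            result.and(child);
        }
        return result;
    }

    // OR: a single unevaluable child makes the whole OR unevaluable
    static BitSet or(List<BitSet> children) {
        BitSet result = new BitSet();
        for (BitSet child : children) {
            if (child == null)
                return null; // because it's OR
            result.or(child);
        }
        return result;
    }

    public static void main(String[] args) {
        BitSet a = new BitSet();
        a.set(1);
        a.set(3);

        System.out.println(and(Arrays.asList(a, null))); // {1, 3}
        System.out.println(or(Arrays.asList(a, null)));  // null
    }
}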

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java
deleted file mode 100644
index 9039165..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.dimension.FixedLenDimEnc;
-import org.apache.kylin.dimension.IDimensionEncodingMap;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-
-import com.google.common.collect.Maps;
-
-/**
- */
-public class ClearTextDictionary implements IDimensionEncodingMap {
-
-    private final Map<TblColRef, DimensionEncoding> encMap;
-
-    public ClearTextDictionary(TableRecordInfoDigest digest, CoprocessorRowType coprocessorRowType) {
-        encMap = Maps.newHashMap();
-        for (Entry<TblColRef, Integer> entry : coprocessorRowType.columnIdxMap.entrySet()) {
-            encMap.put(entry.getKey(), new FixedLenDimEnc(digest.length(entry.getValue())));
-        }
-    }
-
-    public ClearTextDictionary(TableRecordInfo tableRecordInfo) {
-        encMap = Maps.newHashMap();
-        TableRecordInfoDigest digest = tableRecordInfo.getDigest();
-        for (int i = 0; i < tableRecordInfo.getColumns().size(); i++) {
-            encMap.put(tableRecordInfo.getColumns().get(i), new FixedLenDimEnc(digest.length(i)));
-        }
-    }
-
-    @Override
-    public DimensionEncoding get(TblColRef col) {
-        return encMap.get(col);
-    }
-
-    @Override
-    public Dictionary<String> getDictionary(TblColRef col) {
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java
deleted file mode 100644
index affb284..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache;
-
-/**
- */
-public class EndpointAggregationCache extends AggregationCache {
-
-    private EndpointAggregators aggregators;
-
-    public EndpointAggregationCache(EndpointAggregators aggregators) {
-        this.aggregators = aggregators;
-    }
-
-    @Override
-    public MeasureAggregator[] createBuffer() {
-        return this.aggregators.createBuffer();
-    }
-
-    public Set<Map.Entry<AggrKey, MeasureAggregator[]>> getAllEntries() {
-        return aggBufMap.entrySet();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
deleted file mode 100644
index e481272..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec;
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.measure.hllc.HLLCMeasureType;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.datatype.LongMutable;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * @author honma
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class EndpointAggregators {
-
-    private enum MetricType {
-        Count, DimensionAsMetric, DistinctCount, Normal
-    }
-
-    private final static class MetricInfo {
-        private MetricType type;
-        private int refIndex = -1;
-        private int precision = -1;
-
-        public MetricInfo(MetricType type, int refIndex, int precision) {
-            this.type = type;
-            this.refIndex = refIndex;
-            this.precision = precision;
-        }
-
-        public MetricInfo(MetricType type, int refIndex) {
-            this.type = type;
-            this.refIndex = refIndex;
-        }
-
-        public MetricInfo(MetricType type) {
-            this.type = type;
-        }
-
-    }
-
-    private static MetricInfo generateMetricInfo(TableRecordInfo tableInfo, FunctionDesc functionDesc) {
-        if (functionDesc.isCount()) {
-            return new MetricInfo(MetricType.Count);
-        } else if (functionDesc.isDimensionAsMetric()) {
-            return new MetricInfo(MetricType.DimensionAsMetric);
-        } else {
-            int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
-            Preconditions.checkState(index >= 0, "Column " + functionDesc.getParameter().getValue() + " is not found in II");
-            if (HLLCMeasureType.isCountDistinct(functionDesc)) {
-                return new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
-            } else {
-                return new MetricInfo(MetricType.Normal, index);
-            }
-        }
-    }
-
-    public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) {
-        final int metricSize = metrics.size();
-        String[] funcNames = new String[metricSize];
-        String[] dataTypes = new String[metricSize];
-        MetricInfo[] metricInfos = new MetricInfo[metricSize];
-        for (int i = 0; i < metricSize; i++) {
-            FunctionDesc functionDesc = metrics.get(i);
-
-            //TODO: what if functionDesc's type is different from tableDesc's? could cause a scale difference
-            funcNames[i] = functionDesc.getExpression();
-            dataTypes[i] = functionDesc.getReturnType();
-            metricInfos[i] = generateMetricInfo(tableInfo, functionDesc);
-        }
-
-        return new EndpointAggregators(funcNames, dataTypes, metricInfos, tableInfo.getDigest());
-    }
-
-    final String[] funcNames;
-    final String[] dataTypes;
-    final MetricInfo[] metricInfos;
-
-    final transient TableRecordInfoDigest tableRecordInfoDigest;
-    final transient RawTableRecord rawTableRecord;
-    final transient ImmutableBytesWritable byteBuffer;
-    final transient HyperLogLogPlusCounter[] hllcs;
-    final transient FixedLenMeasureCodec[] measureSerializers;
-    final transient Object[] metricValues;
-
-    final LongMutable ONE = new LongMutable(1);
-
-    private EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
-        this.funcNames = funcNames;
-        this.dataTypes = dataTypes;
-        this.metricInfos = metricInfos;
-        this.tableRecordInfoDigest = tableInfo;
-        this.rawTableRecord = tableInfo.createTableRecordBytes();
-        this.byteBuffer = new ImmutableBytesWritable();
-
-        this.hllcs = new HyperLogLogPlusCounter[this.metricInfos.length];
-        this.metricValues = new Object[funcNames.length];
-        this.measureSerializers = new FixedLenMeasureCodec[funcNames.length];
-        for (int i = 0; i < this.measureSerializers.length; ++i) {
-            this.measureSerializers[i] = FixedLenMeasureCodec.get(DataType.getType(dataTypes[i]));
-        }
-    }
-
-    public TableRecordInfoDigest getTableRecordInfoDigest() {
-        return tableRecordInfoDigest;
-    }
-
-    public boolean isEmpty() {
-        return funcNames == null || funcNames.length == 0;
-    }
-
-    public MeasureAggregator[] createBuffer() {
-        MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length];
-        for (int i = 0; i < aggrs.length; i++) {
-            if (metricInfos[i].type == MetricType.DistinctCount) {
-                aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType(dataTypes[i]));
-            } else {
-                //all other fixed length measures can be aggregated as long
-                aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType("long"));
-            }
-        }
-        return aggrs;
-    }
-
-    /**
-     * This method is called heavily on the coprocessor side,
-     * so keep object creation to a minimum.
-     */
-    public void aggregate(MeasureAggregator[] measureAggrs, byte[] row) {
-
-        rawTableRecord.setBytes(row, 0, row.length);
-
-        for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
-            final MetricInfo metricInfo = metricInfos[metricIndex];
-            if (metricInfo.type == MetricType.Count) {
-                measureAggrs[metricIndex].aggregate(ONE);
-                continue;
-            }
-
-            if (metricInfo.type == MetricType.DimensionAsMetric) {
-                continue;
-            }
-
-            MeasureAggregator aggregator = measureAggrs[metricIndex];
-            FixedLenMeasureCodec measureSerializer = measureSerializers[metricIndex];
-
-            //get the raw bytes
-            rawTableRecord.getValueBytes(metricInfo.refIndex, byteBuffer);
-
-            if (metricInfo.type == MetricType.Normal) {
-                aggregator.aggregate(measureSerializer.read(byteBuffer.get(), byteBuffer.getOffset()));
-            } else if (metricInfo.type == MetricType.DistinctCount) {
-                //TODO: for a unified dictionary this is okay, but if different data blocks use different dictionaries, we'll have to aggregate the original data
-                HyperLogLogPlusCounter hllc = hllcs[metricIndex];
-                if (hllc == null) {
-                    int precision = metricInfo.precision;
-                    hllc = new HyperLogLogPlusCounter(precision);
-                    hllcs[metricIndex] = hllc; // cache the counter so it is not re-created on every row
-                }
-                hllc.clear();
-                hllc.add(byteBuffer.get(), byteBuffer.getOffset(), byteBuffer.getLength());
-                aggregator.aggregate(hllc);
-            }
-        }
-    }
-
-    /**
-     * @param aggrs aggregators holding the metric states to serialize
-     * @param buffer byte buffer to write the metric data into
-     * @param offset position in buffer at which to start writing
-     * @return length of the metric data written
-     */
-    public int serializeMetricValues(MeasureAggregator[] aggrs, byte[] buffer, int offset) {
-        for (int i = 0; i < funcNames.length; i++) {
-            metricValues[i] = aggrs[i].getState();
-        }
-
-        int metricBytesOffset = offset;
-        int length = 0;
-        for (int i = 0; i < measureSerializers.length; i++) {
-            measureSerializers[i].write(metricValues[i], buffer, metricBytesOffset);
-            metricBytesOffset += measureSerializers[i].getLength();
-            length += measureSerializers[i].getLength();
-        }
-        return length;
-    }
-
-    public List<Object> deserializeMetricValues(ByteBuffer buffer) {
-        List<Object> ret = Lists.newArrayList();
-        for (int i = 0; i < measureSerializers.length; i++) {
-            measureSerializers[i].read(buffer);
-            Object valueString = measureSerializers[i].getValue();
-            ret.add(valueString);
-        }
-        return ret;
-    }
-
-    public static byte[] serialize(EndpointAggregators o) {
-        ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-        serializer.serialize(o, buf);
-        byte[] result = new byte[buf.position()];
-        System.arraycopy(buf.array(), 0, result, 0, buf.position());
-        return result;
-    }
-
-    public static EndpointAggregators deserialize(byte[] bytes) {
-        return serializer.deserialize(ByteBuffer.wrap(bytes));
-    }
-
-    private static final BytesSerializer<EndpointAggregators> serializer = new BytesSerializer<EndpointAggregators>() {
-
-        @Override
-        public void serialize(EndpointAggregators value, ByteBuffer out) {
-            BytesUtil.writeAsciiStringArray(value.funcNames, out);
-            BytesUtil.writeAsciiStringArray(value.dataTypes, out);
-
-            BytesUtil.writeVInt(value.metricInfos.length, out);
-            for (int i = 0; i < value.metricInfos.length; ++i) {
-                MetricInfo metricInfo = value.metricInfos[i];
-                BytesUtil.writeAsciiString(metricInfo.type.toString(), out);
-                BytesUtil.writeVInt(metricInfo.refIndex, out);
-                BytesUtil.writeVInt(metricInfo.precision, out);
-            }
-
-            BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfoDigest), out);
-        }
-
-        @Override
-        public EndpointAggregators deserialize(ByteBuffer in) {
-
-            String[] funcNames = BytesUtil.readAsciiStringArray(in);
-            String[] dataTypes = BytesUtil.readAsciiStringArray(in);
-
-            int metricInfoLength = BytesUtil.readVInt(in);
-            MetricInfo[] infos = new MetricInfo[metricInfoLength];
-            for (int i = 0; i < infos.length; ++i) {
-                MetricType type = MetricType.valueOf(BytesUtil.readAsciiString(in));
-                int refIndex = BytesUtil.readVInt(in);
-                int precision = BytesUtil.readVInt(in);
-                infos[i] = new MetricInfo(type, refIndex, precision);
-            }
-
-            byte[] temp = BytesUtil.readByteArray(in);
-            TableRecordInfoDigest tableInfo = TableRecordInfoDigest.deserialize(temp);
-
-            return new EndpointAggregators(funcNames, dataTypes, infos, tableInfo);
-        }
-
-    };
-
-    public int getMeasureSerializeLength() {
-        int length = 0;
-        for (int i = 0; i < this.measureSerializers.length; ++i) {
-            length += this.measureSerializers[i].getLength();
-        }
-        return length;
-    }
-}
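
The static serialize() above shows a recurring idiom in this code base: serialize into an oversized scratch ByteBuffer, then copy out only the buf.position() bytes actually written. A self-contained sketch of the same idiom with a toy length-prefixed string payload (the buffer size constant is assumed):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the scratch-buffer serialization idiom.
public class SerializeIdiomDemo {

    static final int SERIALIZE_BUFFER_SIZE = 1024; // stand-in for BytesSerializer.SERIALIZE_BUFFER_SIZE

    static byte[] serialize(String value) {
        ByteBuffer buf = ByteBuffer.allocate(SERIALIZE_BUFFER_SIZE);
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        buf.putInt(payload.length); // length-prefixed payload
        buf.put(payload);
        byte[] result = new byte[buf.position()]; // trim to what was written
        System.arraycopy(buf.array(), 0, result, 0, buf.position());
        return result;
    }

    static String deserialize(byte[] bytes) {
        ByteBuffer in = ByteBuffer.wrap(bytes);
        byte[] payload = new byte[in.getInt()];
        in.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("hllc10");
        System.out.println(bytes.length);       // 10 (4-byte length + 6 payload bytes)
        System.out.println(deserialize(bytes)); // hllc10
    }
}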

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java
deleted file mode 100644
index 2ae7f35..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author honma
- */
-public class EndpointEnabler {
-
-    private static final Logger logger = LoggerFactory.getLogger(EndpointEnabler.class);
-
-    static final String FORCE_COPROCESSOR = "forceEndpoint";
-
-    public static boolean isCoprocessorBeneficial() {
-        return Boolean.parseBoolean(getForceCoprocessor());
-    }
-
-    public static void forceCoprocessorOn() {
-        System.setProperty(FORCE_COPROCESSOR, "true");
-    }
-
-    public static void forceCoprocessorOff() {
-        System.setProperty(FORCE_COPROCESSOR, "false");
-    }
-
-    public static String getForceCoprocessor() {
-        return System.getProperty(FORCE_COPROCESSOR);
-    }
-
-    public static void forceCoprocessorUnset() {
-        System.clearProperty(FORCE_COPROCESSOR);
-    }
-
-}
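
The enabler above is a plain JVM-wide system-property toggle, so the on/off/unset helpers affect every thread in the process. A short usage sketch (the "forceEndpoint" key is FORCE_COPROCESSOR from the class above; note that Boolean.parseBoolean(null) is false, so unset means off):

public class EndpointToggleDemo {
    public static void main(String[] args) {
        System.setProperty("forceEndpoint", "true"); // forceCoprocessorOn()
        System.out.println(Boolean.parseBoolean(System.getProperty("forceEndpoint"))); // true

        System.clearProperty("forceEndpoint"); // forceCoprocessorUnset()
        System.out.println(Boolean.parseBoolean(System.getProperty("forceEndpoint"))); // false
    }
}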

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java
deleted file mode 100644
index 3fdd5b0..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.util.List;
-
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-
-public class EndpointTupleConverter {
-
-    final TupleInfo tupleInfo;
-    final List<TblColRef> columns;
-    final int[] columnTupleIdx;
-    final int[] aggrTupleIdx;
-
-    public EndpointTupleConverter(List<TblColRef> columns, List<FunctionDesc> aggrMeasures, TupleInfo returnTupleInfo) {
-        this.tupleInfo = returnTupleInfo;
-        this.columns = columns;
-        this.columnTupleIdx = new int[columns.size()];
-        this.aggrTupleIdx = new int[aggrMeasures.size()];
-
-        for (int i = 0; i < columns.size(); i++) {
-            TblColRef col = columns.get(i);
-            columnTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
-        }
-
-        for (int i = 0; i < aggrMeasures.size(); i++) {
-            FunctionDesc measure = aggrMeasures.get(i);
-            int tupleIdx;
-            if (measure.isDimensionAsMetric()) {
-                // for a dimension playing as a metric, the measure is just a placeholder; the real value comes from the columns
-                tupleIdx = -1;
-            } else if (measure.needRewrite()) {
-                // a rewrite metric is identified by its rewrite field name
-                String rewriteFieldName = measure.getRewriteFieldName();
-                tupleIdx = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
-            } else {
-                // a non-rewrite metric (e.g. sum) is like a dimension column
-                TblColRef col = measure.getParameter().getColRefs().get(0);
-                tupleIdx = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
-            }
-            aggrTupleIdx[i] = tupleIdx;
-        }
-    }
-
-    public ITuple makeTuple(TableRecord tableRecord, List<Object> measureValues, Tuple tuple) {
-        // dimensions and metrics from II table record 
-        for (int i = 0; i < columnTupleIdx.length; i++) {
-            int tupleIdx = columnTupleIdx[i];
-            if (tupleIdx >= 0) {
-                String value = tableRecord.getValueString(i);
-                tuple.setDimensionValue(tupleIdx, value);
-            }
-        }
-
-        // additional aggregations calculated inside end point (like cube measures)
-        if (measureValues != null) {
-            for (int i = 0; i < aggrTupleIdx.length; ++i) {
-                int tupleIdx = aggrTupleIdx[i];
-                if (tupleIdx >= 0) {
-                    Object value = measureValues.get(i);
-                    if (value instanceof String) {
-                        String dataType = tuple.getDataTypeName(tupleIdx);
-                        value = Tuple.convertOptiqCellValue((String) value, dataType);
-                    }
-                    tuple.setMeasureValue(tupleIdx, value);
-                }
-            }
-        }
-        return tuple;
-    }
-
-}
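
The converter above resolves every tuple index once in its constructor, storing -1 for columns or measures the query did not request, so that makeTuple() reduces to plain array lookups per record. A stripped-down sketch of that pattern (column names and indices are made up for illustration):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexMappingDemo {
    public static void main(String[] args) {
        List<String> columns = Arrays.asList("SELLER_ID", "PRICE", "TS");
        Map<String, Integer> tupleIndex = new HashMap<>();
        tupleIndex.put("SELLER_ID", 0);
        tupleIndex.put("TS", 1); // PRICE is not requested by this query

        // resolve once, up front
        int[] columnTupleIdx = new int[columns.size()];
        for (int i = 0; i < columns.size(); i++) {
            Integer idx = tupleIndex.get(columns.get(i));
            columnTupleIdx[i] = (idx != null) ? idx : -1;
        }

        // per-record conversion is then a cheap array walk, as in makeTuple()
        String[] record = { "seller-42", "9.99", "2016-07-08" };
        String[] tuple = new String[2];
        for (int i = 0; i < columnTupleIdx.length; i++) {
            if (columnTupleIdx[i] >= 0)
                tuple[columnTupleIdx[i]] = record[i];
        }
        System.out.println(Arrays.toString(tuple)); // [seller-42, 2016-07-08]
    }
}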

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
deleted file mode 100644
index e197e3e..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.kylin.common.util.CompressionUtils;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.RangeUtil;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.measure.hllc.HLLCMeasureType;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.cache.TsConditionExtractor;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Ranges;
-import com.google.common.collect.Sets;
-import com.google.protobuf.HBaseZeroCopyByteString;
-
-/**
- */
-public class EndpointTupleIterator implements ITupleIterator {
-
-    private final static Logger logger = LoggerFactory.getLogger(EndpointTupleIterator.class);
-
-    private final IISegment seg;
-
-    private final String factTableName;
-    private final List<TblColRef> columns;
-    private final TupleInfo tupleInfo;
-    private final TableRecordInfo tableRecordInfo;
-    private final EndpointTupleConverter tupleConverter;
-
-    private final CoprocessorRowType pushedDownRowType;
-    private final CoprocessorFilter pushedDownFilter;
-    private final CoprocessorProjector pushedDownProjector;
-    private final EndpointAggregators pushedDownAggregators;
-    private final Range<Long> tsRange;//timestamp column condition's interval
-
-    private Iterator<List<IIProtos.IIResponseInternal.IIRow>> regionResponsesIterator = null;
-    private ITupleIterator tupleIterator = null;
-    private HTableInterface table = null;
-
-    private TblColRef partitionCol;
-    private long lastDataTime = -1;
-    private int rowsInAllMetric = 0;
-
-    public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn, TupleInfo returnTupleInfo) throws Throwable {
-
-        String tableName = segment.getStorageLocationIdentifier();
-        table = conn.getTable(tableName);
-        factTableName = segment.getIIDesc().getFactTableName();
-
-        if (rootFilter == null) {
-            rootFilter = ConstantTupleFilter.TRUE;
-        }
-
-        if (groupBy == null) {
-            groupBy = Sets.newHashSet();
-        }
-
-        if (measures == null) {
-            measures = Lists.newArrayList();
-        }
-
-        //this method will change measures
-        rewriteMeasureParameters(measures, segment.getColumns());
-
-        this.seg = segment;
-        this.columns = segment.getColumns();
-
-        this.tupleInfo = returnTupleInfo;
-        this.tupleConverter = new EndpointTupleConverter(columns, measures, returnTupleInfo);
-        this.tableRecordInfo = new TableRecordInfo(this.seg);
-
-        this.pushedDownRowType = CoprocessorRowType.fromTableRecordInfo(tableRecordInfo, this.columns);
-        this.pushedDownFilter = CoprocessorFilter.fromFilter(new ClearTextDictionary(this.tableRecordInfo), rootFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
-
-        for (TblColRef column : this.pushedDownFilter.getInevaluableColumns()) {
-            groupBy.add(column);
-        }
-
-        this.pushedDownProjector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, groupBy);
-        this.pushedDownAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, measures);
-
-        int tsCol = this.tableRecordInfo.getTimestampColumn();
-        this.partitionCol = this.columns.get(tsCol);
-        this.tsRange = TsConditionExtractor.extractTsCondition(this.partitionCol, rootFilter);
-
-        if (this.tsRange == null) {
-            logger.info("TsRange conflict for endpoint, return empty directly");
-            this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
-        } else {
-            logger.info("The tsRange being pushed is " + RangeUtil.formatTsRange(tsRange));
-        }
-
-        IIProtos.IIRequest endpointRequest = prepareRequest();
-        Collection<IIProtos.IIResponse> compressedShardResults = getResults(endpointRequest, table);
-
-        //decompress
-        Collection<IIProtos.IIResponseInternal> shardResults = new ArrayList<>();
-        for (IIProtos.IIResponse input : compressedShardResults) {
-            byte[] compressed = HBaseZeroCopyByteString.zeroCopyGetBytes(input.getBlob());
-            try {
-                byte[] decompressed = CompressionUtils.decompress(compressed);
-                shardResults.add(IIProtos.IIResponseInternal.parseFrom(decompressed));
-            } catch (Exception e) {
-                throw new RuntimeException("decompress endpoint response error");
-            }
-        }
-
-        this.lastDataTime = Collections.min(Collections2.transform(shardResults, new Function<IIProtos.IIResponseInternal, Long>() {
-            @Nullable
-            @Override
-            public Long apply(IIProtos.IIResponseInternal input) {
-
-                IIProtos.IIResponseInternal.Stats status = input.getStats();
-                logger.info("Endpoints all returned, stats from shard {}: start moment:{}, finish moment: {}, elapsed ms: {}, scanned slices: {}, latest slice time is {}", //
-                        new Object[] { String.valueOf(status.getMyShard()), //
-                                DateFormat.formatToTimeStr(status.getServiceStartTime()), //
-                                DateFormat.formatToTimeStr(status.getServiceEndTime()), //
-                                String.valueOf(status.getServiceEndTime() - status.getServiceStartTime()), //
-                                String.valueOf(status.getScannedSlices()), DateFormat.formatToTimeStr(status.getLatestDataTime()) });
-
-                return status.getLatestDataTime();
-            }
-        }));
-
-        this.regionResponsesIterator = Collections2.transform(shardResults, new Function<IIProtos.IIResponseInternal, List<IIProtos.IIResponseInternal.IIRow>>() {
-            @Nullable
-            @Override
-            public List<IIProtos.IIResponseInternal.IIRow> apply(@Nullable IIProtos.IIResponseInternal input) {
-                return input.getRowsList();
-            }
-        }).iterator();
-
-        if (this.regionResponsesIterator.hasNext()) {
-            this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
-        } else {
-            this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
-        }
-    }
-
-    /**
-     * Measures coming from the query engine do not contain enough information;
-     * this method fills in their return types and column references.
-     */
-    private void rewriteMeasureParameters(List<FunctionDesc> measures, List<TblColRef> columns) {
-        for (FunctionDesc functionDesc : measures) {
-            if (functionDesc.isCount()) {
-                functionDesc.setReturnType("bigint");
-            } else {
-                boolean updated = false;
-                for (TblColRef column : columns) {
-                    if (column.isSameAs(factTableName, functionDesc.getParameter().getValue())) {
-                        if (HLLCMeasureType.isCountDistinct(functionDesc)) {
-                            //TODO: default precision might need to be configurable
-                            String iiDefaultHLLC = "hllc10";
-                            functionDesc.setReturnType(iiDefaultHLLC);
-                        } else {
-                            functionDesc.setReturnType(column.getColumnDesc().getType().toString());
-                        }
-                        functionDesc.getParameter().setColRefs(ImmutableList.of(column));
-                        updated = true;
-                        break;
-                    }
-                }
-                if (!updated) {
-                    throw new RuntimeException("Func " + functionDesc + " is not related to any column in fact table " + factTableName);
-                }
-            }
-        }
-    }
-
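-    // advances to the next region's row list once the current iterator is exhausted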
-    @Override
-    public boolean hasNext() {
-        while (!this.tupleIterator.hasNext()) {
-            if (this.regionResponsesIterator.hasNext()) {
-                this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
-            } else {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public ITuple next() {
-        if (!hasNext()) {
-            throw new IllegalStateException("No more ITuple in EndpointTupleIterator");
-        }
-
-        rowsInAllMetric++;
-        return this.tupleIterator.next();
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(table);
-        logger.info("Closed after " + rowsInAllMetric + " rows are fetched");
-    }
-
-    /**
-     * Tells the storage layer cache which time period of data should not be cached.
-     * For static storage like cubes this returns null; for dynamic storage like II
-     * it excludes, for example, the last two minutes to allow for data latency.
-     *
-     * @return the time range that should be excluded from caching
-     */
-    public Range<Long> getCacheExcludedPeriod() {
-        Preconditions.checkState(lastDataTime != -1, "lastDataTime is not set yet");
-        return Ranges.greaterThan(lastDataTime);
-    }
-
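-    /**
-     * Serializes the pushed-down row type, filter, projector and aggregators
-     * into a protobuf request for the endpoint.
-     */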
-    private IIProtos.IIRequest prepareRequest() throws IOException {
-        IIProtos.IIRequest.Builder builder = IIProtos.IIRequest.newBuilder();
-
-        if (this.tsRange != null) {
-            byte[] tsRangeBytes = SerializationUtils.serialize(this.tsRange);
-            builder.setTsRange(HBaseZeroCopyByteString.wrap(tsRangeBytes));
-        }
-
-        builder.setType(HBaseZeroCopyByteString.wrap(CoprocessorRowType.serialize(pushedDownRowType))) //
-                .setFilter(HBaseZeroCopyByteString.wrap(CoprocessorFilter.serialize(pushedDownFilter))) //
-                .setProjector(HBaseZeroCopyByteString.wrap(CoprocessorProjector.serialize(pushedDownProjector))) //
-                .setAggregator(HBaseZeroCopyByteString.wrap(EndpointAggregators.serialize(pushedDownAggregators)));
-
-        IIProtos.IIRequest request = builder.build();
-
-        return request;
-    }
-
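-    /**
-     * Invokes the RowsService endpoint on every region of the table and
-     * collects one response per region.
-     */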
-    private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
-        Map<byte[], IIProtos.IIResponse> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, IIProtos.IIResponse>() {
-            public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
-                BlockingRpcCallback<IIProtos.IIResponse> rpcCallback = new BlockingRpcCallback<>();
-                rowsService.getRows(controller, request, rpcCallback);
-                IIProtos.IIResponse response = rpcCallback.get();
-                if (controller.failedOnException()) {
-                    throw controller.getFailedOn();
-                }
-
-                return response;
-            }
-        });
-
-        return results.values();
-    }
-
-    /**
-     * Internal class to handle iterators for a single region's returned rows
-     */
-    class SingleRegionTupleIterator implements ITupleIterator {
-        private List<IIProtos.IIResponseInternal.IIRow> rows;
-        private int index = 0;
-
-        //not thread safe!
-        private TableRecord tableRecord;
-        private List<Object> measureValues;
-        private Tuple tuple;
-
-        public SingleRegionTupleIterator(List<IIProtos.IIResponseInternal.IIRow> rows) {
-            this.rows = rows;
-            this.index = 0;
-            this.tableRecord = tableRecordInfo.createTableRecord();
-            this.tuple = new Tuple(tupleInfo);
-        }
-
-        @Override
-        public boolean hasNext() {
-            return index < rows.size();
-        }
-
-        @Override
-        public ITuple next() {
-            if (!hasNext()) {
-                throw new IllegalStateException("No more Tuple in the SingleRegionTupleIterator");
-            }
-
-            IIProtos.IIResponseInternal.IIRow currentRow = rows.get(index);
-            byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns());
-            this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
-            if (currentRow.hasMeasures()) {
-                ByteBuffer buffer = currentRow.getMeasures().asReadOnlyByteBuffer();
-                this.measureValues = pushedDownAggregators.deserializeMetricValues(buffer);
-            }
-
-            index++;
-
-            return tupleConverter.makeTuple(this.tableRecord, this.measureValues, this.tuple);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void close() {
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java
deleted file mode 100644
index a1d0e35..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Adapts a region-server-side RegionScanner into an Iterable of IIRow.
- */
-public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable {
-
-    private RegionScanner innerScanner;
-    private static final Logger logger = LoggerFactory.getLogger(HbaseServerKVIterator.class);
-
-    public HbaseServerKVIterator(RegionScanner innerScanner) {
-        this.innerScanner = innerScanner;
-    }
-
-    @Override
-    public void close() throws IOException {
-        IOUtils.closeQuietly(this.innerScanner);
-    }
-
-    private static class IIRowIterator implements Iterator<IIRow> {
-
-        private final RegionScanner regionScanner;
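-        // this single instance is reused and mutated on every call to next()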
-        private final IIRow row = new IIRow();
-        List<Cell> results = Lists.newArrayList();
-
-        private boolean hasMore;
-
-        IIRowIterator(RegionScanner innerScanner) {
-            this.regionScanner = innerScanner;
-            try {
-                hasMore = regionScanner.nextRaw(results);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return !results.isEmpty();
-        }
-
-        @Override
-        public IIRow next() {
-            if (results.isEmpty()) {
-                throw new NoSuchElementException();
-            }
-            for (Cell c : results) {
-                row.updateWith(c);
-            }
-            results.clear();
-            try {
-                if (hasMore) {
-                    hasMore = regionScanner.nextRaw(results);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            return row;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    @Override
-    public Iterator<IIRow> iterator() {
-        return new IIRowIterator(innerScanner);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
deleted file mode 100644
index ef7de3a..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.CompressionUtils;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.kv.RowKeyColumnIO;
-import org.apache.kylin.dict.TrieDictionary;
-import org.apache.kylin.dimension.FixedLenDimEnc;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Range;
-import com.google.protobuf.HBaseZeroCopyByteString;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
-/**
- * Endpoint coprocessor that serves II (inverted index) queries on the region
- * server, pushing filtering and aggregation down to the data.
- */
-public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, CoprocessorService {
-
-    private static final Logger logger = LoggerFactory.getLogger(IIEndpoint.class);
-    private static final int MEMORY_LIMIT = 500 * 1024 * 1024;
-
-    private RegionCoprocessorEnvironment env;
-    private long serviceStartTime;
-    private int shard;
-
-    public IIEndpoint() {
-    }
-
-    private Scan prepareScan(IIProtos.IIRequest request, HRegion region) throws IOException {
-        Scan scan = new Scan();
-
-        scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
-        scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES);
-
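-        // II row keys are prefixed with a shard id and a time partition, so a
-        // pushed-down ts range can be translated into scan start/stop rows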
-        if (request.hasTsRange()) {
-            Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange()));
-            byte[] regionStartKey = region.getStartKey();
-            if (!ArrayUtils.isEmpty(regionStartKey)) {
-                shard = BytesUtil.readUnsigned(regionStartKey, 0, IIKeyValueCodec.SHARD_LEN);
-            } else {
-                shard = 0;
-            }
-            logger.info("Start key of the region is: " + BytesUtil.toReadableText(regionStartKey) + ", making shard to be :" + shard);
-
-            if (tsRange.hasLowerBound()) {
-                //differentiating GT and GTE does not seem very beneficial
-                Preconditions.checkArgument(shard != -1, "Shard is -1!");
-                long tsStart = tsRange.lowerEndpoint();
-                logger.info("ts start is " + tsStart);
-
-                byte[] idealStartKey = new byte[IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN];
-                BytesUtil.writeUnsigned(shard, idealStartKey, 0, IIKeyValueCodec.SHARD_LEN);
-                BytesUtil.writeLong(tsStart, idealStartKey, IIKeyValueCodec.SHARD_LEN, IIKeyValueCodec.TIMEPART_LEN);
-                logger.info("ideaStartKey is(readable) :" + BytesUtil.toReadableText(idealStartKey));
-                Result result = region.getClosestRowBefore(idealStartKey, IIDesc.HBASE_FAMILY_BYTES);
-                if (result != null) {
-                    byte[] actualStartKey = Arrays.copyOf(result.getRow(), IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN);
-                    scan.setStartRow(actualStartKey);
-                    logger.info("The start key is set to " + BytesUtil.toReadableText(actualStartKey));
-                } else {
-                    logger.info("There is no key before ideaStartKey so ignore tsStart");
-                }
-            }
-
-            if (tsRange.hasUpperBound()) {
-                //differentiating LT and LTE does not seem very beneficial
-                Preconditions.checkArgument(shard != -1, "Shard is -1");
-                long tsEnd = tsRange.upperEndpoint();
-                logger.info("ts end is " + tsEnd);
-
-                byte[] actualEndKey = new byte[IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN];
-                BytesUtil.writeUnsigned(shard, actualEndKey, 0, IIKeyValueCodec.SHARD_LEN);
-                BytesUtil.writeLong(tsEnd + 1, actualEndKey, IIKeyValueCodec.SHARD_LEN, IIKeyValueCodec.TIMEPART_LEN);//notice +1 here
-                scan.setStopRow(actualEndKey);
-                logger.info("The stop key is set to " + BytesUtil.toReadableText(actualEndKey));
-            }
-        }
-
-        return scan;
-    }
-
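-    /**
-     * Endpoint entry point: scans this region, evaluates the pushed-down
-     * filter and aggregation, and returns one compressed response.
-     */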
-    @Override
-    public void getRows(RpcController controller, IIProtos.IIRequest request, RpcCallback<IIProtos.IIResponse> done) {
-
-        this.serviceStartTime = System.currentTimeMillis();
-
-        RegionScanner innerScanner = null;
-        HRegion region = null;
-
-        try {
-            region = env.getRegion();
-            region.startRegionOperation();
-
-            innerScanner = region.getScanner(prepareScan(request, region));
-
-            CoprocessorRowType type = CoprocessorRowType.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getType()));
-            CoprocessorProjector projector = CoprocessorProjector.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getProjector()));
-            EndpointAggregators aggregators = EndpointAggregators.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getAggregator()));
-            CoprocessorFilter filter = CoprocessorFilter.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getFilter()));
-
-            //compression
-            IIProtos.IIResponseInternal response = getResponse(innerScanner, type, projector, aggregators, filter);
-            byte[] compressed = CompressionUtils.compress(response.toByteArray());
-            IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(HBaseZeroCopyByteString.wrap(compressed)).build();
-
-            done.run(compressedR);
-        } catch (IOException ioe) {
-            logger.error("Error in II endpoint getRows", ioe);
-            ResponseConverter.setControllerException(controller, ioe);
-        } finally {
-            IOUtils.closeQuietly(innerScanner);
-            if (region != null) {
-                try {
-                    region.closeRegionOperation();
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-    }
-
-    public IIProtos.IIResponseInternal getResponse(RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators, CoprocessorFilter filter) {
-
-        TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfoDigest();
-
-        IIProtos.IIResponseInternal response;
-
-        synchronized (innerScanner) {
-            IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfoDigest);
-            //TODO pass projector to codec to skip loading columns
-            Iterable<Slice> slices = codec.decodeKeyValue(new HbaseServerKVIterator(innerScanner));
-
-            response = getResponseInternal(slices, tableRecordInfoDigest, filter, type, projector, aggregators);
-        }
-        return response;
-    }
-
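-    /**
-     * Iterates the slices, applies the filter bitmap, and either aggregates
-     * rows in memory or streams them into the response unaggregated.
-     */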
-    private IIProtos.IIResponseInternal getResponseInternal(Iterable<Slice> slices, TableRecordInfoDigest recordInfo, CoprocessorFilter filter, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators) {
-        boolean needAgg = projector.hasGroupby() || !aggregators.isEmpty();
-
-        //for needAgg use
-        EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);
-        //for no needAgg use
-        final int byteFormLen = recordInfo.getByteFormLen();
-        int totalByteFormLen = 0;
-
-        IIProtos.IIResponseInternal.Builder responseBuilder = IIProtos.IIResponseInternal.newBuilder();
-        ClearTextDictionary clearTextDictionary = new ClearTextDictionary(recordInfo, type);
-        RowKeyColumnIO rowKeyColumnIO = new RowKeyColumnIO(clearTextDictionary);
-
-        byte[] recordBuffer = new byte[recordInfo.getByteFormLen()];
-        byte[] buffer = new byte[BytesSerializer.SERIALIZE_BUFFER_SIZE];
-
-        int iteratedSliceCount = 0;
-        long latestSliceTs = Long.MIN_VALUE;
-        for (Slice slice : slices) {
-            latestSliceTs = slice.getTimestamp();
-            iteratedSliceCount++;
-
-            //dictionaries for fact table columns cannot be determined while streaming;
-            //a piece of dictionary accompanies each Slice, which we call the "local dict"
-            final Dictionary<?>[] localDictionaries = slice.getLocalDictionaries();
-            CoprocessorFilter newFilter;
-            final boolean emptyDictionary = Array.isEmpty(localDictionaries);
-            if (emptyDictionary) {
-                newFilter = filter;
-            } else {
-                for (Dictionary<?> localDictionary : localDictionaries) {
-                    if (localDictionary instanceof TrieDictionary) {
-                        ((TrieDictionary) localDictionary).enableIdToValueBytesCache();
-                    }
-                }
-                newFilter = CoprocessorFilter.fromFilter(new LocalDictionary(localDictionaries, type, slice.getInfo()), filter.getFilter(), FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_LOCAL_DICT);
-            }
-
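-            // evaluate the pushed-down filter into a bitmap over this slice;
-            // a null bitmap means no filter, so every record is iterated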
-            ConciseSet result = null;
-            if (filter != null) {
-                result = new BitMapFilterEvaluator(new SliceBitMapProvider(slice, type)).evaluate(newFilter.getFilter());
-            }
-
-            Iterator<RawTableRecord> iterator = slice.iterateWithBitmap(result);
-
-            TblColRef[] columns = type.columns;
-            int[] finalColumnLength = new int[columns.length];
-            for (int i = 0; i < columns.length; ++i) {
-                finalColumnLength[i] = rowKeyColumnIO.getColumnLength(columns[i]);
-            }
-
-            while (iterator.hasNext()) {
-                final RawTableRecord rawTableRecord = iterator.next();
-                decodeWithDictionary(recordBuffer, rawTableRecord, localDictionaries, recordInfo, rowKeyColumnIO, finalColumnLength);
-
-                if (needAgg) {
-                    //if there is a group-by, aggregate rows first and extract entries later
-                    AggrKey aggKey = projector.getAggrKey(recordBuffer);
-                    MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
-                    aggregators.aggregate(bufs, recordBuffer);
-                    aggCache.checkMemoryUsage();
-                } else {
-                    //otherwise extract the entry directly and put it into the response
-                    if (totalByteFormLen >= MEMORY_LIMIT) {
-                        throw new RuntimeException("the query has exceeded the memory limit, please check the query");
-                    }
-                    IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(recordBuffer));
-                    responseBuilder.addRows(rowBuilder.build());
-                    totalByteFormLen += byteFormLen;
-                }
-            }
-        }
-
-        logger.info("Iterated Slices count: " + iteratedSliceCount);
-
-        if (needAgg) {
-            int offset = 0;
-            int measureLength = aggregators.getMeasureSerializeLength();
-            for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
-                AggrKey aggrKey = entry.getKey();
-                IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
-                if (offset + measureLength > buffer.length) {
-                    buffer = new byte[BytesSerializer.SERIALIZE_BUFFER_SIZE];
-                    offset = 0;
-                }
-                int length = aggregators.serializeMetricValues(entry.getValue(), buffer, offset);
-                rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, offset, length));
-                offset += length;
-                responseBuilder.addRows(rowBuilder.build());
-            }
-        }
-
-        responseBuilder.setStats(IIProtos.IIResponseInternal.Stats.newBuilder().setLatestDataTime(latestSliceTs).setServiceStartTime(this.serviceStartTime).setServiceEndTime(System.currentTimeMillis()).setScannedSlices(iteratedSliceCount));
-        return responseBuilder.build();
-    }
-
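-    /**
-     * Decodes an encoded record into clear-text bytes, translating
-     * dictionary-encoded dimension values via the slice-local dictionaries;
-     * metric columns are copied as-is.
-     */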
-    private void decodeWithDictionary(byte[] recordBuffer, RawTableRecord encodedRecord, Dictionary<?>[] localDictionaries, TableRecordInfoDigest digest, RowKeyColumnIO rowKeyColumnIO, int[] finalColumnLengths) {
-        final boolean[] isMetric = digest.isMetrics();
-        final boolean emptyDictionary = Array.isEmpty(localDictionaries);
-        for (int i = 0; i < finalColumnLengths.length; i++) {
-            if (isMetric[i]) {
-                writeColumnWithoutDictionary(encodedRecord.getBytes(), encodedRecord.offset(i), encodedRecord.length(i), recordBuffer, digest.offset(i), finalColumnLengths[i]);
-            } else {
-                if (emptyDictionary) {
-                    writeColumnWithoutDictionary(encodedRecord.getBytes(), encodedRecord.offset(i), encodedRecord.length(i), recordBuffer, digest.offset(i), finalColumnLengths[i]);
-                } else {
-                    final Dictionary<?> localDictionary = localDictionaries[i];
-                    final byte[] valueBytesFromId = localDictionary.getValueBytesFromId(encodedRecord.getValueID(i));
-                    writeColumnWithoutDictionary(valueBytesFromId, 0, valueBytesFromId.length, recordBuffer, digest.offset(i), finalColumnLengths[i]);
-                }
-            }
-        }
-    }
-
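-    // copies value bytes into the record buffer, padding with the row key
-    // placeholder byte when the source is shorter than the fixed column length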
-    private void writeColumnWithoutDictionary(byte[] src, int srcOffset, int srcLength, byte[] dst, int dstOffset, int dstLength) {
-        if (srcLength >= dstLength) {
-            System.arraycopy(src, srcOffset, dst, dstOffset, dstLength);
-        } else {
-            System.arraycopy(src, srcOffset, dst, dstOffset, srcLength);
-            Arrays.fill(dst, dstOffset + srcLength, dstOffset + dstLength, FixedLenDimEnc.ROWKEY_PLACE_HOLDER_BYTE);
-        }
-    }
-
-    @Override
-    public void start(CoprocessorEnvironment env) throws IOException {
-        if (env instanceof RegionCoprocessorEnvironment) {
-            this.env = (RegionCoprocessorEnvironment) env;
-        } else {
-            throw new CoprocessorException("Must be loaded on a table region!");
-        }
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) throws IOException {
-    }
-
-    @Override
-    public Service getService() {
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java
deleted file mode 100644
index e62f41f..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-/**
- * Empty placeholder class.
- */
-public class IIResponseAdapter {
-}