You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/07/08 07:15:17 UTC
[4/8] kylin git commit: KYLIN-1858 remove all ii related code
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
deleted file mode 100644
index 545d058..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.invertedindex.model.IIDesc;
-
-/**
- * @author yangli9
- */
-public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
-
- long timestamp;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- timestamp = System.currentTimeMillis();
- }
-
- @Override
- protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
-
- ByteBuffer buffer = ByteBuffer.wrap(value.get(), value.getOffset(), value.getLength());
- int totalLength = value.getLength();
- int valueLength = buffer.getInt();
- int dictionaryLength = totalLength - valueLength - 4;
- KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
- IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
- IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, //
- timestamp, Type.Put, //
- buffer.array(), buffer.position(), valueLength);
-
- // write value
- context.write(key, kv);
-
- kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
- IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
- IIDesc.HBASE_DICTIONARY_BYTES, 0, IIDesc.HBASE_DICTIONARY_BYTES.length, //
- timestamp, Type.Put, //
- buffer.array(), buffer.position() + valueLength, dictionaryLength);
-
- // write dictionary
- context.write(key, kv);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
deleted file mode 100644
index 8099276..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.util.IIDeployCoprocessorCLI;
-
-/**
- * @author George Song (ysong1)
- */
-public class IICreateHTableJob extends AbstractHadoopJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
- HBaseAdmin admin = null;
- try {
- options.addOption(OPTION_II_NAME);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME);
- String iiName = getOptionValue(OPTION_II_NAME);
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- IIManager iiManager = IIManager.getInstance(config);
- IIInstance ii = iiManager.getII(iiName);
- int sharding = ii.getDescriptor().getSharding();
-
- Configuration conf = HBaseConfiguration.create(getConf());
- // check if the table already exists
- admin = new HBaseAdmin(conf);
- if (admin.tableExists(tableName)) {
- if (admin.isTableEnabled(tableName)) {
- logger.info("Table " + tableName + " already exists and is enabled, no need to create.");
- return 0;
- } else {
- logger.error("Table " + tableName + " is disabled, couldn't append data");
- return 1;
- }
- }
-
- // table doesn't exist, need to create
-
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
- HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
- cf.setMaxVersions(1);
-
- String hbaseDefaultCC = config.getHbaseDefaultCompressionCodec().toLowerCase();
-
- switch (hbaseDefaultCC) {
- case "snappy": {
- logger.info("hbase will use snappy to compress data");
- cf.setCompressionType(Compression.Algorithm.SNAPPY);
- break;
- }
- case "lzo": {
- logger.info("hbase will use lzo to compress data");
- cf.setCompressionType(Compression.Algorithm.LZO);
- break;
- }
- case "gz":
- case "gzip": {
- logger.info("hbase will use gzip to compress data");
- cf.setCompressionType(Compression.Algorithm.GZ);
- break;
- }
- case "lz4": {
- logger.info("hbase will use lz4 to compress data");
- cf.setCompressionType(Compression.Algorithm.LZ4);
- break;
- }
- default: {
- logger.info("hbase will not user any compression codec to compress data");
- }
- }
-
- cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
- tableDesc.addFamily(cf);
- tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
- tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
- tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
-
- if (User.isHBaseSecurityEnabled(conf)) {
- // add coprocessor for bulk load
- tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
- IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
-
- // create table
- byte[][] splitKeys = getSplits(sharding);
- if (splitKeys.length == 0)
- splitKeys = null;
- admin.createTable(tableDesc, splitKeys);
- if (splitKeys != null) {
- for (int i = 0; i < splitKeys.length; i++) {
- logger.info("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
- }
- }
- logger.info("create hbase table " + tableName + " done.");
-
- return 0;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- } finally {
- if (admin != null)
- admin.close();
- }
- }
-
- //one region for one shard
- private byte[][] getSplits(int shard) {
- byte[][] result = new byte[shard - 1][];
- for (int i = 1; i < shard; ++i) {
- byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
- BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
- result[i - 1] = split;
- }
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
deleted file mode 100644
index fef9662..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii;
-
-import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.IStorageQuery;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointTupleIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexStorageQuery implements IStorageQuery {
-
- private static Logger logger = LoggerFactory.getLogger(InvertedIndexStorageQuery.class);
-
- private IISegment seg;
- private String uuid;
- private EndpointTupleIterator dataIterator;
-
- public InvertedIndexStorageQuery(IIInstance ii) {
- this.seg = ii.getFirstSegment();
- this.uuid = ii.getUuid();
- }
-
- @Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- String tableName = seg.getStorageLocationIdentifier();
-
- //HConnection is cached, so need not be closed
- HConnection conn = HBaseConnection.get(context.getConnUrl());
- try {
- dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
- return dataIterator;
- } catch (Throwable e) {
- logger.error("Error when connecting to II htable " + tableName, e);
- throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java
deleted file mode 100644
index 1f024fe..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluator.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
-/**
- * @author yangli9
- *
- * Evaluate a group of records against a filter in batch.
- */
-public class BitMapFilterEvaluator {
-
- /** Provides bitmaps for a record group ranging [0..N-1], where N is the size of the group */
- public static interface BitMapProvider {
-
- /** return records whose specified column having specified value */
- ConciseSet getBitMap(TblColRef col, Integer startId, Integer endId);
-
- /** return the size of the group */
- int getRecordCount();
-
- /** return the max value ID of a column according to dictionary */
- int getMaxValueId(TblColRef col);
- }
-
- BitMapProvider provider;
-
- public BitMapFilterEvaluator(BitMapProvider bitMapProvider) {
- this.provider = bitMapProvider;
- }
-
- /**
- * @param filter
- * @return a set of records that match the filter; or null if filter is null or unable to evaluate
- */
- public ConciseSet evaluate(TupleFilter filter) {
- if (filter == null)
- return null;
-
- if (filter instanceof LogicalTupleFilter)
- return evalLogical((LogicalTupleFilter) filter);
-
- if (filter instanceof CompareTupleFilter)
- return evalCompare((CompareTupleFilter) filter);
-
- if (filter instanceof ConstantTupleFilter) {
- if (!filter.evaluate(null, null)) {
- return new ConciseSet();
- }
- }
-
- return null; // unable to evaluate
- }
-
- private ConciseSet evalCompare(CompareTupleFilter filter) {
- switch (filter.getOperator()) {
- case ISNULL:
- return evalCompareIsNull(filter);
- case ISNOTNULL:
- return evalCompareIsNotNull(filter);
- case EQ:
- return evalCompareEqual(filter);
- case NEQ:
- return evalCompareNotEqual(filter);
- case IN:
- return evalCompareIn(filter);
- case NOTIN:
- return evalCompareNotIn(filter);
- case LT:
- return evalCompareLT(filter);
- case LTE:
- return evalCompareLTE(filter);
- case GT:
- return evalCompareGT(filter);
- case GTE:
- return evalCompareGTE(filter);
- default:
- throw new IllegalStateException("Unsupported operator " + filter.getOperator());
- }
- }
-
- private ConciseSet evalCompareLT(CompareTupleFilter filter) {
- int id = Dictionary.stringToDictId((String) filter.getFirstValue());
- return collectRange(filter.getColumn(), null, id - 1);
- }
-
- private ConciseSet evalCompareLTE(CompareTupleFilter filter) {
- int id = Dictionary.stringToDictId((String) filter.getFirstValue());
- return collectRange(filter.getColumn(), null, id);
- }
-
- private ConciseSet evalCompareGT(CompareTupleFilter filter) {
- int id = Dictionary.stringToDictId((String) filter.getFirstValue());
- return collectRange(filter.getColumn(), id + 1, null);
- }
-
- private ConciseSet evalCompareGTE(CompareTupleFilter filter) {
- int id = Dictionary.stringToDictId((String) filter.getFirstValue());
- return collectRange(filter.getColumn(), id, null);
- }
-
- private ConciseSet collectRange(TblColRef column, Integer startId, Integer endId) {
- return provider.getBitMap(column, startId, endId);
- }
-
- private ConciseSet evalCompareEqual(CompareTupleFilter filter) {
- int id = Dictionary.stringToDictId((String) filter.getFirstValue());
- ConciseSet bitMap = provider.getBitMap(filter.getColumn(), id, id);
- if (bitMap == null)
- return null;
- return bitMap.clone(); // NOTE the clone() to void messing provider's cache
- }
-
- private ConciseSet evalCompareNotEqual(CompareTupleFilter filter) {
- ConciseSet set = evalCompareEqual(filter);
- not(set);
- dropNull(set, filter);
- return set;
- }
-
- private ConciseSet evalCompareIn(CompareTupleFilter filter) {
- ConciseSet set = new ConciseSet();
- for (Object value : filter.getValues()) {
- int id = Dictionary.stringToDictId((String) value);
- ConciseSet bitMap = provider.getBitMap(filter.getColumn(), id, id);
- if (bitMap == null)
- return null;
- set.addAll(bitMap);
- }
- return set;
- }
-
- private ConciseSet evalCompareNotIn(CompareTupleFilter filter) {
- ConciseSet set = evalCompareIn(filter);
- not(set);
- dropNull(set, filter);
- return set;
- }
-
- private void dropNull(ConciseSet set, CompareTupleFilter filter) {
- if (set == null)
- return;
-
- ConciseSet nullSet = evalCompareIsNull(filter);
- set.removeAll(nullSet);
- }
-
- private ConciseSet evalCompareIsNull(CompareTupleFilter filter) {
- ConciseSet bitMap = provider.getBitMap(filter.getColumn(), null, null);
- if (bitMap == null)
- return null;
- return bitMap.clone(); // NOTE the clone() to void messing provider's cache
- }
-
- private ConciseSet evalCompareIsNotNull(CompareTupleFilter filter) {
- ConciseSet set = evalCompareIsNull(filter);
- not(set);
- return set;
- }
-
- private ConciseSet evalLogical(LogicalTupleFilter filter) {
- List<? extends TupleFilter> children = filter.getChildren();
-
- switch (filter.getOperator()) {
- case AND:
- return evalLogicalAnd(children);
- case OR:
- return evalLogicalOr(children);
- case NOT:
- return evalLogicalNot(children);
- default:
- throw new IllegalStateException("Unsupported operator " + filter.getOperator());
- }
- }
-
- private ConciseSet evalLogicalAnd(List<? extends TupleFilter> children) {
- ConciseSet set = new ConciseSet();
- not(set);
-
- for (TupleFilter c : children) {
- ConciseSet t = evaluate(c);
- if (t == null)
- continue; // because it's AND
-
- set.retainAll(t);
- }
- return set;
- }
-
- private ConciseSet evalLogicalOr(List<? extends TupleFilter> children) {
- ConciseSet set = new ConciseSet();
-
- for (TupleFilter c : children) {
- ConciseSet t = evaluate(c);
- if (t == null)
- return null; // because it's OR
-
- set.addAll(t);
- }
- return set;
- }
-
- private ConciseSet evalLogicalNot(List<? extends TupleFilter> children) {
- ConciseSet set = evaluate(children.get(0));
- not(set);
- return set;
- }
-
- private void not(ConciseSet set) {
- if (set == null)
- return;
-
- set.add(provider.getRecordCount());
- set.complement();
- }
-
- public static void main(String[] args) {
- ConciseSet s = new ConciseSet();
- s.add(5);
- s.complement();
- System.out.println(s);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java
deleted file mode 100644
index 9039165..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/ClearTextDictionary.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.dimension.FixedLenDimEnc;
-import org.apache.kylin.dimension.IDimensionEncodingMap;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-
-import com.google.common.collect.Maps;
-
-/**
- */
-public class ClearTextDictionary implements IDimensionEncodingMap {
-
- private final Map<TblColRef, DimensionEncoding> encMap;
-
- public ClearTextDictionary(TableRecordInfoDigest digest, CoprocessorRowType coprocessorRowType) {
- encMap = Maps.newHashMap();
- for (Entry<TblColRef, Integer> entry : coprocessorRowType.columnIdxMap.entrySet()) {
- encMap.put(entry.getKey(), new FixedLenDimEnc(digest.length(entry.getValue())));
- }
- }
-
- public ClearTextDictionary(TableRecordInfo tableRecordInfo) {
- encMap = Maps.newHashMap();
- TableRecordInfoDigest digest = tableRecordInfo.getDigest();
- for (int i = 0; i < tableRecordInfo.getColumns().size(); i++) {
- encMap.put(tableRecordInfo.getColumns().get(i), new FixedLenDimEnc(digest.length(i)));
- }
- }
-
- @Override
- public DimensionEncoding get(TblColRef col) {
- return encMap.get(col);
- }
-
- @Override
- public Dictionary<String> getDictionary(TblColRef col) {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java
deleted file mode 100644
index affb284..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationCache.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache;
-
-/**
- */
-public class EndpointAggregationCache extends AggregationCache {
-
- private EndpointAggregators aggregators;
-
- public EndpointAggregationCache(EndpointAggregators aggregators) {
- this.aggregators = aggregators;
- }
-
- @Override
- public MeasureAggregator[] createBuffer() {
- return this.aggregators.createBuffer();
- }
-
- public Set<Map.Entry<AggrKey, MeasureAggregator[]>> getAllEntries() {
- return aggBufMap.entrySet();
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
deleted file mode 100644
index e481272..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec;
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.measure.hllc.HLLCMeasureType;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.datatype.LongMutable;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * @author honma
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class EndpointAggregators {
-
- private enum MetricType {
- Count, DimensionAsMetric, DistinctCount, Normal
- }
-
- private final static class MetricInfo {
- private MetricType type;
- private int refIndex = -1;
- private int precision = -1;
-
- public MetricInfo(MetricType type, int refIndex, int precision) {
- this.type = type;
- this.refIndex = refIndex;
- this.precision = precision;
- }
-
- public MetricInfo(MetricType type, int refIndex) {
- this.type = type;
- this.refIndex = refIndex;
- }
-
- public MetricInfo(MetricType type) {
- this.type = type;
- }
-
- }
-
- private static MetricInfo generateMetricInfo(TableRecordInfo tableInfo, FunctionDesc functionDesc) {
- if (functionDesc.isCount()) {
- return new MetricInfo(MetricType.Count);
- } else if (functionDesc.isDimensionAsMetric()) {
- return new MetricInfo(MetricType.DimensionAsMetric);
- } else {
- int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
- Preconditions.checkState(index >= 0, "Column " + functionDesc.getParameter().getValue() + " is not found in II");
- if (HLLCMeasureType.isCountDistinct(functionDesc)) {
- return new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
- } else {
- return new MetricInfo(MetricType.Normal, index);
- }
- }
- }
-
- public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) {
- final int metricSize = metrics.size();
- String[] funcNames = new String[metricSize];
- String[] dataTypes = new String[metricSize];
- MetricInfo[] metricInfos = new MetricInfo[metricSize];
- for (int i = 0; i < metricSize; i++) {
- FunctionDesc functionDesc = metrics.get(i);
-
- //TODO: what if funcionDesc's type is different from tablDesc? cause scale difference
- funcNames[i] = functionDesc.getExpression();
- dataTypes[i] = functionDesc.getReturnType();
- metricInfos[i] = generateMetricInfo(tableInfo, functionDesc);
- }
-
- return new EndpointAggregators(funcNames, dataTypes, metricInfos, tableInfo.getDigest());
- }
-
- final String[] funcNames;
- final String[] dataTypes;
- final MetricInfo[] metricInfos;
-
- final transient TableRecordInfoDigest tableRecordInfoDigest;
- final transient RawTableRecord rawTableRecord;
- final transient ImmutableBytesWritable byteBuffer;
- final transient HyperLogLogPlusCounter[] hllcs;
- final transient FixedLenMeasureCodec[] measureSerializers;
- final transient Object[] metricValues;
-
- final LongMutable ONE = new LongMutable(1);
-
- private EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
- this.funcNames = funcNames;
- this.dataTypes = dataTypes;
- this.metricInfos = metricInfos;
- this.tableRecordInfoDigest = tableInfo;
- this.rawTableRecord = tableInfo.createTableRecordBytes();
- this.byteBuffer = new ImmutableBytesWritable();
-
- this.hllcs = new HyperLogLogPlusCounter[this.metricInfos.length];
- this.metricValues = new Object[funcNames.length];
- this.measureSerializers = new FixedLenMeasureCodec[funcNames.length];
- for (int i = 0; i < this.measureSerializers.length; ++i) {
- this.measureSerializers[i] = FixedLenMeasureCodec.get(DataType.getType(dataTypes[i]));
- }
- }
-
- public TableRecordInfoDigest getTableRecordInfoDigest() {
- return tableRecordInfoDigest;
- }
-
- public boolean isEmpty() {
- return !((funcNames != null) && (funcNames.length != 0));
- }
-
- public MeasureAggregator[] createBuffer() {
- MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length];
- for (int i = 0; i < aggrs.length; i++) {
- if (metricInfos[i].type == MetricType.DistinctCount) {
- aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType(dataTypes[i]));
- } else {
- //all other fixed length measures can be aggregated as long
- aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType("long"));
- }
- }
- return aggrs;
- }
-
- /**
- * this method is heavily called at coprocessor side,
- * Make sure as little object creation as possible
- */
- public void aggregate(MeasureAggregator[] measureAggrs, byte[] row) {
-
- rawTableRecord.setBytes(row, 0, row.length);
-
- for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
- final MetricInfo metricInfo = metricInfos[metricIndex];
- if (metricInfo.type == MetricType.Count) {
- measureAggrs[metricIndex].aggregate(ONE);
- continue;
- }
-
- if (metricInfo.type == MetricType.DimensionAsMetric) {
- continue;
- }
-
- MeasureAggregator aggregator = measureAggrs[metricIndex];
- FixedLenMeasureCodec measureSerializer = measureSerializers[metricIndex];
-
- //get the raw bytes
- rawTableRecord.getValueBytes(metricInfo.refIndex, byteBuffer);
-
- if (metricInfo.type == MetricType.Normal) {
- aggregator.aggregate(measureSerializer.read(byteBuffer.get(), byteBuffer.getOffset()));
- } else if (metricInfo.type == MetricType.DistinctCount) {
- //TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data
- HyperLogLogPlusCounter hllc = hllcs[metricIndex];
- if (hllc == null) {
- int precision = metricInfo.precision;
- hllc = new HyperLogLogPlusCounter(precision);
- }
- hllc.clear();
- hllc.add(byteBuffer.get(), byteBuffer.getOffset(), byteBuffer.getLength());
- aggregator.aggregate(hllc);
- }
- }
- }
-
- /**
- * @param aggrs
- * @param buffer byte buffer to get the metric data
- * @return length of metric data
- */
- public int serializeMetricValues(MeasureAggregator[] aggrs, byte[] buffer, int offset) {
- for (int i = 0; i < funcNames.length; i++) {
- metricValues[i] = aggrs[i].getState();
- }
-
- int metricBytesOffset = offset;
- int length = 0;
- for (int i = 0; i < measureSerializers.length; i++) {
- measureSerializers[i].write(metricValues[i], buffer, metricBytesOffset);
- metricBytesOffset += measureSerializers[i].getLength();
- length += measureSerializers[i].getLength();
- }
- return length;
- }
-
- public List<Object> deserializeMetricValues(ByteBuffer buffer) {
- List<Object> ret = Lists.newArrayList();
- for (int i = 0; i < measureSerializers.length; i++) {
- measureSerializers[i].read(buffer);
- Object valueString = measureSerializers[i].getValue();
- ret.add(valueString);
- }
- return ret;
- }
-
- public static byte[] serialize(EndpointAggregators o) {
- ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
- serializer.serialize(o, buf);
- byte[] result = new byte[buf.position()];
- System.arraycopy(buf.array(), 0, result, 0, buf.position());
- return result;
- }
-
- public static EndpointAggregators deserialize(byte[] bytes) {
- return serializer.deserialize(ByteBuffer.wrap(bytes));
- }
-
- private static final BytesSerializer<EndpointAggregators> serializer = new BytesSerializer<EndpointAggregators>() {
-
- @Override
- public void serialize(EndpointAggregators value, ByteBuffer out) {
- BytesUtil.writeAsciiStringArray(value.funcNames, out);
- BytesUtil.writeAsciiStringArray(value.dataTypes, out);
-
- BytesUtil.writeVInt(value.metricInfos.length, out);
- for (int i = 0; i < value.metricInfos.length; ++i) {
- MetricInfo metricInfo = value.metricInfos[i];
- BytesUtil.writeAsciiString(metricInfo.type.toString(), out);
- BytesUtil.writeVInt(metricInfo.refIndex, out);
- BytesUtil.writeVInt(metricInfo.precision, out);
- }
-
- BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfoDigest), out);
- }
-
- @Override
- public EndpointAggregators deserialize(ByteBuffer in) {
-
- String[] funcNames = BytesUtil.readAsciiStringArray(in);
- String[] dataTypes = BytesUtil.readAsciiStringArray(in);
-
- int metricInfoLength = BytesUtil.readVInt(in);
- MetricInfo[] infos = new MetricInfo[metricInfoLength];
- for (int i = 0; i < infos.length; ++i) {
- MetricType type = MetricType.valueOf(BytesUtil.readAsciiString(in));
- int refIndex = BytesUtil.readVInt(in);
- int presision = BytesUtil.readVInt(in);
- infos[i] = new MetricInfo(type, refIndex, presision);
- }
-
- byte[] temp = BytesUtil.readByteArray(in);
- TableRecordInfoDigest tableInfo = TableRecordInfoDigest.deserialize(temp);
-
- return new EndpointAggregators(funcNames, dataTypes, infos, tableInfo);
- }
-
- };
-
- public int getMeasureSerializeLength() {
- int length = 0;
- for (int i = 0; i < this.measureSerializers.length; ++i) {
- length += this.measureSerializers[i].getLength();
- }
- return length;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java
deleted file mode 100644
index 2ae7f35..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointEnabler.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author honma
- */
-public class EndpointEnabler {
-
- private static final Logger logger = LoggerFactory.getLogger(EndpointEnabler.class);
-
- static final String FORCE_COPROCESSOR = "forceEndpoint";
-
- public static boolean isCoprocessorBeneficial() {
- return Boolean.parseBoolean(getForceCoprocessor());
- }
-
- public static void forceCoprocessorOn() {
- System.setProperty(FORCE_COPROCESSOR, "true");
- }
-
- public static void forceCoprocessorOff() {
- System.setProperty(FORCE_COPROCESSOR, "false");
- }
-
- public static String getForceCoprocessor() {
- return System.getProperty(FORCE_COPROCESSOR);
- }
-
- public static void forceCoprocessorUnset() {
- System.clearProperty(FORCE_COPROCESSOR);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java
deleted file mode 100644
index 3fdd5b0..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleConverter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.util.List;
-
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-
-public class EndpointTupleConverter {
-
- final TupleInfo tupleInfo;
- final List<TblColRef> columns;
- final int[] columnTupleIdx;
- final int[] aggrTupleIdx;
-
- public EndpointTupleConverter(List<TblColRef> columns, List<FunctionDesc> aggrMeasures, TupleInfo returnTupleInfo) {
- this.tupleInfo = returnTupleInfo;
- this.columns = columns;
- this.columnTupleIdx = new int[columns.size()];
- this.aggrTupleIdx = new int[aggrMeasures.size()];
-
- for (int i = 0; i < columns.size(); i++) {
- TblColRef col = columns.get(i);
- columnTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
- }
-
- for (int i = 0; i < aggrMeasures.size(); i++) {
- FunctionDesc measure = aggrMeasures.get(i);
- int tupleIdx;
- if (measure.isDimensionAsMetric()) {
- // for dimension playing as metrics, the measure is just a placeholder, the real value comes from columns
- tupleIdx = -1;
- } else if (measure.needRewrite()) {
- // a rewrite metrics is identified by its rewrite field name
- String rewriteFieldName = measure.getRewriteFieldName();
- tupleIdx = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
- } else {
- // a non-rewrite metrics (i.e. sum) is like a dimension column
- TblColRef col = measure.getParameter().getColRefs().get(0);
- tupleIdx = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
- }
- aggrTupleIdx[i] = tupleIdx;
- }
- }
-
- public ITuple makeTuple(TableRecord tableRecord, List<Object> measureValues, Tuple tuple) {
- // dimensions and metrics from II table record
- for (int i = 0; i < columnTupleIdx.length; i++) {
- int tupleIdx = columnTupleIdx[i];
- if (tupleIdx >= 0) {
- String value = tableRecord.getValueString(i);
- tuple.setDimensionValue(tupleIdx, value);
- }
- }
-
- // additional aggregations calculated inside end point (like cube measures)
- if (measureValues != null) {
- for (int i = 0; i < aggrTupleIdx.length; ++i) {
- int tupleIdx = aggrTupleIdx[i];
- if (tupleIdx >= 0) {
- Object value = measureValues.get(i);
- if (value instanceof String) {
- String dataType = tuple.getDataTypeName(tupleIdx);
- value = Tuple.convertOptiqCellValue((String) value, dataType);
- }
- tuple.setMeasureValue(tupleIdx, value);
- }
- }
- }
- return tuple;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
deleted file mode 100644
index e197e3e..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.kylin.common.util.CompressionUtils;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.RangeUtil;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.measure.hllc.HLLCMeasureType;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.cache.TsConditionExtractor;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Ranges;
-import com.google.common.collect.Sets;
-import com.google.protobuf.HBaseZeroCopyByteString;
-
-/**
- */
-public class EndpointTupleIterator implements ITupleIterator {
-
- private final static Logger logger = LoggerFactory.getLogger(EndpointTupleIterator.class);
-
- private final IISegment seg;
-
- private final String factTableName;
- private final List<TblColRef> columns;
- private final TupleInfo tupleInfo;
- private final TableRecordInfo tableRecordInfo;
- private final EndpointTupleConverter tupleConverter;
-
- private final CoprocessorRowType pushedDownRowType;
- private final CoprocessorFilter pushedDownFilter;
- private final CoprocessorProjector pushedDownProjector;
- private final EndpointAggregators pushedDownAggregators;
- private final Range<Long> tsRange;//timestamp column condition's interval
-
- private Iterator<List<IIProtos.IIResponseInternal.IIRow>> regionResponsesIterator = null;
- private ITupleIterator tupleIterator = null;
- private HTableInterface table = null;
-
- private TblColRef partitionCol;
- private long lastDataTime = -1;
- private int rowsInAllMetric = 0;
-
- public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn, TupleInfo returnTupleInfo) throws Throwable {
-
- String tableName = segment.getStorageLocationIdentifier();
- table = conn.getTable(tableName);
- factTableName = segment.getIIDesc().getFactTableName();
-
- if (rootFilter == null) {
- rootFilter = ConstantTupleFilter.TRUE;
- }
-
- if (groupBy == null) {
- groupBy = Sets.newHashSet();
- }
-
- if (measures == null) {
- measures = Lists.newArrayList();
- }
-
- //this method will change measures
- rewriteMeasureParameters(measures, segment.getColumns());
-
- this.seg = segment;
- this.columns = segment.getColumns();
-
- this.tupleInfo = returnTupleInfo;
- this.tupleConverter = new EndpointTupleConverter(columns, measures, returnTupleInfo);
- this.tableRecordInfo = new TableRecordInfo(this.seg);
-
- this.pushedDownRowType = CoprocessorRowType.fromTableRecordInfo(tableRecordInfo, this.columns);
- this.pushedDownFilter = CoprocessorFilter.fromFilter(new ClearTextDictionary(this.tableRecordInfo), rootFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
-
- for (TblColRef column : this.pushedDownFilter.getInevaluableColumns()) {
- groupBy.add(column);
- }
-
- this.pushedDownProjector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, groupBy);
- this.pushedDownAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, measures);
-
- int tsCol = this.tableRecordInfo.getTimestampColumn();
- this.partitionCol = this.columns.get(tsCol);
- this.tsRange = TsConditionExtractor.extractTsCondition(this.partitionCol, rootFilter);
-
- if (this.tsRange == null) {
- logger.info("TsRange conflict for endpoint, return empty directly");
- this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
- } else {
- logger.info("The tsRange being pushed is " + RangeUtil.formatTsRange(tsRange));
- }
-
- IIProtos.IIRequest endpointRequest = prepareRequest();
- Collection<IIProtos.IIResponse> compressedShardResults = getResults(endpointRequest, table);
-
- //decompress
- Collection<IIProtos.IIResponseInternal> shardResults = new ArrayList<>();
- for (IIProtos.IIResponse input : compressedShardResults) {
- byte[] compressed = HBaseZeroCopyByteString.zeroCopyGetBytes(input.getBlob());
- try {
- byte[] decompressed = CompressionUtils.decompress(compressed);
- shardResults.add(IIProtos.IIResponseInternal.parseFrom(decompressed));
- } catch (Exception e) {
- throw new RuntimeException("decompress endpoint response error");
- }
- }
-
- this.lastDataTime = Collections.min(Collections2.transform(shardResults, new Function<IIProtos.IIResponseInternal, Long>() {
- @Nullable
- @Override
- public Long apply(IIProtos.IIResponseInternal input) {
-
- IIProtos.IIResponseInternal.Stats status = input.getStats();
- logger.info("Endpoints all returned, stats from shard {}: start moment:{}, finish moment: {}, elapsed ms: {}, scanned slices: {}, latest slice time is {}", //
- new Object[] { String.valueOf(status.getMyShard()), //
- DateFormat.formatToTimeStr(status.getServiceStartTime()), //
- DateFormat.formatToTimeStr(status.getServiceEndTime()), //
- String.valueOf(status.getServiceEndTime() - status.getServiceStartTime()), //
- String.valueOf(status.getScannedSlices()), DateFormat.formatToTimeStr(status.getLatestDataTime()) });
-
- return status.getLatestDataTime();
- }
- }));
-
- this.regionResponsesIterator = Collections2.transform(shardResults, new Function<IIProtos.IIResponseInternal, List<IIProtos.IIResponseInternal.IIRow>>() {
- @Nullable
- @Override
- public List<IIProtos.IIResponseInternal.IIRow> apply(@Nullable IIProtos.IIResponseInternal input) {
- return input.getRowsList();
- }
- }).iterator();
-
- if (this.regionResponsesIterator.hasNext()) {
- this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
- } else {
- this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
- }
- }
-
- /**
- * measure comes from query engine, does not contain enough information
- */
- private void rewriteMeasureParameters(List<FunctionDesc> measures, List<TblColRef> columns) {
- for (FunctionDesc functionDesc : measures) {
- if (functionDesc.isCount()) {
- functionDesc.setReturnType("bigint");
- } else {
- boolean updated = false;
- for (TblColRef column : columns) {
- if (column.isSameAs(factTableName, functionDesc.getParameter().getValue())) {
- if (HLLCMeasureType.isCountDistinct(functionDesc)) {
- //TODO: default precision might need be configurable
- String iiDefaultHLLC = "hllc10";
- functionDesc.setReturnType(iiDefaultHLLC);
- } else {
- functionDesc.setReturnType(column.getColumnDesc().getType().toString());
- }
- functionDesc.getParameter().setColRefs(ImmutableList.of(column));
- updated = true;
- break;
- }
- }
- if (!updated) {
- throw new RuntimeException("Func " + functionDesc + " is not related to any column in fact table " + factTableName);
- }
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- while (!this.tupleIterator.hasNext()) {
- if (this.regionResponsesIterator.hasNext()) {
- this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
- } else {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public ITuple next() {
- rowsInAllMetric++;
-
- if (!hasNext()) {
- throw new IllegalStateException("No more ITuple in EndpointTupleIterator");
- }
-
- ITuple tuple = this.tupleIterator.next();
- return tuple;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public void close() {
- IOUtils.closeQuietly(table);
- logger.info("Closed after " + rowsInAllMetric + " rows are fetched");
- }
-
- /**
- * tells storage layer cache what time period of data should not be cached.
- * for static storage like cube, it will return null
- * for dynamic storage like ii, it will for example exclude the last two minutes for possible data latency
- * @return
- */
- public Range<Long> getCacheExcludedPeriod() {
- Preconditions.checkArgument(lastDataTime != -1, "lastDataTime is not set yet");
- return Ranges.greaterThan(lastDataTime);
- }
-
- private IIProtos.IIRequest prepareRequest() throws IOException {
- IIProtos.IIRequest.Builder builder = IIProtos.IIRequest.newBuilder();
-
- if (this.tsRange != null) {
- byte[] tsRangeBytes = SerializationUtils.serialize(this.tsRange);
- builder.setTsRange(HBaseZeroCopyByteString.wrap(tsRangeBytes));
- }
-
- builder.setType(HBaseZeroCopyByteString.wrap(CoprocessorRowType.serialize(pushedDownRowType))) //
- .setFilter(HBaseZeroCopyByteString.wrap(CoprocessorFilter.serialize(pushedDownFilter))) //
- .setProjector(HBaseZeroCopyByteString.wrap(CoprocessorProjector.serialize(pushedDownProjector))) //
- .setAggregator(HBaseZeroCopyByteString.wrap(EndpointAggregators.serialize(pushedDownAggregators)));
-
- IIProtos.IIRequest request = builder.build();
-
- return request;
- }
-
- private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
- Map<byte[], IIProtos.IIResponse> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, IIProtos.IIResponse>() {
- public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<IIProtos.IIResponse> rpcCallback = new BlockingRpcCallback<>();
- rowsService.getRows(controller, request, rpcCallback);
- IIProtos.IIResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
- }
-
- return response;
- }
- });
-
- return results.values();
- }
-
- /**
- * Internal class to handle iterators for a single region's returned rows
- */
- class SingleRegionTupleIterator implements ITupleIterator {
- private List<IIProtos.IIResponseInternal.IIRow> rows;
- private int index = 0;
-
- //not thread safe!
- private TableRecord tableRecord;
- private List<Object> measureValues;
- private Tuple tuple;
-
- public SingleRegionTupleIterator(List<IIProtos.IIResponseInternal.IIRow> rows) {
- this.rows = rows;
- this.index = 0;
- this.tableRecord = tableRecordInfo.createTableRecord();
- this.tuple = new Tuple(tupleInfo);
- }
-
- @Override
- public boolean hasNext() {
- return index < rows.size();
- }
-
- @Override
- public ITuple next() {
- if (!hasNext()) {
- throw new IllegalStateException("No more Tuple in the SingleRegionTupleIterator");
- }
-
- IIProtos.IIResponseInternal.IIRow currentRow = rows.get(index);
- byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns());
- this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
- if (currentRow.hasMeasures()) {
- ByteBuffer buffer = currentRow.getMeasures().asReadOnlyByteBuffer();
- this.measureValues = pushedDownAggregators.deserializeMetricValues(buffer);
- }
-
- index++;
-
- return tupleConverter.makeTuple(this.tableRecord, this.measureValues, this.tuple);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java
deleted file mode 100644
index a1d0e35..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/HbaseServerKVIterator.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Adapts an HBase {@link RegionScanner} into an {@link Iterable} of {@link IIRow}s so that
- * coprocessor code can decode inverted-index key-values on the region server.
- * <p>
- * Closing this object closes the wrapped scanner. Every iterator returned by
- * {@link #iterator()} reads from that same scanner, so it should only be iterated once.
- */
-public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable {
-
-    private static final Logger logger = LoggerFactory.getLogger(HbaseServerKVIterator.class);
-
-    private final RegionScanner innerScanner;
-
-    public HbaseServerKVIterator(RegionScanner innerScanner) {
-        this.innerScanner = innerScanner;
-    }
-
-    @Override
-    public void close() throws IOException {
-        IOUtils.closeQuietly(this.innerScanner);
-    }
-
-    /**
-     * Turns each raw row returned by {@link RegionScanner#nextRaw(List)} into an {@link IIRow}.
-     * One row is read ahead so {@link #hasNext()} can answer without touching the scanner.
-     * <p>
-     * The same {@link IIRow} instance is updated and returned by every {@link #next()} call.
-     * {@link IOException}s from the scanner are rethrown wrapped in {@link RuntimeException}.
-     */
-    private static class IIRowIterator implements Iterator<IIRow> {
-
-        private final RegionScanner regionScanner;
-        private final IIRow row = new IIRow();
-        // cells of the row the next call to next() returns; empty once the scan is exhausted
-        private final List<Cell> results = Lists.newArrayList();
-
-        // false once nextRaw() has reported that no rows follow the last batch
-        private boolean hasMore = true;
-
-        IIRowIterator(RegionScanner innerScanner) {
-            this.regionScanner = innerScanner;
-            fetchNextRow();
-        }
-
-        /**
-         * Reads until {@link #results} holds the cells of the next row or the scanner reports
-         * the end. nextRaw can presumably return an empty batch while still reporting more rows
-         * (e.g. when rows are filtered out), so a single empty batch must not end iteration.
-         */
-        private void fetchNextRow() {
-            try {
-                while (results.isEmpty() && hasMore) {
-                    hasMore = regionScanner.nextRaw(results);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return !results.isEmpty();
-        }
-
-        @Override
-        public IIRow next() {
-            if (results.isEmpty()) {
-                throw new NoSuchElementException();
-            }
-            for (Cell c : results) {
-                row.updateWith(c);
-            }
-            results.clear();
-            fetchNextRow();
-            return row;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    @Override
-    public Iterator<IIRow> iterator() {
-        return new IIRowIterator(innerScanner);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
deleted file mode 100644
index ef7de3a..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.CompressionUtils;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.kv.RowKeyColumnIO;
-import org.apache.kylin.dict.TrieDictionary;
-import org.apache.kylin.dimension.FixedLenDimEnc;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
-import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Range;
-import com.google.protobuf.HBaseZeroCopyByteString;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
-/**
- * HBase region coprocessor endpoint serving inverted-index (II) row requests.
- * <p>
- * {@link #getRows} scans the region (narrowed to the request's timestamp range when one is
- * given) and decodes the stored slices. It applies the pushed-down filter, then either
- * aggregates matching records by the projector's group-by key or returns them as raw rows.
- * The serialized response is compressed with {@link CompressionUtils} before it is returned.
- */
-public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, CoprocessorService {
-
-    private static final Logger logger = LoggerFactory.getLogger(IIEndpoint.class);
-    // cap on the bytes of raw (non-aggregated) rows a single request may return
-    private static final int MEMORY_LIMIT = 500 * 1024 * 1024;
-
-    private RegionCoprocessorEnvironment env;
-    // start time of the most recent getRows() call; only read by the public getResponse(...)
-    private long serviceStartTime;
-
-    public IIEndpoint() {
-    }
-
-    /**
-     * Builds the region scan for a request: both II columns, restricted to the request's
-     * timestamp range (if any) inside this region's shard.
-     */
-    private Scan prepareScan(IIProtos.IIRequest request, HRegion region) throws IOException {
-        Scan scan = new Scan();
-        scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
-        scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES);
-
-        if (!request.hasTsRange()) {
-            return scan;
-        }
-
-        // NOTE(review): Java-native deserialization of RPC payload bytes; only safe while every caller is trusted
-        @SuppressWarnings("unchecked")
-        Range<Long> tsRange = (Range<Long>) SerializationUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange()));
-
-        // the shard is read from the leading SHARD_LEN bytes of the region start key (0 for the
-        // first region). It is a local, not a field, so concurrent requests cannot overwrite it.
-        byte[] regionStartKey = region.getStartKey();
-        final int shard = ArrayUtils.isEmpty(regionStartKey) ? 0 : BytesUtil.readUnsigned(regionStartKey, 0, IIKeyValueCodec.SHARD_LEN);
-        logger.info("Start key of the region is: " + BytesUtil.toReadableText(regionStartKey) + ", making shard to be :" + shard);
-
-        final int keyLen = IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN;
-
-        if (tsRange.hasLowerBound()) {
-            //differentiate GT and GTE seems not very beneficial
-            long tsStart = tsRange.lowerEndpoint();
-            logger.info("ts start is " + tsStart);
-
-            byte[] idealStartKey = new byte[keyLen];
-            BytesUtil.writeUnsigned(shard, idealStartKey, 0, IIKeyValueCodec.SHARD_LEN);
-            BytesUtil.writeLong(tsStart, idealStartKey, IIKeyValueCodec.SHARD_LEN, IIKeyValueCodec.TIMEPART_LEN);
-            logger.info("ideaStartKey is(readable) :" + BytesUtil.toReadableText(idealStartKey));
-            // start at the closest existing row at or before the ideal key, not the ideal key itself
-            Result result = region.getClosestRowBefore(idealStartKey, IIDesc.HBASE_FAMILY_BYTES);
-            if (result != null) {
-                byte[] actualStartKey = Arrays.copyOf(result.getRow(), keyLen);
-                scan.setStartRow(actualStartKey);
-                logger.info("The start key is set to " + BytesUtil.toReadableText(actualStartKey));
-            } else {
-                logger.info("There is no key before ideaStartKey so ignore tsStart");
-            }
-        }
-
-        if (tsRange.hasUpperBound()) {
-            //differentiate LT and LTE seems not very beneficial
-            long tsEnd = tsRange.upperEndpoint();
-            logger.info("ts end is " + tsEnd);
-
-            byte[] actualEndKey = new byte[keyLen];
-            BytesUtil.writeUnsigned(shard, actualEndKey, 0, IIKeyValueCodec.SHARD_LEN);
-            // the stop row is exclusive, so tsEnd + 1 keeps rows whose time part equals tsEnd
-            BytesUtil.writeLong(tsEnd + 1, actualEndKey, IIKeyValueCodec.SHARD_LEN, IIKeyValueCodec.TIMEPART_LEN);
-            scan.setStopRow(actualEndKey);
-            logger.info("The stop key is set to " + BytesUtil.toReadableText(actualEndKey));
-        }
-
-        return scan;
-    }
-
-    /**
-     * Serves one II request. It scans this region, builds the (possibly aggregated) response
-     * and passes it to {@code done}, compressed. {@link IOException}s are reported through
-     * {@code controller}.
-     */
-    @Override
-    public void getRows(RpcController controller, IIProtos.IIRequest request, RpcCallback<IIProtos.IIResponse> done) {
-        // NOTE(review): HBase presumably serves concurrent RPCs on one endpoint instance, so the
-        // start time is kept locally instead of being read back from the shared field
-        final long startTime = System.currentTimeMillis();
-        this.serviceStartTime = startTime;
-
-        RegionScanner innerScanner = null;
-        HRegion region = null;
-
-        try {
-            HRegion candidate = env.getRegion();
-            candidate.startRegionOperation();
-            // remember the region only once its operation has started, so the finally block
-            // never closes a region operation that was never opened
-            region = candidate;
-
-            innerScanner = region.getScanner(prepareScan(request, region));
-
-            CoprocessorRowType type = CoprocessorRowType.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getType()));
-            CoprocessorProjector projector = CoprocessorProjector.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getProjector()));
-            EndpointAggregators aggregators = EndpointAggregators.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getAggregator()));
-            CoprocessorFilter filter = CoprocessorFilter.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getFilter()));
-
-            //compression
-            IIProtos.IIResponseInternal response = getResponse(innerScanner, type, projector, aggregators, filter, startTime);
-            byte[] compressed = CompressionUtils.compress(response.toByteArray());
-            IIProtos.IIResponse compressedR = IIProtos.IIResponse.newBuilder().setBlob(HBaseZeroCopyByteString.wrap(compressed)).build();
-
-            done.run(compressedR);
-        } catch (IOException ioe) {
-            logger.error(ioe.toString(), ioe);
-            ResponseConverter.setControllerException(controller, ioe);
-        } finally {
-            IOUtils.closeQuietly(innerScanner);
-            if (region != null) {
-                try {
-                    region.closeRegionOperation();
-                } catch (IOException e) {
-                    logger.error("Failed to close region operation", e);
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Builds the uncompressed response for {@code innerScanner}. The reported service start time
-     * is the one recorded by the most recent {@link #getRows} call.
-     */
-    public IIProtos.IIResponseInternal getResponse(RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators, CoprocessorFilter filter) {
-        return getResponse(innerScanner, type, projector, aggregators, filter, this.serviceStartTime);
-    }
-
-    // same as the public overload, but with an explicit start time for the response stats
-    private IIProtos.IIResponseInternal getResponse(RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators, CoprocessorFilter filter, long startTime) {
-        TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfoDigest();
-
-        synchronized (innerScanner) {
-            IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfoDigest);
-            //TODO pass projector to codec to skip loading columns
-            Iterable<Slice> slices = codec.decodeKeyValue(new HbaseServerKVIterator(innerScanner));
-            return getResponseInternal(slices, tableRecordInfoDigest, filter, type, projector, aggregators, startTime);
-        }
-    }
-
-    /**
-     * Filters and decodes every record of every slice, then emits either one row per
-     * group-by key (with serialized measures) or one raw row per record, plus scan statistics.
-     *
-     * @param filter    pushed-down filter; {@code null} means no bitmap filtering is applied
-     * @param startTime value reported as the service start time in the stats
-     * @throws RuntimeException if raw (non-aggregated) rows exceed {@link #MEMORY_LIMIT}
-     */
-    private IIProtos.IIResponseInternal getResponseInternal(Iterable<Slice> slices, TableRecordInfoDigest recordInfo, CoprocessorFilter filter, CoprocessorRowType type, CoprocessorProjector projector, EndpointAggregators aggregators, long startTime) {
-        boolean needAgg = projector.hasGroupby() || !aggregators.isEmpty();
-
-        //for needAgg use
-        EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);
-        //for no needAgg use
-        final int byteFormLen = recordInfo.getByteFormLen();
-        int totalByteFormLen = 0;
-
-        IIProtos.IIResponseInternal.Builder responseBuilder = IIProtos.IIResponseInternal.newBuilder();
-        ClearTextDictionary clearTextDictionary = new ClearTextDictionary(recordInfo, type);
-        RowKeyColumnIO rowKeyColumnIO = new RowKeyColumnIO(clearTextDictionary);
-
-        // output column widths depend only on the row type, not on the slice: compute them once
-        TblColRef[] columns = type.columns;
-        int[] finalColumnLength = new int[columns.length];
-        for (int i = 0; i < columns.length; ++i) {
-            finalColumnLength[i] = rowKeyColumnIO.getColumnLength(columns[i]);
-        }
-
-        byte[] recordBuffer = new byte[recordInfo.getByteFormLen()];
-        byte[] buffer = new byte[BytesSerializer.SERIALIZE_BUFFER_SIZE];
-
-        int iteratedSliceCount = 0;
-        long latestSliceTs = Long.MIN_VALUE;
-        for (Slice slice : slices) {
-            latestSliceTs = slice.getTimestamp();
-            iteratedSliceCount++;
-
-            //dictionaries for fact table columns can not be determined while streaming.
-            //a piece of dict coincide with each Slice, we call it "local dict"
-            final Dictionary<?>[] localDictionaries = slice.getLocalDictionaries();
-            final boolean emptyDictionary = Array.isEmpty(localDictionaries);
-            if (!emptyDictionary) {
-                for (Dictionary<?> localDictionary : localDictionaries) {
-                    if (localDictionary instanceof TrieDictionary) {
-                        ((TrieDictionary) localDictionary).enableIdToValueBytesCache();
-                    }
-                }
-            }
-
-            // without a filter the bitmap stays null; the filter is only dereferenced when it is
-            // present, so slices with local dictionaries no longer NPE on a null filter
-            ConciseSet result = null;
-            if (filter != null) {
-                CoprocessorFilter sliceFilter = emptyDictionary ? filter : CoprocessorFilter.fromFilter(new LocalDictionary(localDictionaries, type, slice.getInfo()), filter.getFilter(), FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_LOCAL_DICT);
-                result = new BitMapFilterEvaluator(new SliceBitMapProvider(slice, type)).evaluate(sliceFilter.getFilter());
-            }
-
-            Iterator<RawTableRecord> iterator = slice.iterateWithBitmap(result);
-            while (iterator.hasNext()) {
-                final RawTableRecord rawTableRecord = iterator.next();
-                decodeWithDictionary(recordBuffer, rawTableRecord, localDictionaries, recordInfo, rowKeyColumnIO, finalColumnLength);
-
-                if (needAgg) {
-                    //if has group by, group them first, and extract entries later
-                    AggrKey aggKey = projector.getAggrKey(recordBuffer);
-                    MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
-                    aggregators.aggregate(bufs, recordBuffer);
-                    aggCache.checkMemoryUsage();
-                } else {
-                    //otherwise directly extract entry and put into response
-                    if (totalByteFormLen >= MEMORY_LIMIT) {
-                        throw new RuntimeException("the query has exceeded the memory limit, please check the query");
-                    }
-                    // recordBuffer is overwritten for every record and wrap() does not copy, so each
-                    // row needs its own snapshot; otherwise every row would alias the last record
-                    byte[] rowBytes = Arrays.copyOf(recordBuffer, recordBuffer.length);
-                    IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(rowBytes));
-                    responseBuilder.addRows(rowBuilder.build());
-                    totalByteFormLen += byteFormLen;
-                }
-            }
-        }
-
-        logger.info("Iterated Slices count: " + iteratedSliceCount);
-
-        if (needAgg) {
-            int offset = 0;
-            int measureLength = aggregators.getMeasureSerializeLength();
-            for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
-                AggrKey aggrKey = entry.getKey();
-                IIProtos.IIResponseInternal.IIRow.Builder rowBuilder = IIProtos.IIResponseInternal.IIRow.newBuilder().setColumns(HBaseZeroCopyByteString.wrap(aggrKey.get(), aggrKey.offset(), aggrKey.length()));
-                // start a fresh buffer when the current one cannot hold another entry; earlier
-                // rows keep referencing the old buffer, so it must not be reused
-                if (offset + measureLength > buffer.length) {
-                    buffer = new byte[BytesSerializer.SERIALIZE_BUFFER_SIZE];
-                    offset = 0;
-                }
-                int length = aggregators.serializeMetricValues(entry.getValue(), buffer, offset);
-                rowBuilder.setMeasures(HBaseZeroCopyByteString.wrap(buffer, offset, length));
-                offset += length;
-                responseBuilder.addRows(rowBuilder.build());
-            }
-        }
-
-        responseBuilder.setStats(IIProtos.IIResponseInternal.Stats.newBuilder().setLatestDataTime(latestSliceTs).setServiceStartTime(startTime).setServiceEndTime(System.currentTimeMillis()).setScannedSlices(iteratedSliceCount));
-        return responseBuilder.build();
-    }
-
-    /**
-     * Writes one decoded record into {@code recordBuffer} at the digest's column offsets.
-     * Metric columns, and all columns of slices without local dictionaries, are copied as
-     * stored. Other columns are looked up by value id in the slice's local dictionary.
-     */
-    private void decodeWithDictionary(byte[] recordBuffer, RawTableRecord encodedRecord, Dictionary<?>[] localDictionaries, TableRecordInfoDigest digest, RowKeyColumnIO rowKeyColumnIO, int[] finalColumnLengths) {
-        final boolean[] isMetric = digest.isMetrics();
-        final boolean emptyDictionary = Array.isEmpty(localDictionaries);
-        for (int i = 0; i < finalColumnLengths.length; i++) {
-            if (isMetric[i] || emptyDictionary) {
-                writeColumnWithoutDictionary(encodedRecord.getBytes(), encodedRecord.offset(i), encodedRecord.length(i), recordBuffer, digest.offset(i), finalColumnLengths[i]);
-            } else {
-                final Dictionary<?> localDictionary = localDictionaries[i];
-                final byte[] valueBytesFromId = localDictionary.getValueBytesFromId(encodedRecord.getValueID(i));
-                writeColumnWithoutDictionary(valueBytesFromId, 0, valueBytesFromId.length, recordBuffer, digest.offset(i), finalColumnLengths[i]);
-            }
-        }
-    }
-
-    /**
-     * Copies at most {@code dstLength} bytes from {@code src} into {@code dst}. When the source
-     * is shorter, the remainder is padded with {@link FixedLenDimEnc#ROWKEY_PLACE_HOLDER_BYTE}.
-     */
-    private void writeColumnWithoutDictionary(byte[] src, int srcOffset, int srcLength, byte[] dst, int dstOffset, int dstLength) {
-        if (srcLength >= dstLength) {
-            System.arraycopy(src, srcOffset, dst, dstOffset, dstLength);
-        } else {
-            System.arraycopy(src, srcOffset, dst, dstOffset, srcLength);
-            Arrays.fill(dst, dstOffset + srcLength, dstOffset + dstLength, FixedLenDimEnc.ROWKEY_PLACE_HOLDER_BYTE);
-        }
-    }
-
-    @Override
-    public void start(CoprocessorEnvironment env) throws IOException {
-        if (env instanceof RegionCoprocessorEnvironment) {
-            this.env = (RegionCoprocessorEnvironment) env;
-        } else {
-            throw new CoprocessorException("Must be loaded on a table region!");
-        }
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) throws IOException {
-    }
-
-    @Override
-    public Service getService() {
-        return this;
-    }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2cc0b9c4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java
deleted file mode 100644
index e62f41f..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIResponseAdapter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-
-/**
- * Empty placeholder for adapting II endpoint responses; it declares no members.
- */
-public class IIResponseAdapter {
-    // intentionally left empty
-}