You are viewing a plain-text version of this content. The canonical (HTML) version is available at the original mail-archive link, which is not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/10/25 14:12:17 UTC
[4/7] incubator-kylin git commit: KYLIN-942 support parallel scan for grid table
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/sql/query85.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query85.sql b/query/src/test/resources/query/sql/query85.sql
new file mode 100644
index 0000000..1a51a02
--- /dev/null
+++ b/query/src/test/resources/query/sql/query85.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select sum(price) as GMV, count(*) as TRANS_CNT FROM test_kylin_fact
+ inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
+ AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+where test_kylin_fact.cal_dt < DATE '2012-05-01' or test_kylin_fact.cal_dt > DATE '2013-05-01'
+
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/sql/query86.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query86.sql b/query/src/test/resources/query/sql/query86.sql
new file mode 100644
index 0000000..f6feaaf
--- /dev/null
+++ b/query/src/test/resources/query/sql/query86.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select test_kylin_fact.cal_dt, count(*) as mmm from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id where lstg_format_name = 'Others' group by test_kylin_fact.cal_dt order by test_kylin_fact.cal_dt
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/server/src/main/resources/kylin-server-log4j.properties
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylin-server-log4j.properties b/server/src/main/resources/kylin-server-log4j.properties
index 6ccd161..a93627a 100644
--- a/server/src/main/resources/kylin-server-log4j.properties
+++ b/server/src/main/resources/kylin-server-log4j.properties
@@ -43,7 +43,7 @@ log4j.logger.org.springframework=WARN
log4j.logger.org.apache.kylin.rest.controller.QueryController=DEBUG, query
log4j.logger.org.apache.kylin.rest.service.QueryService=DEBUG, query
log4j.logger.org.apache.kylin.query=DEBUG, query
-log4j.logger.org.apache.kylin.storage=DEBUG, query
+#log4j.logger.org.apache.kylin.storage=DEBUG, query //too many stuff in storage package now
#job config
log4j.logger.org.apache.kylin.rest.controller.JobController=DEBUG, job
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 421f648..53465d8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -35,10 +35,7 @@ import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.cache.CacheFledgedDynamicQuery;
import org.apache.kylin.storage.cache.CacheFledgedStaticQuery;
import org.apache.kylin.storage.hbase.steps.HBaseMROutput;
-import org.apache.kylin.storage.hbase.steps.HBaseMROutput2;
import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridStorageQuery;
import com.google.common.base.Preconditions;
@@ -46,7 +43,7 @@ import com.google.common.base.Preconditions;
public class HBaseStorage implements IStorage {
private final static boolean allowStorageLayerCache = true;
- private final static String defaultCubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
+ private final static String defaultCubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery";
private final static String defaultIIStorageQuery = "org.apache.kylin.storage.hbase.ii.InvertedIndexStorageQuery";
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 09295b0..9b839c3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -41,7 +41,7 @@ public class CoprocessorProjector {
RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
@Override
- protected int fillHeader(byte[] bytes, byte[][] values) {
+ protected int fillHeader(byte[] bytes) {
Arrays.fill(bytes, 0, this.headerLength, (byte) 0xff);
return this.headerLength;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
index 4b7c4dc..7ec97c0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
@@ -131,7 +131,7 @@ public class CoprocessorRowType {
private void init() {
int[] offsets = new int[columns.length];
- int o = RowConstants.ROWKEY_CUBOIDID_LEN;
+ int o = RowConstants.ROWKEY_HEADER_LEN;
for (int i = 0; i < columns.length; i++) {
offsets[i] = o;
o += columnSizes[i];
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 9b2cf66..034ffac 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -186,7 +186,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
info.append(keyRange.getCuboid().getId());
info.append("\nStart: ");
info.append(keyRange.getStartKeyAsString());
- info.append(" - ");
+ info.append(" - ");
info.append(Bytes.toStringBinary(keyRange.getStartKey()));
info.append("\nStop: ");
info.append(keyRange.getStopKeyAsString());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index ed3a518..5c2117d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -18,17 +18,32 @@
package org.apache.kylin.storage.hbase.cube.v1;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -58,9 +73,8 @@ import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import com.google.common.collect.Lists;
-//v1
@SuppressWarnings("unused")
public class CubeStorageQuery implements ICachableStorageQuery {
@@ -86,7 +100,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
}
}
}
-
+
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
@@ -135,11 +149,8 @@ public class CubeStorageQuery implements ICachableStorageQuery {
collectNonEvaluable(filter, groupsCopD);
TupleFilter filterD = translateDerived(filter, groupsCopD);
- // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
- TupleFilter flatFilter = flattenToOrAndFilter(filterD);
-
// translate filter into segment scan ranges
- List<HBaseKeyRange> scans = buildScanRanges(flatFilter, dimensionsD);
+ List<HBaseKeyRange> scans = buildScanRanges(flattenToOrAndFilter(filterD), dimensionsD);
// check involved measures, build value decoder for each each family:column
List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
@@ -150,7 +161,9 @@ public class CubeStorageQuery implements ICachableStorageQuery {
setLimit(filter, context);
HConnection conn = HBaseConnection.get(context.getConnUrl());
+
return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
+ //Notice we're passing filterD down to storage instead of flatFilter
}
@Override
@@ -400,10 +413,12 @@ public class CubeStorageQuery implements ICachableStorageQuery {
return new ArrayList<RowValueDecoder>(codecMap.values());
}
+ //check TupleFilter.flatFilter's comment
private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
if (filter == null)
return null;
+ // core
TupleFilter flatFilter = filter.flatFilter();
// normalize to OR-AND filter
@@ -445,27 +460,30 @@ public class CubeStorageQuery implements ICachableStorageQuery {
}
//log
- sb.append(scanRanges.size() + "=>");
+ sb.append(scanRanges.size() + "=(mergeoverlap)>");
List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
//log
- sb.append(mergedRanges.size() + "=>");
+ sb.append(mergedRanges.size() + "=(mergetoomany)>");
mergedRanges = mergeTooManyRanges(mergedRanges);
//log
- sb.append(mergedRanges.size() + ", ");
+ sb.append(mergedRanges.size() + ",");
result.addAll(mergedRanges);
}
-
logger.info(sb.toString());
logger.info("hbasekeyrange count: " + result.size());
+
dropUnhitSegments(result);
logger.info("hbasekeyrange count after dropping unhit :" + result.size());
+ result = duplicateRangeByShard(result);
+ logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size());
+
return result;
}
@@ -675,6 +693,42 @@ public class CubeStorageQuery implements ICachableStorageQuery {
}
}
+ private List<HBaseKeyRange> duplicateRangeByShard(List<HBaseKeyRange> scans) {
+ List<HBaseKeyRange> ret = Lists.newArrayList();
+
+ for (HBaseKeyRange scan : scans) {
+ CubeSegment segment = scan.getCubeSegment();
+
+ byte[] startKey = scan.getStartKey();
+ byte[] stopKey = scan.getStopKey();
+
+ short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId());
+ short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId());
+ for (short i = 0; i < cuboidShardNum; ++i) {
+ byte[] newStartKey = duplicateKeyAndChangeShard(i, startKey);
+ byte[] newStopKey = duplicateKeyAndChangeShard(i, stopKey);
+ HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, //
+ scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate());
+ ret.add(newRange);
+ }
+ }
+
+ Collections.sort(ret, new Comparator<HBaseKeyRange>() {
+ @Override
+ public int compare(HBaseKeyRange o1, HBaseKeyRange o2) {
+ return Bytes.compareTo(o1.getStartKey(), o2.getStartKey());
+ }
+ });
+
+ return ret;
+ }
+
+ private byte[] duplicateKeyAndChangeShard(short newShard, byte[] bytes) {
+ byte[] ret = Arrays.copyOf(bytes, bytes.length);
+ BytesUtil.writeShort(newShard, ret, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ return ret;
+ }
+
private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 7f28baf..86bc42d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -21,9 +21,13 @@ package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.zip.DataFormatException;
import javax.annotation.Nullable;
@@ -35,7 +39,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.util.KryoUtils;
@@ -43,24 +46,29 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
-
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import com.google.protobuf.HBaseZeroCopyByteString;
public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
- static class EndpintResultsAsGTScanner implements IGTScanner {
+ static class EndpointResultsAsGTScanner implements IGTScanner {
private GTInfo info;
private Iterator<byte[]> blocks;
+ private ImmutableBitSet columns;
- public EndpintResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks) {
+ public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns) {
this.info = info;
this.blocks = blocks;
+ this.columns = columns;
}
@Override
@@ -85,7 +93,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public Iterator<GTRecord> apply(@Nullable final byte[] input) {
- logger.info("Reassembling a raw block returned from Endpoint with byte length: " + input.length);
return new Iterator<GTRecord>() {
private ByteBuffer inputBuffer = null;
private GTRecord oneRecord = null;
@@ -102,7 +109,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public GTRecord next() {
- oneRecord.loadAllColumns(inputBuffer);
+ oneRecord.loadColumns(columns, inputBuffer);
return oneRecord;
}
@@ -123,43 +130,98 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
- try {
- // primary key (also the 0th column block) is always selected
- final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
- // globally shared connection, does not require close
- HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
- final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
- final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
-
- RawScan rawScan = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns);
-
- byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
- byte[] rawScanBytes = KryoUtils.serialize(rawScan);
- CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
- builder.setGtScanRequest(ByteString.copyFrom(scanRequestBytes)).setHbaseRawScan(ByteString.copyFrom(rawScanBytes));
-
- Collection<CubeVisitProtos.CubeVisitResponse> results = getResults(builder.build(), hbaseTable, rawScan.startKey, rawScan.endKey);
- final Collection<byte[]> rowBlocks = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() {
- @Nullable
+ // primary key (also the 0th column block) is always selected
+ final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
+ // globally shared connection, does not require close
+ HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+ final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+
+ List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+ List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
+ final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
+ for (List<Integer> list : hbaseColumnsToGT) {
+ hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
+ }
+
+ byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
+ final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size());
+ final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList());
+
+ logger.info("Total RawScan range count: " + rawScans.size());
+ for (RawScan rawScan : rawScans) {
+ logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+ }
+
+ for (int i = 0; i < rawScans.size(); ++i) {
+ final int shardIndex = i;
+ final RawScan rawScan = rawScans.get(i);
+
+ executorService.submit(new Runnable() {
@Override
- public byte[] apply(CubeVisitProtos.CubeVisitResponse input) {
+ public void run() {
+ final byte[] rawScanBytes = KryoUtils.serialize(rawScan);
+ CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
+ builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBytes));
+ for (IntList intList : hbaseColumnsToGTIntList) {
+ builder.addHbaseColumnsToGT(intList);
+ }
+
+ Collection<CubeVisitProtos.CubeVisitResponse> results;
try {
- return CompressionUtils.decompress(input.getCompressedRows().toByteArray());
- } catch (IOException | DataFormatException e) {
- throw new RuntimeException(e);
+ results = getResults(builder.build(), hbaseTable, rawScan.startKey, rawScan.endKey);
+ } catch (Throwable throwable) {
+ throw new RuntimeException("Error when visiting cubes by endpoint:", throwable);
+ }
+
+ //results.size() supposed to be 1;
+ if (results.size() != 1) {
+ logger.warn("{} CubeVisitResponse returned for shard {}", results.size(), shardIndex);
+ }
+
+ for (CubeVisitProtos.CubeVisitResponse result : results) {
+ logger.info(getStatsString(result, shardIndex));
}
+
+ Collection<byte[]> part = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() {
+ @Nullable
+ @Override
+ public byte[] apply(CubeVisitProtos.CubeVisitResponse input) {
+ try {
+ return CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(input.getCompressedRows()));
+ } catch (IOException | DataFormatException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ rowBlocks.addAll(part);
}
});
+ }
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(1, TimeUnit.HOURS)) {
+ throw new RuntimeException("Visiting cube by endpoint timeout");
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Visiting cube by endpoint gets interrupted");
+ }
- return new EndpintResultsAsGTScanner(fullGTInfo, rowBlocks.iterator());
+ return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns());
+ }
+
+ private String getStatsString(CubeVisitProtos.CubeVisitResponse result, int shardIndex) {
+ StringBuilder sb = new StringBuilder();
+ Stats stats = result.getStats();
+ sb.append("Shard " + shardIndex + ": ");
+ sb.append("Total scanned row: " + stats.getScannedRowCount() + ". ");
+ sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". ");
+ sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). ");
+ return sb.toString();
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- }
- return null;
}
- //TODO : async callback
private Collection<CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable {
Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {
public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
@@ -174,8 +236,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
});
- logger.info("{} regions returned results ", results.values().size());
-
return results.values();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 09bef0f..1d217ac 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -2,6 +2,7 @@ package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
@@ -14,6 +15,7 @@ import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
@@ -27,6 +29,7 @@ import org.apache.kylin.gridtable.IGTScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public abstract class CubeHBaseRPC {
@@ -69,59 +72,142 @@ public abstract class CubeHBaseRPC {
return scan;
}
- protected RawScan prepareRawScan(GTRecord pkStart, GTRecord pkEnd, List<Pair<byte[], byte[]>> selectedColumns) {
- byte[] start = makeRowKeyToScan(pkStart, (byte) 0x00);
- byte[] end = makeRowKeyToScan(pkEnd, (byte) 0xff);
+ protected List<RawScan> preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+ final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
+ List<RawScan> ret = Lists.newArrayList();
- //TODO fuzzy match
+ byte[] start = makeRowKeyToScan(pkStart, RowConstants.ROWKEY_LOWER_BYTE);
+ byte[] end = makeRowKeyToScan(pkEnd, RowConstants.ROWKEY_UPPER_BYTE);
+ List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
- return new RawScan(start, end, selectedColumns, null);
+ short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+
+ for (short i = 0; i < cuboidShardNum; ++i) {
+ short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
+
+ byte[] shardStart = Arrays.copyOf(start, start.length);
+ byte[] shardEnd = new byte[end.length + 1];//append extra 0 to the end key to make it inclusive while scanning
+ System.arraycopy(end, 0, shardEnd, 0, end.length);
+
+ BytesUtil.writeShort(shard, shardStart, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ BytesUtil.writeShort(shard, shardEnd, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
+ ret.add(new RawScan(shardStart, shardEnd, selectedColumns, hbaseFuzzyKeys));
+ }
+ return ret;
+
+ }
+
+ /**
+ * translate GTRecord format fuzzy keys to hbase expected format
+ * @return
+ */
+ private List<Pair<byte[], byte[]>> translateFuzzyKeys(List<GTRecord> fuzzyKeys) {
+ if (fuzzyKeys == null || fuzzyKeys.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<Pair<byte[], byte[]>> ret = Lists.newArrayList();
+ int coreLength = fullGTInfo.getMaxColumnLength(fullGTInfo.getPrimaryKey());
+ for (GTRecord gtRecordFuzzyKey : fuzzyKeys) {
+ byte[] hbaseFuzzyKey = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN];
+ byte[] hbaseFuzzyMask = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN];
+
+ int pos = 0;
+ //shard part
+ Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);//shard part should better be FIXED, for simplicity we make it non-fixed
+ pos += RowConstants.ROWKEY_SHARDID_LEN;
+
+ //cuboid part
+ Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_CUBOIDID_LEN, RowConstants.BYTE_ZERO);
+ System.arraycopy(cuboid.getBytes(), 0, hbaseFuzzyKey, pos, RowConstants.ROWKEY_CUBOIDID_LEN);
+ pos += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+ //row key core part
+ ByteArray coreKey = HBaseScan.exportScanKey(gtRecordFuzzyKey, RowConstants.BYTE_ZERO);
+ System.arraycopy(coreKey.array(), coreKey.offset(), hbaseFuzzyKey, pos, coreKey.length());
+ ByteArray coreMask = HBaseScan.exportScanMask(gtRecordFuzzyKey);
+ System.arraycopy(coreMask.array(), coreMask.offset(), hbaseFuzzyMask, pos, coreMask.length());
+
+ Preconditions.checkState(coreKey.length() == coreMask.length(), "corekey length not equal coremask length");
+ pos += coreKey.length();
+ Preconditions.checkState(hbaseFuzzyKey.length == pos, "HBase fuzzy key not completely populated");
+
+ ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask));
+ }
+
+ return ret;
}
private byte[] makeRowKeyToScan(GTRecord pkRec, byte fill) {
- ByteArray pk = GTRecord.exportScanKey(pkRec);
- int pkMaxLen = pkRec.getInfo().getMaxColumnLength(pkRec.getInfo().getPrimaryKey());
+ ByteArray pk = HBaseScan.exportScanKey(pkRec, fill);
- byte[] buf = new byte[pkMaxLen + RowConstants.ROWKEY_CUBOIDID_LEN];
+ byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_HEADER_LEN];
Arrays.fill(buf, fill);
- System.arraycopy(cuboid.getBytes(), 0, buf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+ //for scanning/reading, later all possible shard will be applied
+
+ System.arraycopy(cuboid.getBytes(), 0, buf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
if (pk != null && pk.array() != null) {
- System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_CUBOIDID_LEN, pk.length());
+ System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_HEADER_LEN, pk.length());
}
return buf;
}
+ /**
+ * Prunes untouched HBase columns: returns the (family, qualifier) pair of every HBase column whose
+ * column block is selected, in the order the HBase mapping lists them.
+ *
+ * @param selectedColBlocks selected column blocks; block 0 (the primary key, mapped to the row key) is not looked up here
+ */
protected List<Pair<byte[], byte[]>> makeHBaseColumns(ImmutableBitSet selectedColBlocks) {
List<Pair<byte[], byte[]>> result = Lists.newArrayList();
- int colBlockIdx = 1; // start from 1; the 0th column block is primary key which maps to rowkey
+ int colBlkIndex = 1; // start from 1; the 0th column block is the primary key, which maps to the row key
HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping();
for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) {
byte[] byteFamily = Bytes.toBytes(familyDesc.getName());
for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
- if (selectedColBlocks.get(colBlockIdx)) {
+ if (selectedColBlocks.get(colBlkIndex)) {
byte[] byteQualifier = Bytes.toBytes(hbaseColDesc.getQualifier());
result.add(new Pair<byte[], byte[]>(byteFamily, byteQualifier));
}
- colBlockIdx++;
+ colBlkIndex++;
}
}
return result;
}
- //possible to use binary search as cells might be sorted
- public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) {
- for (Cell c : cells) {
- if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 && //
- BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) {
- return c;
+ /**
+ * For each selected HBase column, returns the indexes of the GT (grid table) columns whose values it holds;
+ * a single HBase column may hold the values of several GT columns. The mapping is passed down to storage.
+ * <p>
+ * Column block 0 (the primary key) gets no entry, so the result has one list per selected non-key
+ * column block, in the order the HBase mapping lists them. GT indexes are the column's measure indexes
+ * shifted by the number of primary-key columns.
+ *
+ * @param selectedColBlocks selected column blocks; the final check assumes exactly one selected block (block 0) yields no entry
+ * @return per selected HBase column, the list of GT column indexes it contains
+ */
+ protected List<List<Integer>> getHBaseColumnsGTMapping(ImmutableBitSet selectedColBlocks) {
+
+ List<List<Integer>> ret = Lists.newArrayList();
+
+ int colBlkIndex = 1; // start from 1; column block 0 is the primary key, which maps to the row key
+ int metricOffset = fullGTInfo.getPrimaryKey().trueBitCount(); // measure indexes are shifted past the primary-key columns
+
+ HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping();
+ for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) {
+ for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+ if (selectedColBlocks.get(colBlkIndex)) {
+ int[] metricIndexes = hbaseColDesc.getMeasureIndex();
+ Integer[] gtIndexes = new Integer[metricIndexes.length];
+ for (int i = 0; i < gtIndexes.length; i++) {
+ gtIndexes[i] = metricIndexes[i] + metricOffset;
+ }
+ ret.add(Arrays.asList(gtIndexes));
+ }
+ colBlkIndex++;
}
}
- return null;
+
+ // every selected block other than block 0 must have produced exactly one mapping entry
+ Preconditions.checkState(selectedColBlocks.trueBitCount() == ret.size() + 1);
+ return ret;
}
+
+
public static void applyHBaseColums(Scan scan, List<Pair<byte[], byte[]>> hbaseColumns) {
for (Pair<byte[], byte[]> hbaseColumn : hbaseColumns) {
byte[] byteFamily = hbaseColumn.getFirst();
@@ -157,4 +243,33 @@ public abstract class CubeHBaseRPC {
return result;
}
+    /**
+     * Logs at info level which HBase table is visited and how the cuboid is matched: exact, or
+     * requiring post aggregation from the input cuboid to the target cuboid. Also logs the scan's
+     * start/stop keys in readable and binary form, and its fuzzy keys when present.
+     */
+    protected void logScan(RawScan rawScan, String tableName) {
+        StringBuilder msg = new StringBuilder("\nVisiting hbase table ");
+        msg.append(tableName).append(": ")
+                .append(cuboid.requirePostAggregation() ? "cuboid require post aggregation, from " : "cuboid exact match, from ")
+                .append(cuboid.getInputID()).append(" to ").append(cuboid.getId());
+
+        msg.append("\nStart: ").append(rawScan.getStartKeyAsString()).append(" - ").append(Bytes.toStringBinary(rawScan.startKey));
+        msg.append("\nStop: ").append(rawScan.getEndKeyAsString()).append(" - ").append(Bytes.toStringBinary(rawScan.endKey));
+
+        if (rawScan.fuzzyKey == null) {
+            msg.append("\nNo Fuzzy Key");
+        } else {
+            msg.append("\nFuzzy key counts: ").append(rawScan.fuzzyKey.size());
+            msg.append("\nFuzzy: ").append(rawScan.getFuzzyKeyAsString());
+        }
+
+        logger.info(msg.toString());
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 8838578..fa5a844 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -11,20 +11,55 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.storage.hbase.HBaseConnection;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
/**
* for test use only
*/
public class CubeHBaseScanRPC extends CubeHBaseRPC {
+    /**
+     * Adapts an iterator of records so that every record handed out carries {@code info} as its GTInfo:
+     * each element is a new GTRecord built from {@code info} and the source record's {@code getInternal()}.
+     * All iterators returned by {@link #iterator()} read from the same underlying {@code input}.
+     */
+    static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> {
+
+        private final GTInfo info;
+        private final Iterator<GTRecord> input;
+
+        public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) {
+            this.info = info;
+            this.input = input;
+        }
+
+        @Override
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+                @Override
+                public boolean hasNext() {
+                    return input.hasNext();
+                }
+
+                @Override
+                public GTRecord next() {
+                    GTRecord x = input.next();
+                    return new GTRecord(info, x.getInternal());
+                }
+
+                @Override
+                public void remove() {
+                    // read-only view: fail loudly instead of silently ignoring the request
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+ /**
+ * Creates an RPC that reads the given cuboid of the segment with plain HBase client scans
+ * (the class comment marks this implementation as for test use only).
+ *
+ * @param fullGTInfo grid-table info of the cuboid, handed to the CubeHBaseRPC base class
+ */
public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
super(cubeSeg, cuboid, fullGTInfo);
}
@@ -34,34 +69,47 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
// primary key (also the 0th column block) is always selected
final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
-
// globally shared connection, does not require close
HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-
final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
- final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
- RawScan rawScan = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns);
- Scan hbaseScan = buildScan(rawScan);
+ List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+ List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
+
+ final List<ResultScanner> scanners = Lists.newArrayList();
+ final List<Iterator<Result>> resultIterators = Lists.newArrayList();
+
+ for (RawScan rawScan : rawScans) {
+
+ logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+ Scan hbaseScan = buildScan(rawScan);
+
+ final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
+ final Iterator<Result> iterator = scanner.iterator();
+
+ scanners.add(scanner);
+ resultIterators.add(iterator);
+ }
- final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
- final Iterator<Result> iterator = scanner.iterator();
+ final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
CellListIterator cellListIterator = new CellListIterator() {
@Override
public void close() throws IOException {
- scanner.close();
+ for (ResultScanner scanner : scanners) {
+ scanner.close();
+ }
hbaseTable.close();
}
@Override
public boolean hasNext() {
- return iterator.hasNext();
+ return allResultsIterator.hasNext();
}
@Override
public List<Cell> next() {
- return iterator.next().listCells();
+ return allResultsIterator.next().listCells();
}
@Override
@@ -70,8 +118,32 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
};
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, hbaseColumns);
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT);
IGTScanner rawScanner = store.scan(scanRequest);
- return scanRequest.decorateScanner(rawScanner);
+
+ final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
+ final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator());
+
+ return new IGTScanner() {
+ @Override
+ public GTInfo getInfo() {
+ return fullGTInfo;
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ return decorateScanner.getScannedRowCount();
+ }
+
+ @Override
+ public void close() throws IOException {
+ decorateScanner.close();
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return trimmedInfoGTRecordAdapter.iterator();
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
deleted file mode 100644
index 9359934..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
+++ /dev/null
@@ -1,265 +0,0 @@
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRange;
-import org.apache.kylin.gridtable.GTScanRangePlanner;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTUtil;
-import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class CubeScanner implements IGTScanner {
-
- private static final int MAX_SCAN_RANGES = 200;
-
- final CubeSegment cubeSeg;
- final GTInfo info;
- final byte[] trimmedInfoBytes;
- final List<GTScanRequest> scanRequests;
- final Scanner scanner;
- final Cuboid cuboid;
-
- public CubeScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
- Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
- this.cuboid = cuboid;
- this.cubeSeg = cubeSeg;
- this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
-
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
- //replace the constant values in filter to dictionary codes
- TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);
-
- ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions);
- ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc()));
- ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
- String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
-
- //TODO: should remove this in endpoint scenario
- GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(info);
- List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
-
- scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
-
- trimmedInfoBytes = GTInfo.serialize(info);
- GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes);
-
- for (GTScanRange range : scanRanges) {
- scanRequests.add(new GTScanRequest(trimmedInfo, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate));
- }
-
- scanner = new Scanner();
- }
-
- private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
- Set<TblColRef> ret = Sets.newHashSet();
- for (TblColRef col : input) {
- if (cubeDesc.isDerived(col)) {
- for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
- ret.add(host);
- }
- } else {
- ret.add(col);
- }
- }
- return ret;
- }
-
- private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
- BitSet result = new BitSet();
- for (TblColRef dim : dimensions) {
- int idx = mapping.getIndexOf(dim);
- if (idx >= 0)
- result.set(idx);
- }
- return new ImmutableBitSet(result);
- }
-
- private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
- BitSet result = new BitSet();
- for (FunctionDesc metric : metrics) {
- int idx = mapping.getIndexOf(metric);
- if (idx < 0)
- throw new IllegalStateException(metric + " not found in " + mapping);
- result.set(idx);
- }
- return new ImmutableBitSet(result);
- }
-
- private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
-
- //metrics are represented in ImmutableBitSet, which loses order information
- //sort the aggrFuns to align with metrics natural order
- List<FunctionDesc> metricList = Lists.newArrayList(metrics);
- Collections.sort(metricList, new Comparator<FunctionDesc>() {
- @Override
- public int compare(FunctionDesc o1, FunctionDesc o2) {
- int a = mapping.getIndexOf(o1);
- int b = mapping.getIndexOf(o2);
- return a - b;
- }
- });
-
- String[] result = new String[metricList.size()];
- int i = 0;
- for (FunctionDesc metric : metricList) {
- result[i++] = metric.getExpression();
- }
- return result;
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- return scanner.iterator();
- }
-
- @Override
- public void close() throws IOException {
- scanner.close();
- }
-
- @Override
- public GTInfo getInfo() {
- return info;
- }
-
- @Override
- public int getScannedRowCount() {
- return scanner.getScannedRowCount();
- }
-
- static class RemoteGTRecordAdapter implements Iterable<GTRecord> {
-
- private final GTInfo info;
- private final Iterator<GTRecord> input;
-
- public RemoteGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) {
- this.info = info;
- this.input = input;
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
- @Override
- public boolean hasNext() {
- return input.hasNext();
- }
-
- @Override
- public GTRecord next() {
- GTRecord x = input.next();
- return new GTRecord(info, x.getInternal());
- }
-
- @Override
- public void remove() {
-
- }
- };
- }
- }
-
- private class Scanner {
- final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
- int cur = 0;
- Iterator<GTRecord> curIterator = null;
- GTRecord next = null;
-
- public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
-
- @Override
- public boolean hasNext() {
- if (next != null)
- return true;
-
- if (curIterator == null) {
- if (cur >= scanRequests.size())
- return false;
-
- try {
- CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);
- inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur));
- curIterator = inputScanners[cur].iterator();
- //curIterator = new RemoteGTRecordAdapter(info, inputScanners[cur].iterator()).iterator();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- if (curIterator.hasNext() == false) {
- curIterator = null;
- cur++;
- return hasNext();
- }
-
- next = curIterator.next();
- return true;
- }
-
- @Override
- public GTRecord next() {
- // fetch next record
- if (next == null) {
- hasNext();
- if (next == null)
- throw new NoSuchElementException();
- }
-
- GTRecord result = next;
- next = null;
- return result;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- public void close() throws IOException {
- for (int i = 0; i < inputScanners.length; i++) {
- if (inputScanners[i] != null) {
- inputScanners[i].close();
- }
- }
- }
-
- public int getScannedRowCount() {
- int result = 0;
- for (int i = 0; i < inputScanners.length; i++) {
- if (inputScanners[i] == null)
- break;
-
- result += inputScanners[i].getScannedRowCount();
- }
- return result;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
new file mode 100644
index 0000000..286da55
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -0,0 +1,290 @@
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRangePlanner;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Scans one cube segment for one cuboid.
+ * <p>
+ * The query's dimensions, groups, metrics and filter are translated into grid-table (GT) terms. The filter
+ * is planned into at most MAX_SCAN_RANGES scan ranges; when the data model is partitioned, the segment's
+ * encoded date range is handed to the planner as well. One {@link GTScanRequest} is built per range, and
+ * the requests are served lazily and one after another, each by its own {@link CubeHBaseEndpointRPC}.
+ */
+public class CubeSegmentScanner implements IGTScanner {
+
+    private static final int MAX_SCAN_RANGES = 200;
+
+    final CubeSegment cubeSeg;
+    final GTInfo info;
+    final byte[] trimmedInfoBytes;
+    final List<GTScanRequest> scanRequests;
+    final Scanner scanner;
+    final Cuboid cuboid;
+
+    /**
+     * Plans the scan requests for this segment; no HBase call is made until records are iterated.
+     *
+     * @param dimensions dimension columns to read; columns absent from the cuboid's GT mapping are ignored
+     * @param groups group-by columns; derived columns are replaced by their host columns
+     * @param metrics measures to aggregate; each must be present in the cuboid's GT mapping
+     * @param filter query filter; its constant values are converted to dictionary codes
+     * @param allowPreAggregate passed unchanged to every scan request
+     * @throws IllegalStateException if a metric is not found in the cuboid's GT mapping
+     */
+    public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
+            Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
+        this.cuboid = cuboid;
+        this.cubeSeg = cubeSeg;
+        this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
+
+        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+
+        //replace the constant values in filter to dictionary codes
+        TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);
+
+        ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions);
+        ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc()));
+        ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
+        String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
+
+        GTScanRangePlanner scanRangePlanner;
+        if (cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) {
+            TblColRef tblColRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+            Pair<ByteArray, ByteArray> segmentStartAndEnd = null;
+            int index = mapping.getIndexOf(tblColRef);
+            // the segment range can only be encoded when the partition column is in this cuboid's GT mapping
+            if (index >= 0) {
+                segmentStartAndEnd = getSegmentStartAndEnd(tblColRef, index);
+            }
+            scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, tblColRef);
+        } else {
+            scanRangePlanner = new GTScanRangePlanner(info, null, null);
+        }
+        List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
+
+        scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
+
+        // round-trip the GTInfo through serialization; all scan requests share the deserialized copy
+        trimmedInfoBytes = GTInfo.serialize(info);
+        GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes);
+
+        for (GTScanRange range : scanRanges) {
+            scanRequests.add(new GTScanRequest(trimmedInfo, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate));
+        }
+
+        scanner = new Scanner();
+    }
+
+    /**
+     * Encodes the segment's date range as GT column values of column {@code index}. A side that is
+     * unbounded (Long.MIN_VALUE start / Long.MAX_VALUE end) becomes an empty ByteArray.
+     */
+    private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(TblColRef tblColRef, int index) {
+
+        String partitionColType = tblColRef.getColumnDesc().getDatatype();
+
+        ByteArray start;
+        if (cubeSeg.getDateRangeStart() != Long.MIN_VALUE) {
+            start = translateTsToString(cubeSeg.getDateRangeStart(), partitionColType, index);
+        } else {
+            start = new ByteArray();
+        }
+
+        ByteArray end;
+        if (cubeSeg.getDateRangeEnd() != Long.MAX_VALUE) {
+            end = translateTsToString(cubeSeg.getDateRangeEnd(), partitionColType, index);
+        } else {
+            end = new ByteArray();
+        }
+        return Pair.newPair(start, end);
+
+    }
+
+    /**
+     * Formats {@code ts} as a date ("date" columns) or a timestamp without milliseconds ("timestamp"
+     * columns) and encodes that string with the code system of GT column {@code index}.
+     *
+     * @throws RuntimeException if the partition column type is neither date nor timestamp
+     */
+    private ByteArray translateTsToString(long ts, String partitionColType, int index) {
+        String value;
+        if ("date".equalsIgnoreCase(partitionColType)) {
+            value = DateFormat.formatToDateStr(ts);
+        } else if ("timestamp".equalsIgnoreCase(partitionColType)) {
+            //TODO: if partition col is not dict encoded, value's format may differ from expected. Though by default it is not the case
+            value = DateFormat.formatToTimeWithoutMilliStr(ts);
+        } else {
+            throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+        }
+
+        ByteBuffer buffer = ByteBuffer.allocate(info.getMaxColumnLength());
+        info.getCodeSystem().encodeColumnValue(index, value, buffer);
+
+        return ByteArray.copyOf(buffer.array(), 0, buffer.position());
+    }
+
+    /** Returns the input columns with every derived column replaced by its host columns. */
+    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
+        Set<TblColRef> ret = Sets.newHashSet();
+        for (TblColRef col : input) {
+            if (cubeDesc.isDerived(col)) {
+                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
+                    ret.add(host);
+                }
+            } else {
+                ret.add(col);
+            }
+        }
+        return ret;
+    }
+
+    /** GT column indexes of the given columns; columns absent from the mapping are skipped. */
+    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
+        BitSet result = new BitSet();
+        for (TblColRef dim : dimensions) {
+            int idx = mapping.getIndexOf(dim);
+            if (idx >= 0)
+                result.set(idx);
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    /** GT column indexes of the given metrics; throws IllegalStateException if one is not mapped. */
+    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+        BitSet result = new BitSet();
+        for (FunctionDesc metric : metrics) {
+            int idx = mapping.getIndexOf(metric);
+            if (idx < 0)
+                throw new IllegalStateException(metric + " not found in " + mapping);
+            result.set(idx);
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    /** Aggregation expressions of the metrics, ordered by their GT column index. */
+    private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+
+        //metrics are represented in ImmutableBitSet, which loses order information
+        //sort the aggrFuns to align with metrics natural order
+        List<FunctionDesc> metricList = Lists.newArrayList(metrics);
+        Collections.sort(metricList, new Comparator<FunctionDesc>() {
+            @Override
+            public int compare(FunctionDesc o1, FunctionDesc o2) {
+                // GT column indexes are small and non-negative, so the subtraction cannot overflow
+                int a = mapping.getIndexOf(o1);
+                int b = mapping.getIndexOf(o2);
+                return a - b;
+            }
+        });
+
+        String[] result = new String[metricList.size()];
+        int i = 0;
+        for (FunctionDesc metric : metricList) {
+            result[i++] = metric.getExpression();
+        }
+        return result;
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return scanner.iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        scanner.close();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return scanner.getScannedRowCount();
+    }
+
+    /**
+     * Chains the scan requests: request i is opened only once request i-1 is exhausted.
+     * Iterators returned by {@link #iterator()} share this object's position.
+     */
+    private class Scanner {
+        final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
+        int cur = 0;
+        Iterator<GTRecord> curIterator = null;
+        GTRecord next = null;
+
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+
+                @Override
+                public boolean hasNext() {
+                    if (next != null)
+                        return true;
+
+                    // advance through the requests until one yields a record; a loop (not recursion)
+                    // so a long run of empty ranges does not deepen the stack
+                    while (true) {
+                        if (curIterator == null) {
+                            if (cur >= scanRequests.size())
+                                return false;
+                            curIterator = openScanner(cur);
+                        }
+
+                        if (curIterator.hasNext()) {
+                            next = curIterator.next();
+                            return true;
+                        }
+
+                        curIterator = null;
+                        cur++;
+                    }
+                }
+
+                @Override
+                public GTRecord next() {
+                    // fetch next record
+                    if (next == null) {
+                        hasNext();
+                        if (next == null)
+                            throw new NoSuchElementException();
+                    }
+
+                    GTRecord result = next;
+                    next = null;
+                    return result;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        /** Opens the scanner for request {@code index}, remembers it for close(), and returns its iterator. */
+        private Iterator<GTRecord> openScanner(int index) {
+            try {
+                // to debug locally, use new CubeHBaseScanRPC(cubeSeg, cuboid, info) instead
+                CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);
+                inputScanners[index] = rpc.getGTScanner(scanRequests.get(index));
+                return inputScanners[index].iterator();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /** Closes every opened scanner, even if some fail; the first IOException is rethrown at the end. */
+        public void close() throws IOException {
+            IOException firstError = null;
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] == null)
+                    continue;
+                try {
+                    inputScanners[i].close();
+                } catch (IOException e) {
+                    if (firstError == null)
+                        firstError = e;
+                }
+            }
+            if (firstError != null)
+                throw firstError;
+        }
+
+        /** Sum of rows scanned by the scanners opened so far (they are opened in order). */
+        public int getScannedRowCount() {
+            int result = 0;
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] == null)
+                    break;
+
+                result += inputScanners[i].getScannedRowCount();
+            }
+            return result;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 35f95ca..71abb41 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
+@SuppressWarnings("unused")
public class CubeStorageQuery implements ICachableStorageQuery {
private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
@@ -91,13 +92,11 @@ public class CubeStorageQuery implements ICachableStorageQuery {
TupleFilter filterD = translateDerived(filter, groupsD);
setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
- // TODO enable coprocessor
- // setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
setLimit(filter, context);
- List<CubeScanner> scanners = Lists.newArrayList();
+ List<CubeSegmentScanner> scanners = Lists.newArrayList();
for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
- scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation));
+ scanners.add(new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation));
}
if (scanners.isEmpty())
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index a6c6a23..7731f19 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
-import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.gridtable.GTInfo;
@@ -34,20 +34,22 @@ import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.gridtable.IGTWriter;
+import com.google.common.base.Preconditions;
+
public class HBaseReadonlyStore implements IGTStore {
private CellListIterator cellListIterator;
private GTInfo info;
private List<Pair<byte[], byte[]>> hbaseColumns;
- private ImmutableBitSet selectedColBlocks;
+ private List<List<Integer>> hbaseColumnsToGT;
- public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns) {
+ /**
+ * @param cellListIterator source rows, one cell list per row; closed when the scanner returned by scan() is closed
+ * @param gtScanRequest supplies the GTInfo used to build the returned records
+ * @param hbaseColumns (family, qualifier) pairs read from every row, in order
+ * @param hbaseColumnsToGT for the i-th entry of hbaseColumns, the GT column indexes its value is loaded into
+ */
+ public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT) {
this.cellListIterator = cellListIterator;
this.info = gtScanRequest.getInfo();
this.hbaseColumns = hbaseColumns;
- this.selectedColBlocks = gtScanRequest.getSelectedColBlocks().set(0);
+ this.hbaseColumnsToGT = hbaseColumnsToGT;
}
@Override
@@ -56,20 +58,31 @@ public class HBaseReadonlyStore implements IGTStore {
}
+ // read-only store: rebuilding is not supported
@Override
- public IGTWriter rebuild(int shard) throws IOException {
+ public IGTWriter rebuild() throws IOException {
throw new UnsupportedOperationException();
}
+ // read-only store: appending is not supported
@Override
- public IGTWriter append(int shard) throws IOException {
+ public IGTWriter append() throws IOException {
throw new UnsupportedOperationException();
}
+    //TODO: possible to use binary search as cells might be sorted?
+    /**
+     * Returns the first cell in {@code cells} whose column family and qualifier are exactly
+     * {@code familyName} and {@code columnName}, or {@code null} if there is none.
+     * <p>
+     * Lengths are checked first. Comparing only {@code familyName.length} / {@code columnName.length}
+     * bytes would also accept a cell whose family or qualifier merely starts with the requested name
+     * (e.g. "F1" matching "F10"), and would compare past the end of a shorter cell name.
+     */
+    public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) {
+        for (Cell c : cells) {
+            if (c.getFamilyLength() == familyName.length && c.getQualifierLength() == columnName.length //
+                    && BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 //
+                    && BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) {
+                return c;
+            }
+        }
+        return null;
+    }
+
@Override
public IGTScanner scan(GTScanRequest scanRequest) throws IOException {
return new IGTScanner() {
int count;
-
+
@Override
public void close() throws IOException {
cellListIterator.close();
@@ -79,7 +92,7 @@ public class HBaseReadonlyStore implements IGTStore {
public Iterator<GTRecord> iterator() {
return new Iterator<GTRecord>() {
GTRecord oneRecord = new GTRecord(info); // avoid object creation
-
+
@Override
public boolean hasNext() {
return cellListIterator.hasNext();
@@ -87,26 +100,24 @@ public class HBaseReadonlyStore implements IGTStore {
@Override
public GTRecord next() {
+ count++;
List<Cell> oneRow = cellListIterator.next();
if (oneRow.size() < 1) {
throw new IllegalStateException("cell list's size less than 1");
}
- ByteBuffer buf;
-
// dimensions, set to primary key, also the 0th column block
Cell firstCell = oneRow.get(0);
- buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_CUBOIDID_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_CUBOIDID_LEN);
+ ByteBuffer buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN);
oneRecord.loadCellBlock(0, buf);
// metrics
- int hbaseColIdx = 0;
- for (int i = 1; i < selectedColBlocks.trueBitCount(); i++) {
- int colBlockIdx = selectedColBlocks.trueBitAt(i);
- Pair<byte[], byte[]> hbaseColumn = hbaseColumns.get(hbaseColIdx++);
- Cell cell = CubeHBaseRPC.findCell(oneRow, hbaseColumn.getFirst(), hbaseColumn.getSecond());
+ for (int i = 0; i < hbaseColumns.size(); i++) {
+ Pair<byte[], byte[]> hbaseColumn = hbaseColumns.get(i);
+ Cell cell = HBaseReadonlyStore.findCell(oneRow, hbaseColumn.getFirst(), hbaseColumn.getSecond());
+ Preconditions.checkNotNull(cell);
buf = byteBuffer(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- oneRecord.loadCellBlock(colBlockIdx, buf);
+ oneRecord.loadColumns(hbaseColumnsToGT.get(i), buf);
}
return oneRecord;
@@ -116,7 +127,7 @@ public class HBaseReadonlyStore implements IGTStore {
public void remove() {
throw new UnsupportedOperationException();
}
-
+
private ByteBuffer byteBuffer(byte[] array, int offset, int length) {
return ByteBuffer.wrap(array, offset, length);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
new file mode 100644
index 0000000..7667830
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import com.google.common.base.Preconditions;
+
+public class HBaseScan {
+
+ /** Concatenates rec's primary-key columns, each occupying its max code length; columns whose value array is null are filled with {@code fill}.
+ * @throws IllegalArgumentException if a column that has a value is not exactly its max code length (NullPointerException if rec is null)
+ */
+ public static ByteArray exportScanKey(GTRecord rec, byte fill) {
+
+ Preconditions.checkNotNull(rec);
+
+ GTInfo info = rec.getInfo();
+ int len = info.getMaxColumnLength(info.getPrimaryKey());
+ ByteArray buf = ByteArray.allocate(len);
+ int pos = 0;
+ for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
+ int c = info.getPrimaryKey().trueBitAt(i);
+ int colLength = info.getCodeSystem().maxCodeLength(c);
+
+ if (rec.get(c).array() != null) {
+ Preconditions.checkArgument(colLength == rec.get(c).length(), "ColLength :" + colLength + " not equals cols[c] length: " + rec.get(c).length() + " c is " + c);
+ System.arraycopy(rec.get(c).array(), rec.get(c).offset(), buf.array(), buf.offset() + pos, rec.get(c).length());
+ } else {
+ Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill);
+ }
+ pos += colLength;
+ }
+ buf.setLength(pos);
+
+ return buf;
+ }
+
+ /** Builds a byte mask laid out like exportScanKey: each primary-key column spans its max code length and is filled with
+ * RowConstants.BYTE_ZERO when the column has a value (fixed) or RowConstants.BYTE_ONE when its value array is null (not fixed).
+ */
+ public static ByteArray exportScanMask(GTRecord rec) {
+ Preconditions.checkNotNull(rec);
+
+ GTInfo info = rec.getInfo();
+ int len = info.getMaxColumnLength(info.getPrimaryKey());
+ ByteArray buf = ByteArray.allocate(len);
+ byte fill;
+
+ int pos = 0;
+ for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
+ int c = info.getPrimaryKey().trueBitAt(i);
+ int colLength = info.getCodeSystem().maxCodeLength(c);
+
+ if (rec.get(c).array() != null) {
+ fill = RowConstants.BYTE_ZERO;
+ } else {
+ fill = RowConstants.BYTE_ONE;
+ }
+ Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill);
+ pos += colLength;
+ }
+ buf.setLength(pos);
+
+ return buf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
index aa73927..ad4263f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
@@ -20,13 +20,14 @@ package org.apache.kylin.storage.hbase.cube.v2;
import java.util.List;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Pair;
public class RawScan {
public byte[] startKey;
public byte[] endKey;
- public List<Pair<byte[], byte[]>> hbaseColumns;
+ public List<Pair<byte[], byte[]>> hbaseColumns;//only contain interested columns
public List<Pair<byte[], byte[]>> fuzzyKey;
public RawScan(byte[] startKey, byte[] endKey, List<Pair<byte[], byte[]>> hbaseColumns, List<Pair<byte[], byte[]>> fuzzyKey) {
@@ -37,4 +38,23 @@ public class RawScan {
this.fuzzyKey = fuzzyKey;
}
+ /** @return startKey rendered as hex via BytesUtil.toHex. */
+ public String getStartKeyAsString() {
+ return BytesUtil.toHex(this.startKey);
+ }
+ /** @return endKey rendered as hex via BytesUtil.toHex. */
+ public String getEndKeyAsString() {
+ return BytesUtil.toHex(this.endKey);
+ }
+ /** @return one line per fuzzyKey pair: hex(first), a space, hex(second), then System.lineSeparator(). */
+ public String getFuzzyKeyAsString() {
+ StringBuilder buf = new StringBuilder();
+ for (Pair<byte[], byte[]> pair : this.fuzzyKey) { // named "pair" so it does not shadow the fuzzyKey field
+ buf.append(BytesUtil.toHex(pair.getFirst())).append(' ');
+ buf.append(BytesUtil.toHex(pair.getSecond()));
+ buf.append(System.lineSeparator());
+ }
+ return buf.toString();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index 4686da2..85aa54a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -27,10 +27,10 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
private final Set<FunctionDesc> selectedMetrics;
private final TupleInfo tupleInfo;
private final Tuple tuple;
- private final Iterator<CubeScanner> scannerIterator;
+ private final Iterator<CubeSegmentScanner> scannerIterator;
private final StorageContext context;
- private CubeScanner curScanner;
+ private CubeSegmentScanner curScanner;
private Iterator<GTRecord> curRecordIterator;
private CubeTupleConverter curTupleConverter;
private Tuple next;
@@ -38,7 +38,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
private int scanCount;
private int scanCountDelta;
- public SequentialCubeTupleIterator(List<CubeScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+ public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
this.cuboid = cuboid;
this.selectedDimensions = selectedDimensions;
@@ -112,7 +112,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
}
}
- private void close(CubeScanner scanner) {
+ private void close(CubeSegmentScanner scanner) {
try {
scanner.close();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index f0b8c6f..ba766bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -46,11 +46,12 @@ import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore;
import org.apache.kylin.storage.hbase.cube.v2.RawScan;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@@ -125,9 +126,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
try {
this.serviceStartTime = System.currentTimeMillis();
- GTScanRequest scanReq = KryoUtils.deserialize(request.getGtScanRequest().toByteArray(), GTScanRequest.class);
- RawScan hbaseRawScan = KryoUtils.deserialize(request.getHbaseRawScan().toByteArray(), RawScan.class);
- //TODO: rewrite own start/end
+ GTScanRequest scanReq = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()), GTScanRequest.class);
+ RawScan hbaseRawScan = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()), RawScan.class);
+ List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
+ for (IntList intList : request.getHbaseColumnsToGTList()) {
+ hbaseColumnsToGT.add(intList.getIntsList());
+ }
+
Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
region = env.getRegion();
@@ -136,26 +141,30 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
innerScanner = region.getScanner(scan);
InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns);
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT);
IGTScanner rawScanner = store.scan(scanReq);
IGTScanner finalScanner = scanReq.decorateScanner(rawScanner);
ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
+ int finalRowCount = 0;
for (GTRecord oneRecord : finalScanner) {
buffer.clear();
- oneRecord.exportAllColumns(buffer);
+ oneRecord.exportColumns(scanReq.getColumns(), buffer);
buffer.flip();
+
outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
+ finalRowCount++;
}
//outputStream.close() is not necessary
byte[] allRows = outputStream.toByteArray();
CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
done.run(responseBuilder.//
- setCompressedRows(ByteString.copyFrom(CompressionUtils.compress(allRows))).//too many array copies
+ setCompressedRows(HBaseZeroCopyByteString.wrap(CompressionUtils.compress(allRows))).//too many array copies
setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
- setAggregatedRowCount(0).//
- setScannedRowCount(0).//
+ setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
+ setScannedRowCount(finalScanner.getScannedRowCount()).//
setServiceStartTime(serviceStartTime).//
setServiceEndTime(System.currentTimeMillis()).build()).//
build());