You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:22:53 UTC
[05/67] [abbrv] kylin git commit: Revert "reformat code"
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
index c58f227..2b2e490 100755
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
@@ -60,14 +60,12 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator {
return inevaluableColumns;
}
- private TupleFilter replaceConstantsWithLocalDict(CompareTupleFilter oldCompareFilter,
- CompareTupleFilter newCompareFilter) {
+ private TupleFilter replaceConstantsWithLocalDict(CompareTupleFilter oldCompareFilter, CompareTupleFilter newCompareFilter) {
//TODO localdict: (performance issue) transalte() with roundingflag 0 will use try catch exceptions to deal with non-existing entries
return replaceConstantsWithGlobalDict(oldCompareFilter, newCompareFilter);
}
- private TupleFilter replaceConstantsWithGlobalDict(CompareTupleFilter oldCompareFilter,
- CompareTupleFilter newCompareFilter) {
+ private TupleFilter replaceConstantsWithGlobalDict(CompareTupleFilter oldCompareFilter, CompareTupleFilter newCompareFilter) {
Collection<String> constValues = (Collection<String>) oldCompareFilter.getValues();
String firstValue = constValues.iterator().next();
TblColRef col = newCompareFilter.getColumn();
@@ -219,4 +217,4 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator {
columnIO.writeColumn(column, v, roundingFlag, DimensionEncoding.NULL, id, 0);
return Dictionary.dictIdToString(id, 0, id.length);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
index 56f78dc..3eecba1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
@@ -109,8 +109,7 @@ public class HBaseScannerBenchmark {
private void testScanRaw(String msg) throws IOException {
long t = System.currentTimeMillis();
- IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
- .setFilterPushDown(null).createGTScanRequest());
+ IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
ResultScanner innerScanner = ((SimpleHBaseStore.Reader) scan).getHBaseScanner();
int count = 0;
for (Result r : innerScanner) {
@@ -126,8 +125,7 @@ public class HBaseScannerBenchmark {
private void testScanRecords(String msg) throws IOException {
long t = System.currentTimeMillis();
- IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
- .setFilterPushDown(null).createGTScanRequest());
+ IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
int count = 0;
for (GTRecord rec : scan) {
count++;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
index c38e8c8..b12173d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java
@@ -170,16 +170,12 @@ public class SimpleHBaseStore implements IGTStore {
private void loadRecord(Result r) {
Cell[] cells = r.rawCells();
Cell cell = cells[0];
- if (Bytes.compareTo(CF_B, 0, CF_B.length, cell.getFamilyArray(), cell.getFamilyOffset(),
- cell.getFamilyLength()) != 0 //
- || Bytes.compareTo(COL_B, 0, COL_B.length, cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()) != 0)
+ if (Bytes.compareTo(CF_B, 0, CF_B.length, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) != 0 //
+ || Bytes.compareTo(COL_B, 0, COL_B.length, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) != 0)
throw new IllegalStateException();
- rec.loadCellBlock(0, ByteBuffer.wrap(cell.getRowArray(), cell.getRowOffset() + ID_LEN,
- cell.getRowLength() - ID_LEN));
- rec.loadCellBlock(1,
- ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+ rec.loadCellBlock(0, ByteBuffer.wrap(cell.getRowArray(), cell.getRowOffset() + ID_LEN, cell.getRowLength() - ID_LEN));
+ rec.loadCellBlock(1, ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 8586fac..e822ada 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -88,18 +88,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
if (shardNum == totalShards) {
//all shards
- return Lists.newArrayList(
- Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (shardNum - 1))));
+ return Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (shardNum - 1))));
} else if (baseShard + shardNum <= totalShards) {
//endpoint end key is inclusive, so no need to append 0 or anything
- return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard),
- getByteArrayForShort((short) (baseShard + shardNum - 1))));
+ return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (baseShard + shardNum - 1))));
} else {
//0,1,2,3,4 wants 4,0
- return Lists.newArrayList(
- Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), //
- Pair.newPair(getByteArrayForShort((short) 0),
- getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1))));
+ return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), //
+ Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1))));
}
}
@@ -141,18 +137,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
- logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(),
- rawScanByteString.size());
+ logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
- logger.info(
- "The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0",
- Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
+ logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
for (RawScan rs : rawScans) {
logScan(rs, cubeSeg.getStorageLocationIdentifier());
}
- logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum,
- cuboidBaseShard, rawScans.size());
+ logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size());
// KylinConfig: use env instance instead of CubeSegment, because KylinConfig will share among queries
// for different cubes until redeployment of coprocessor jar.
@@ -177,13 +169,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
@Override
public void run() {
- final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId,
- Integer.toHexString(System.identityHashCode(scanRequest)));
+ final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest)));
final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>();
try {
- Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
- HBaseConnection.getCoprocessorPool());
+ Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
final CubeVisitRequest request = builder.build();
final byte[] startKey = epRange.getFirst();
@@ -225,20 +215,15 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
return;
}
- if (queryContext.getScannedBytes() > cubeSeg.getConfig()
- .getQueryMaxScanBytes()) {
- throw new ResourceLimitExceededException("Query scanned "
- + queryContext.getScannedBytes() + " bytes exceeds threshold "
- + cubeSeg.getConfig().getQueryMaxScanBytes());
+ if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
+ throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
}
try {
if (compressionResult) {
- epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString
- .zeroCopyGetBytes(result.getCompressedRows())));
+ epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
} else {
- epResultItr.append(HBaseZeroCopyByteString
- .zeroCopyGetBytes(result.getCompressedRows()));
+ epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
}
} catch (IOException | DataFormatException e) {
throw new RuntimeException(logHeader + "Error when decompressing", e);
@@ -293,8 +278,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
RawScan.serializer.serialize(rs, rawScanBuffer);
}
rawScanBuffer.flip();
- rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(),
- rawScanBuffer.limit());
+ rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
break;
} catch (BufferOverflowException boe) {
logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
@@ -309,16 +293,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
Stats stats = result.getStats();
byte[] compressedRows = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
- sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ")
- .append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append(".");
+ sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append(".");
sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". ");
sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". ");
- sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime())
- .append("(ms). ");
- sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ")
- .append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:")
- .append(stats.getFreeSwapSpaceSize()).append(".");
+ sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
+ sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
sb.append("Compressed row size: ").append(compressedRows.length);
@@ -328,8 +308,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
private RuntimeException getCoprocessorException(CubeVisitResponse response) {
if (!response.hasErrorInfo()) {
- return new RuntimeException(
- "Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message");
+ return new RuntimeException("Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message");
}
CubeVisitResponse.ErrorInfo errorInfo = response.getErrorInfo();
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 97d2373..db81646 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -72,7 +72,7 @@ public abstract class CubeHBaseRPC implements IGTStorage {
public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment");
-
+
this.cubeSeg = (CubeSegment) segment;
this.cuboid = cuboid;
this.fullGTInfo = fullGTInfo;
@@ -106,8 +106,7 @@ public abstract class CubeHBaseRPC implements IGTStorage {
return scan;
}
- private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys,
- ImmutableBitSet selectedColBlocks) {
+ private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
@@ -247,12 +246,10 @@ public abstract class CubeHBaseRPC implements IGTStorage {
}
}
- private static List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(
- List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) {
+ private static List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) {
List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList();
for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) {
- org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(
- pair.getFirst(), pair.getSecond());
+ org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond());
result.add(element);
}
@@ -295,24 +292,23 @@ public abstract class CubeHBaseRPC implements IGTStorage {
} else {
coopTimeout = cubeSeg.getConfig().getQueryCoprocessorTimeoutSeconds() * 1000;
}
-
+
int rpcTimeout;
Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+
// HBase rpc timeout must be longer than coprocessor timeout
if ((int) (coopTimeout * 1.1) > rpcTimeout) {
rpcTimeout = (int) (coopTimeout * 1.1);
hconf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
}
-
+
// coprocessor timeout is 0 by default
if (coopTimeout <= 0) {
coopTimeout = (int) (rpcTimeout * 0.9);
}
-
- logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout,
- coopTimeout);
+
+ logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout);
return coopTimeout;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index f258efb..951e2ef 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -106,8 +106,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
} else {
List<byte[]> ret = Lists.newArrayList();
for (short i = 0; i < cuboidShardNum; ++i) {
- short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i,
- cubeSeg.getTotalShards(cuboid.getId()));
+ short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards(cuboid.getId()));
byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length);
BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
ret.add(cookedKey);
@@ -194,13 +193,11 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
};
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns,
- hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false);
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false);
IGTScanner rawScanner = store.scan(scanRequest);
final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
- final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo,
- decorateScanner.iterator());
+ final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator());
return new IGTScanner() {
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index adc210e..59fe9e0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -68,11 +68,8 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
}
if (ret == null) {
- throw new RuntimeException(
- "Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every "
- + //
- GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout("
- + coprocessorTimeout + ") cannot support this many scans?");
+ throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
+ GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
}
return ret;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 2e1bac4..631e8e8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -45,12 +45,11 @@ public class HBaseReadonlyStore implements IGTStore {
private int rowkeyPreambleSize;
private boolean withDelay = false;
+
/**
* @param withDelay is for test use
*/
- public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest,
- List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize,
- boolean withDelay) {
+ public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) {
this.cellListIterator = cellListIterator;
this.info = gtScanRequest.getInfo();
this.hbaseColumns = hbaseColumns;
@@ -77,10 +76,8 @@ public class HBaseReadonlyStore implements IGTStore {
//TODO: possible to use binary search as cells might be sorted?
public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) {
for (Cell c : cells) {
- if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0
- && //
- BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(),
- columnName.length) == 0) {
+ if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 && //
+ BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) {
return c;
}
}
@@ -124,9 +121,7 @@ public class HBaseReadonlyStore implements IGTStore {
// dimensions, set to primary key, also the 0th column block
Cell firstCell = oneRow.get(0);
- ByteBuffer buf = byteBuffer(firstCell.getRowArray(),
- rowkeyPreambleSize + firstCell.getRowOffset(),
- firstCell.getRowLength() - rowkeyPreambleSize);
+ ByteBuffer buf = byteBuffer(firstCell.getRowArray(), rowkeyPreambleSize + firstCell.getRowOffset(), firstCell.getRowLength() - rowkeyPreambleSize);
oneRecord.loadCellBlock(0, buf);
// metrics
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index bfe4f44..cde127e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -149,8 +149,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
private long rowCount;
private long rowBytes;
- ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, long rowCountLimit, long bytesLimit,
- long timeout) {
+ ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate,
+ long rowCountLimit, long bytesLimit, long timeout) {
this.delegate = delegate;
this.rowCountLimit = rowCountLimit;
this.bytesLimit = bytesLimit;
@@ -164,8 +164,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
throw new ResourceLimitExceededException("scanned row count exceeds threshold " + rowCountLimit);
}
if (rowBytes > bytesLimit) {
- throw new ResourceLimitExceededException(
- "scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit);
+ throw new ResourceLimitExceededException("scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit);
}
if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) {
throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms");
@@ -196,8 +195,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
if (shardLength == 0) {
return;
}
- byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength]
- : region.getRegionInfo().getStartKey();
+ byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey();
Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength);
Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
}
@@ -222,8 +220,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
@SuppressWarnings("checkstyle:methodlength")
@Override
- public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request,
- RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
+ public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
List<RegionScanner> regionScanners = Lists.newArrayList();
HRegion region = null;
@@ -237,7 +234,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) {
this.serviceStartTime = System.currentTimeMillis();
- region = (HRegion) env.getRegion();
+ region = (HRegion)env.getRegion();
region.startRegionOperation();
// if user change kylin.properties on kylin server, need to manually redeploy coprocessor jar to update KylinConfig of Env.
@@ -247,15 +244,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag);
- final GTScanRequest scanReq = GTScanRequest.serializer
- .deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
+ final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
for (IntList intList : request.getHbaseColumnsToGTList()) {
hbaseColumnsToGT.add(intList.getIntsList());
}
StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior());
- final List<RawScan> hbaseRawScans = deserializeRawScans(
- ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+ final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - scanReq.getStartTime()));
@@ -264,8 +259,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
for (RawScan hbaseRawScan : hbaseRawScans) {
if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
//if has shard, fill region shard to raw scan start/end
- updateRawScanByCurrentRegion(hbaseRawScan, region,
- request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
+ updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
}
Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
@@ -296,17 +290,16 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
- ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(allCellLists,
+ ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(
+ allCellLists,
scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold)
!request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client
scanReq.getTimeout());
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns,
- hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn());
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn());
IGTScanner rawScanner = store.scan(scanReq);
- IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(),
- behavior.aggrToggledOn(), false, request.getSpillEnabled());
+ IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled());
ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
@@ -337,20 +330,22 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
} catch (KylinTimeoutException e) {
logger.info("Abort scan: {}", e.getMessage());
errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder()
- .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT).setMessage(e.getMessage())
+ .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT)
+ .setMessage(e.getMessage())
.build();
} catch (ResourceLimitExceededException e) {
logger.info("Abort scan: {}", e.getMessage());
errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder()
.setType(CubeVisitProtos.CubeVisitResponse.ErrorType.RESOURCE_LIMIT_EXCEEDED)
- .setMessage(e.getMessage()).build();
+ .setMessage(e.getMessage())
+ .build();
} finally {
finalScanner.close();
}
appendProfileInfo(sb, "agg done");
- logger.info("Total scanned {} rows and {} bytes", cellListIterator.getTotalScannedRowCount(),
- cellListIterator.getTotalScannedRowBytes());
+ logger.info("Total scanned {} rows and {} bytes",
+ cellListIterator.getTotalScannedRowCount(), cellListIterator.getTotalScannedRowBytes());
//outputStream.close() is not necessary
byte[] compressedAllRows;
@@ -368,8 +363,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
appendProfileInfo(sb, "compress done");
logger.info("Size of final result = {} ({} before compressing)", compressedAllRows.length, allRows.length);
- OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory
- .getOperatingSystemMXBean();
+ OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();
@@ -383,15 +377,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
done.run(responseBuilder.//
setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies
- setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder()
- .setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount)
- .setScannedRowCount(cellListIterator.getTotalScannedRowCount())
- .setScannedBytes(cellListIterator.getTotalScannedRowBytes())
- .setServiceStartTime(serviceStartTime).setServiceEndTime(System.currentTimeMillis())
- .setSystemCpuLoad(systemCpuLoad).setFreePhysicalMemorySize(freePhysicalMemorySize)
- .setFreeSwapSpaceSize(freeSwapSpaceSize)
- .setHostname(InetAddress.getLocalHost().getHostName()).setEtcMsg(sb.toString())
- .setNormalComplete(errorInfo == null ? 1 : 0).build())
+ setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().
+ setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount).
+ setScannedRowCount(cellListIterator.getTotalScannedRowCount()).
+ setScannedBytes(cellListIterator.getTotalScannedRowBytes()).
+ setServiceStartTime(serviceStartTime).
+ setServiceEndTime(System.currentTimeMillis()).
+ setSystemCpuLoad(systemCpuLoad).
+ setFreePhysicalMemorySize(freePhysicalMemorySize).
+ setFreeSwapSpaceSize(freeSwapSpaceSize).
+ setHostname(InetAddress.getLocalHost().getHostName()).
+ setEtcMsg(sb.toString()).
+ setNormalComplete(errorInfo == null ? 1 : 0).build())
.build());
} catch (IOException ioe) {