You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/08 08:26:20 UTC
kylin git commit: revise log in endpoint rpc client
Repository: kylin
Updated Branches:
refs/heads/master 8992b5669 -> f6cde56f0
revise log in endpoint rpc client
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f6cde56f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f6cde56f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f6cde56f
Branch: refs/heads/master
Commit: f6cde56f07786e86c2adf0f791c6a14b12da9870
Parents: 8992b56
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Apr 8 14:25:27 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Apr 8 14:25:27 2016 +0800
----------------------------------------------------------------------
.../storage/hbase/cube/v2/CubeHBaseEndpointRPC.java | 13 ++++---------
1 file changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f6cde56f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 31977ae..6bbb0b7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -274,7 +274,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
}
- boolean scanLogged = false;
for (GTScanRequest req : scanRequests) {
ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
GTScanRequest.serializer.serialize(req, buffer);
@@ -290,18 +289,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", buffer.limit() - buffer.position(), rawScanBuffer.limit() - rawScanBuffer.position());
- if (!scanLogged) {
- logger.info("The scan(s) info for current segment is as below, shard part of start/end key is set to 0", cubeSeg);
- logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
- scanLogged = true;
- }
+ logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(req)), cubeSeg);
+ logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
}
logger.debug("Submitting rpc to {} shards starting from shard {}, scan requests count {}", new Object[] { shardNum, cuboidBaseShard, scanRequests.size() });
final AtomicInteger totalScannedCount = new AtomicInteger(0);
final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(scanRequests.size() * shardNum);
- final String currentThreadName = Thread.currentThread().getName();
for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
executorService.submit(new Runnable() {
@@ -320,12 +315,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
try {
results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
} catch (Throwable throwable) {
- throw new RuntimeException("Error when visiting cubes by endpoint:", throwable);
+ throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + "Error when visiting cubes by endpoint", throwable);
}
for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
- logger.info("<spawned by " + currentThreadName + "> " + getStatsString(result));
+ logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + getStatsString(result));
try {
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
} catch (IOException | DataFormatException e) {