You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/21 12:19:46 UTC
[1/2] kylin git commit: refine coprocessor suicide
Repository: kylin
Updated Branches:
refs/heads/master c40e2bf41 -> abe590a20
refine coprocessor suicide
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/37d63f41
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/37d63f41
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/37d63f41
Branch: refs/heads/master
Commit: 37d63f41a8496f7ea93adaac798f7a09b95c8127
Parents: c40e2bf
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Apr 21 15:58:01 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Apr 21 18:12:30 2016 +0800
----------------------------------------------------------------------
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 9 ++---
.../coprocessor/endpoint/CubeVisitService.java | 37 ++++++++++++++------
2 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/37d63f41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index d86b37f..88c17ae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -124,7 +124,9 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
- logger.info("Timeout for ExpectedSizeIterator is " + this.timeout);
+
+ this.timeout *= 1.1;//allow for some delay
+ logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout);
}
@Override
@@ -317,7 +319,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
- logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg,rawScans.size());
+ logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
for (RawScan rs : rawScans) {
logScan(rs, cubeSeg.getStorageLocationIdentifier());
}
@@ -358,8 +360,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
if (result.getValue().getStats().getNormalComplete() != 1) {
abnormalFinish = true;
- }
- else {
+ } else {
try {
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
} catch (IOException | DataFormatException e) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/37d63f41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 585908b..0cd35f1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -162,7 +162,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
@Override
- public void visitCube(RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
+ public void visitCube(final RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
List<RegionScanner> regionScanners = Lists.newArrayList();
HRegion region = null;
@@ -178,7 +178,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
region.startRegionOperation();
debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag);
-
final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
for (IntList intList : request.getHbaseColumnsToGTList()) {
@@ -230,9 +229,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
scanReq.setAggrCacheGB(0); // disable mem check if so told
}
- final MutableBoolean normalComplete = new MutableBoolean(true);
- final long startTime = this.serviceStartTime;//request.getStartTime();
- final long timeout = (long) (request.getTimeout() * 0.95);
+ final MutableBoolean scanNormalComplete = new MutableBoolean(true);
+ final long startTime = this.serviceStartTime;
+ final long timeout = request.getTimeout();
final CellListIterator cellListIterator = new CellListIterator() {
@@ -247,12 +246,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
@Override
public boolean hasNext() {
- if (counter++ % 1000 == 1) {
+ if (counter % 1000 == 1) {
if (System.currentTimeMillis() - startTime > timeout) {
- normalComplete.setValue(false);
+ scanNormalComplete.setValue(false);
+ logger.error("scanner aborted because timeout");
return false;
}
}
+
+ if (counter % 100000 == 1) {
+ logger.info("Scanned " + counter + " rows.");
+ }
+ counter++;
return allCellLists.hasNext();
}
@@ -279,6 +284,19 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
int finalRowCount = 0;
for (GTRecord oneRecord : finalScanner) {
+
+ if (!scanNormalComplete.booleanValue()) {
+ logger.error("aggregate iterator aborted because input iterator aborts");
+ break;
+ }
+
+ if (finalRowCount % 1000 == 1) {
+ if (System.currentTimeMillis() - startTime > timeout) {
+ logger.error("aggregate iterator aborted because timeout");
+ break;
+ }
+ }
+
buffer.clear();
oneRecord.exportColumns(scanReq.getColumns(), buffer);
buffer.flip();
@@ -292,7 +310,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
//outputStream.close() is not necessary
byte[] compressedAllRows;
- if (normalComplete.booleanValue()) {
+ if (scanNormalComplete.booleanValue()) {
allRows = outputStream.toByteArray();
} else {
allRows = new byte[0];
@@ -309,7 +327,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
appendProfileInfo(sb, "server stats done");
sb.append(" debugGitTag:" + debugGitTag);
-
CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
done.run(responseBuilder.//
setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies
@@ -323,7 +340,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
setFreeSwapSpaceSize(freeSwapSpaceSize).//
setHostname(InetAddress.getLocalHost().getHostName()).//
setEtcMsg(sb.toString()).//
- setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build())
+ setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build())
.//
build());
[2/2] kylin git commit: KYLIN-1601 Need not to shrink scan cache when hbase rows can be large
Posted by ma...@apache.org.
KYLIN-1601 Need not to shrink scan cache when hbase rows can be large
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/abe590a2
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/abe590a2
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/abe590a2
Branch: refs/heads/master
Commit: abe590a20e5f93d6eb5d598fe117195b1f4d7fbb
Parents: 37d63f4
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Apr 21 18:11:15 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Apr 21 18:19:30 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/storage/hbase/HBaseResourceStore.java | 4 +++-
.../kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java | 6 +++---
.../org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java | 4 ++--
3 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/abe590a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index bb5382f..2262482 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -148,7 +148,9 @@ public class HBaseResourceStore extends ResourceStore {
private void tuneScanParameters(Scan scan) {
// divide by 10 as some resource like dictionary or snapshot can be very large
- scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
+ // scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
+ scan.setCaching(kylinConfig.getHBaseScanCacheRows());
+
scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize());
scan.setCacheBlocks(true);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/abe590a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index d7ea2a0..9eb05d2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -258,9 +258,9 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
scan.setCacheBlocks(true);
// cache less when there are memory hungry measures
- if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
- scan.setCaching(scan.getCaching() / 10);
- }
+// if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
+// scan.setCaching(scan.getCaching() / 10);
+// }
}
private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/abe590a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 49e8593..af5d4b7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -113,8 +113,8 @@ public abstract class CubeHBaseRPC implements IGTStorage {
KylinConfig config = cubeSeg.getCubeDesc().getConfig();
int hbaseCaching = config.getHBaseScanCacheRows();
int hbaseMaxResultSize = config.getHBaseScanMaxResultSize();
- if (isMemoryHungry(selectedColBlocks))
- hbaseCaching /= 10;
+// if (isMemoryHungry(selectedColBlocks))
+// hbaseCaching /= 10;
return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize);
}