You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that did not survive conversion to plain text.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/21 12:19:46 UTC

[1/2] kylin git commit: refine coprocessor suicide

Repository: kylin
Updated Branches:
  refs/heads/master c40e2bf41 -> abe590a20


refine coprocessor suicide


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/37d63f41
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/37d63f41
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/37d63f41

Branch: refs/heads/master
Commit: 37d63f41a8496f7ea93adaac798f7a09b95c8127
Parents: c40e2bf
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Apr 21 15:58:01 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Apr 21 18:12:30 2016 +0800

----------------------------------------------------------------------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  9 ++---
 .../coprocessor/endpoint/CubeVisitService.java  | 37 ++++++++++++++------
 2 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/37d63f41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index d86b37f..88c17ae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -124,7 +124,9 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
             this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
             this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
-            logger.info("Timeout for ExpectedSizeIterator is " + this.timeout);
+
+            this.timeout *= 1.1;//allow for some delay 
+            logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout);
         }
 
         @Override
@@ -317,7 +319,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
-        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg,rawScans.size());
+        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
         for (RawScan rs : rawScans) {
             logScan(rs, cubeSeg.getStorageLocationIdentifier());
         }
@@ -358,8 +360,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
                         if (result.getValue().getStats().getNormalComplete() != 1) {
                             abnormalFinish = true;
-                        }
-                        else {
+                        } else {
                             try {
                                 epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
                             } catch (IOException | DataFormatException e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/37d63f41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 585908b..0cd35f1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -162,7 +162,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
     }
 
     @Override
-    public void visitCube(RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
+    public void visitCube(final RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
 
         List<RegionScanner> regionScanners = Lists.newArrayList();
         HRegion region = null;
@@ -178,7 +178,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             region.startRegionOperation();
             debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag);
 
-
             final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
             List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
             for (IntList intList : request.getHbaseColumnsToGTList()) {
@@ -230,9 +229,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 scanReq.setAggrCacheGB(0); // disable mem check if so told
             }
 
-            final MutableBoolean normalComplete = new MutableBoolean(true);
-            final long startTime = this.serviceStartTime;//request.getStartTime();
-            final long timeout = (long) (request.getTimeout() * 0.95);
+            final MutableBoolean scanNormalComplete = new MutableBoolean(true);
+            final long startTime = this.serviceStartTime;
+            final long timeout = request.getTimeout();
 
             final CellListIterator cellListIterator = new CellListIterator() {
 
@@ -247,12 +246,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
                 @Override
                 public boolean hasNext() {
-                    if (counter++ % 1000 == 1) {
+                    if (counter % 1000 == 1) {
                         if (System.currentTimeMillis() - startTime > timeout) {
-                            normalComplete.setValue(false);
+                            scanNormalComplete.setValue(false);
+                            logger.error("scanner aborted because timeout");
                             return false;
                         }
                     }
+
+                    if (counter % 100000 == 1) {
+                        logger.info("Scanned " + counter + " rows.");
+                    }
+                    counter++;
                     return allCellLists.hasNext();
                 }
 
@@ -279,6 +284,19 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
             int finalRowCount = 0;
             for (GTRecord oneRecord : finalScanner) {
+
+                if (!scanNormalComplete.booleanValue()) {
+                    logger.error("aggregate iterator aborted because input iterator aborts");
+                    break;
+                }
+
+                if (finalRowCount % 1000 == 1) {
+                    if (System.currentTimeMillis() - startTime > timeout) {
+                        logger.error("aggregate iterator aborted because timeout");
+                        break;
+                    }
+                }
+
                 buffer.clear();
                 oneRecord.exportColumns(scanReq.getColumns(), buffer);
                 buffer.flip();
@@ -292,7 +310,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             //outputStream.close() is not necessary
             byte[] compressedAllRows;
-            if (normalComplete.booleanValue()) {
+            if (scanNormalComplete.booleanValue()) {
                 allRows = outputStream.toByteArray();
             } else {
                 allRows = new byte[0];
@@ -309,7 +327,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             appendProfileInfo(sb, "server stats done");
             sb.append(" debugGitTag:" + debugGitTag);
 
-
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//
                     setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies 
@@ -323,7 +340,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                             setFreeSwapSpaceSize(freeSwapSpaceSize).//
                             setHostname(InetAddress.getLocalHost().getHostName()).// 
                             setEtcMsg(sb.toString()).//
-                            setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build())
+                            setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build())
                     .//
                     build());
 


[2/2] kylin git commit: KYLIN-1601 Need not to shrink scan cache when hbase rows can be large

Posted by ma...@apache.org.
KYLIN-1601 Need not to shrink scan cache when hbase rows can be large


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/abe590a2
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/abe590a2
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/abe590a2

Branch: refs/heads/master
Commit: abe590a20e5f93d6eb5d598fe117195b1f4d7fbb
Parents: 37d63f4
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Apr 21 18:11:15 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Apr 21 18:19:30 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/storage/hbase/HBaseResourceStore.java     | 4 +++-
 .../kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java  | 6 +++---
 .../org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java   | 4 ++--
 3 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/abe590a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index bb5382f..2262482 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -148,7 +148,9 @@ public class HBaseResourceStore extends ResourceStore {
 
     private void tuneScanParameters(Scan scan) {
         // divide by 10 as some resource like dictionary or snapshot can be very large
-        scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
+        // scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
+        scan.setCaching(kylinConfig.getHBaseScanCacheRows());
+
         scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize());
         scan.setCacheBlocks(true);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/abe590a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index d7ea2a0..9eb05d2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -258,9 +258,9 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         scan.setCacheBlocks(true);
 
         // cache less when there are memory hungry measures
-        if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
-            scan.setCaching(scan.getCaching() / 10);
-        }
+//        if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
+//            scan.setCaching(scan.getCaching() / 10);
+//        }
     }
 
     private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/abe590a2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 49e8593..af5d4b7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -113,8 +113,8 @@ public abstract class CubeHBaseRPC implements IGTStorage {
         KylinConfig config = cubeSeg.getCubeDesc().getConfig();
         int hbaseCaching = config.getHBaseScanCacheRows();
         int hbaseMaxResultSize = config.getHBaseScanMaxResultSize();
-        if (isMemoryHungry(selectedColBlocks))
-            hbaseCaching /= 10;
+//        if (isMemoryHungry(selectedColBlocks))
+//            hbaseCaching /= 10;
 
         return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize);
     }