You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/12 04:24:53 UTC

kylin git commit: KYLIN-1578 Coprocessor thread voluntarily stops itself when it reaches timeout

Repository: kylin
Updated Branches:
  refs/heads/yang-m1 21d9ff401 -> 9513b8e47


KYLIN-1578 Coprocessor thread voluntarily stops itself when it reaches timeout


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9513b8e4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9513b8e4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9513b8e4

Branch: refs/heads/yang-m1
Commit: 9513b8e470382b15637cfe215f24cc3ae624b9fd
Parents: 21d9ff4
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 10:24:42 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Tue Apr 12 10:24:42 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/gridtable/GTScanRequest.java   |   2 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  16 +
 .../coprocessor/endpoint/CubeVisitService.java  |  49 ++-
 .../endpoint/generated/CubeVisitProtos.java     | 436 +++++++++++++++++--
 .../endpoint/protobuf/CubeVisit.proto           |   3 +
 5 files changed, 459 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/9513b8e4/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 1edfb36..97900c1 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -179,6 +180,7 @@ public class GTScanRequest {
             }
         }
         System.out.println("Meaningless byte is " + meaninglessByte);
+        IOUtils.closeQuietly(scanner);
         return scanned;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9513b8e4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 6bbb0b7..bfc240a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -160,6 +160,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 throw new RuntimeException("error when waiting queue", e);
             }
         }
+
+
+        public long getTimeout() {
+            return timeout;
+        }
+
     }
 
     static class EndpointResultsAsGTScanner implements IGTScanner {
@@ -310,6 +316,9 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                         }
                         builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
                         builder.setBehavior(toggle);
+                        builder.setStartTime(System.currentTimeMillis());
+                        builder.setTimeout(epResultItr.getTimeout());
+
 
                         Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
                         try {
@@ -321,6 +330,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                         for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
                             totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
                             logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + getStatsString(result));
+
+                            if (result.getValue().getStats().getNormalComplete() != 1) {
+                                throw new RuntimeException("The coprocessor thread stopped itself due to scan timeout.");
+                            }
+
                             try {
                                 epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
                             } catch (IOException | DataFormatException e) {
@@ -344,6 +358,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
         sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
         sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
+        sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
+
         return sb.toString();
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9513b8e4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 1e7b1b5..596814f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -78,14 +79,22 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
     private long serviceStartTime;
 
     static class InnerScannerAsIterator implements CellListIterator {
+
         private RegionScanner regionScanner;
+        private MutableBoolean normalComplete;
+        private long startTime;
+        private long timeout;
+        private int counter = 0;
+
         private List<Cell> nextOne = Lists.newArrayList();
         private List<Cell> ret = Lists.newArrayList();
-
         private boolean hasMore;
 
-        public InnerScannerAsIterator(RegionScanner regionScanner) {
+        public InnerScannerAsIterator(RegionScanner regionScanner, MutableBoolean normalComplete, long startTime, long timeout) {
             this.regionScanner = regionScanner;
+            this.normalComplete = normalComplete;
+            this.startTime = startTime;
+            this.timeout = timeout;
 
             try {
                 hasMore = regionScanner.nextRaw(nextOne);
@@ -96,6 +105,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
         @Override
         public boolean hasNext() {
+            if (counter++ % 1000 == 1) {
+                if (System.currentTimeMillis() - startTime > timeout) {
+                    normalComplete.setValue(false);
+                    return false;
+                }
+            }
+
             return !nextOne.isEmpty();
         }
 
@@ -138,8 +154,11 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
     }
 
-    private void appendProfileInfo(StringBuilder sb) {
-        sb.append(System.currentTimeMillis() - this.serviceStartTime);
+    private void appendProfileInfo(StringBuilder sb, String info) {
+        if (info != null) {
+            sb.append(info);
+        }
+        sb.append("@" + (System.currentTimeMillis() - this.serviceStartTime));
         sb.append(",");
     }
 
@@ -161,6 +180,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
             final RawScan hbaseRawScan = RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
 
+            appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - request.getStartTime()));
+
             MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
                 @Override
                 public DimensionEncoding getDimEnc(TblColRef col) {
@@ -180,7 +201,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "scan built");
 
             innerScanner = region.getScanner(scan);
             CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
@@ -191,10 +212,14 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 while (innerScanner.nextRaw(temp)) {
                     counter++;
                 }
-                sb.append("Scanned " + counter + " rows in " + (System.currentTimeMillis() - serviceStartTime) + ",");
+                appendProfileInfo(sb, "scanned " + counter);
             }
 
-            InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
+            final MutableBoolean normalComplete = new MutableBoolean(true);
+            final long startTime = request.getStartTime();
+            final long timeout = (long) (request.getTimeout() * 0.95);
+            InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner, normalComplete, startTime, timeout);
+
             if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
                 scanReq.setAggrCacheGB(0); // disable mem check if so told
             }
@@ -218,21 +243,21 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
                 finalRowCount++;
             }
-
-            appendProfileInfo(sb);
+            finalScanner.close();
+            appendProfileInfo(sb, "agg done");
 
             //outputStream.close() is not necessary
             allRows = outputStream.toByteArray();
             byte[] compressedAllRows = CompressionUtils.compress(allRows);
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "compress done");
 
             OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
             double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
             double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
             double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();
 
-            appendProfileInfo(sb);
+            appendProfileInfo(sb, "server stats done");
 
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//
@@ -247,7 +272,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                             setFreeSwapSpaceSize(freeSwapSpaceSize).//
                             setHostname(InetAddress.getLocalHost().getHostName()).// 
                             setEtcMsg(sb.toString()).//
-                            build())
+                            setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build())
                     .//
                     build());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9513b8e4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 6e3e2bb..53393e8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // source: storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
 
@@ -98,6 +80,42 @@ public final class CubeVisitProtos {
      */
     org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder getHbaseColumnsToGTOrBuilder(
         int index);
+
+    // required int64 startTime = 6;
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    boolean hasStartTime();
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    long getStartTime();
+
+    // required int64 timeout = 7;
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    boolean hasTimeout();
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    long getTimeout();
   }
   /**
    * Protobuf type {@code CubeVisitRequest}
@@ -178,6 +196,16 @@ public final class CubeVisitProtos {
               hbaseColumnsToGT_.add(input.readMessage(org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.PARSER, extensionRegistry));
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000010;
+              startTime_ = input.readInt64();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000020;
+              timeout_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -852,12 +880,62 @@ public final class CubeVisitProtos {
       return hbaseColumnsToGT_.get(index);
     }
 
+    // required int64 startTime = 6;
+    public static final int STARTTIME_FIELD_NUMBER = 6;
+    private long startTime_;
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    public boolean hasStartTime() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>required int64 startTime = 6;</code>
+     *
+     * <pre>
+     *when client start the request
+     * </pre>
+     */
+    public long getStartTime() {
+      return startTime_;
+    }
+
+    // required int64 timeout = 7;
+    public static final int TIMEOUT_FIELD_NUMBER = 7;
+    private long timeout_;
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    public boolean hasTimeout() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>required int64 timeout = 7;</code>
+     *
+     * <pre>
+     *how long client will wait
+     * </pre>
+     */
+    public long getTimeout() {
+      return timeout_;
+    }
+
     private void initFields() {
       behavior_ = "";
       gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
       hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
       rowkeyPreambleSize_ = 0;
       hbaseColumnsToGT_ = java.util.Collections.emptyList();
+      startTime_ = 0L;
+      timeout_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -880,6 +958,14 @@ public final class CubeVisitProtos {
         memoizedIsInitialized = 0;
         return false;
       }
+      if (!hasStartTime()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasTimeout()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -902,6 +988,12 @@ public final class CubeVisitProtos {
       for (int i = 0; i < hbaseColumnsToGT_.size(); i++) {
         output.writeMessage(5, hbaseColumnsToGT_.get(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(6, startTime_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(7, timeout_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -931,6 +1023,14 @@ public final class CubeVisitProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, hbaseColumnsToGT_.get(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, startTime_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(7, timeout_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -976,6 +1076,16 @@ public final class CubeVisitProtos {
       }
       result = result && getHbaseColumnsToGTList()
           .equals(other.getHbaseColumnsToGTList());
+      result = result && (hasStartTime() == other.hasStartTime());
+      if (hasStartTime()) {
+        result = result && (getStartTime()
+            == other.getStartTime());
+      }
+      result = result && (hasTimeout() == other.hasTimeout());
+      if (hasTimeout()) {
+        result = result && (getTimeout()
+            == other.getTimeout());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1009,6 +1119,14 @@ public final class CubeVisitProtos {
         hash = (37 * hash) + HBASECOLUMNSTOGT_FIELD_NUMBER;
         hash = (53 * hash) + getHbaseColumnsToGTList().hashCode();
       }
+      if (hasStartTime()) {
+        hash = (37 * hash) + STARTTIME_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getStartTime());
+      }
+      if (hasTimeout()) {
+        hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getTimeout());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1133,6 +1251,10 @@ public final class CubeVisitProtos {
         } else {
           hbaseColumnsToGTBuilder_.clear();
         }
+        startTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        timeout_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -1186,6 +1308,14 @@ public final class CubeVisitProtos {
         } else {
           result.hbaseColumnsToGT_ = hbaseColumnsToGTBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.startTime_ = startTime_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.timeout_ = timeout_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1242,6 +1372,12 @@ public final class CubeVisitProtos {
             }
           }
         }
+        if (other.hasStartTime()) {
+          setStartTime(other.getStartTime());
+        }
+        if (other.hasTimeout()) {
+          setTimeout(other.getTimeout());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1263,6 +1399,14 @@ public final class CubeVisitProtos {
           
           return false;
         }
+        if (!hasStartTime()) {
+          
+          return false;
+        }
+        if (!hasTimeout()) {
+          
+          return false;
+        }
         return true;
       }
 
@@ -1704,6 +1848,104 @@ public final class CubeVisitProtos {
         return hbaseColumnsToGTBuilder_;
       }
 
+      // required int64 startTime = 6;
+      private long startTime_ ;
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public boolean hasStartTime() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public long getStartTime() {
+        return startTime_;
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public Builder setStartTime(long value) {
+        bitField0_ |= 0x00000020;
+        startTime_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required int64 startTime = 6;</code>
+       *
+       * <pre>
+       *when client start the request
+       * </pre>
+       */
+      public Builder clearStartTime() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        startTime_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // required int64 timeout = 7;
+      private long timeout_ ;
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public boolean hasTimeout() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public long getTimeout() {
+        return timeout_;
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public Builder setTimeout(long value) {
+        bitField0_ |= 0x00000040;
+        timeout_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required int64 timeout = 7;</code>
+       *
+       * <pre>
+       *how long client will wait
+       * </pre>
+       */
+      public Builder clearTimeout() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        timeout_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CubeVisitRequest)
     }
 
@@ -1952,6 +2194,24 @@ public final class CubeVisitProtos {
        */
       com.google.protobuf.ByteString
           getEtcMsgBytes();
+
+      // optional int32 normalComplete = 10;
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      boolean hasNormalComplete();
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      int getNormalComplete();
     }
     /**
      * Protobuf type {@code CubeVisitResponse.Stats}
@@ -2049,6 +2309,11 @@ public final class CubeVisitProtos {
                 etcMsg_ = input.readBytes();
                 break;
               }
+              case 80: {
+                bitField0_ |= 0x00000200;
+                normalComplete_ = input.readInt32();
+                break;
+              }
             }
           }
         } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2287,6 +2552,30 @@ public final class CubeVisitProtos {
         }
       }
 
+      // optional int32 normalComplete = 10;
+      public static final int NORMALCOMPLETE_FIELD_NUMBER = 10;
+      private int normalComplete_;
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      public boolean hasNormalComplete() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      /**
+       * <code>optional int32 normalComplete = 10;</code>
+       *
+       * <pre>
+       *when time outs, normalComplete will be false
+       * </pre>
+       */
+      public int getNormalComplete() {
+        return normalComplete_;
+      }
+
       private void initFields() {
         serviceStartTime_ = 0L;
         serviceEndTime_ = 0L;
@@ -2297,6 +2586,7 @@ public final class CubeVisitProtos {
         freeSwapSpaceSize_ = 0D;
         hostname_ = "";
         etcMsg_ = "";
+        normalComplete_ = 0;
       }
       private byte memoizedIsInitialized = -1;
       public final boolean isInitialized() {
@@ -2337,6 +2627,9 @@ public final class CubeVisitProtos {
         if (((bitField0_ & 0x00000100) == 0x00000100)) {
           output.writeBytes(9, getEtcMsgBytes());
         }
+        if (((bitField0_ & 0x00000200) == 0x00000200)) {
+          output.writeInt32(10, normalComplete_);
+        }
         getUnknownFields().writeTo(output);
       }
 
@@ -2382,6 +2675,10 @@ public final class CubeVisitProtos {
           size += com.google.protobuf.CodedOutputStream
             .computeBytesSize(9, getEtcMsgBytes());
         }
+        if (((bitField0_ & 0x00000200) == 0x00000200)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeInt32Size(10, normalComplete_);
+        }
         size += getUnknownFields().getSerializedSize();
         memoizedSerializedSize = size;
         return size;
@@ -2447,6 +2744,11 @@ public final class CubeVisitProtos {
           result = result && getEtcMsg()
               .equals(other.getEtcMsg());
         }
+        result = result && (hasNormalComplete() == other.hasNormalComplete());
+        if (hasNormalComplete()) {
+          result = result && (getNormalComplete()
+              == other.getNormalComplete());
+        }
         result = result &&
             getUnknownFields().equals(other.getUnknownFields());
         return result;
@@ -2499,6 +2801,10 @@ public final class CubeVisitProtos {
           hash = (37 * hash) + ETCMSG_FIELD_NUMBER;
           hash = (53 * hash) + getEtcMsg().hashCode();
         }
+        if (hasNormalComplete()) {
+          hash = (37 * hash) + NORMALCOMPLETE_FIELD_NUMBER;
+          hash = (53 * hash) + getNormalComplete();
+        }
         hash = (29 * hash) + getUnknownFields().hashCode();
         memoizedHashCode = hash;
         return hash;
@@ -2626,6 +2932,8 @@ public final class CubeVisitProtos {
           bitField0_ = (bitField0_ & ~0x00000080);
           etcMsg_ = "";
           bitField0_ = (bitField0_ & ~0x00000100);
+          normalComplete_ = 0;
+          bitField0_ = (bitField0_ & ~0x00000200);
           return this;
         }
 
@@ -2690,6 +2998,10 @@ public final class CubeVisitProtos {
             to_bitField0_ |= 0x00000100;
           }
           result.etcMsg_ = etcMsg_;
+          if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+            to_bitField0_ |= 0x00000200;
+          }
+          result.normalComplete_ = normalComplete_;
           result.bitField0_ = to_bitField0_;
           onBuilt();
           return result;
@@ -2737,6 +3049,9 @@ public final class CubeVisitProtos {
             etcMsg_ = other.etcMsg_;
             onChanged();
           }
+          if (other.hasNormalComplete()) {
+            setNormalComplete(other.getNormalComplete());
+          }
           this.mergeUnknownFields(other.getUnknownFields());
           return this;
         }
@@ -3143,6 +3458,55 @@ public final class CubeVisitProtos {
           return this;
         }
 
+        // optional int32 normalComplete = 10;
+        private int normalComplete_ ;
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when time outs, normalComplete will be false
+         * </pre>
+         */
+        public boolean hasNormalComplete() {
+          return ((bitField0_ & 0x00000200) == 0x00000200);
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when time outs, normalComplete will be false
+         * </pre>
+         */
+        public int getNormalComplete() {
+          return normalComplete_;
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when time outs, normalComplete will be false
+         * </pre>
+         */
+        public Builder setNormalComplete(int value) {
+          bitField0_ |= 0x00000200;
+          normalComplete_ = value;
+          onChanged();
+          return this;
+        }
+        /**
+         * <code>optional int32 normalComplete = 10;</code>
+         *
+         * <pre>
+         *when time outs, normalComplete will be false
+         * </pre>
+         */
+        public Builder clearNormalComplete() {
+          bitField0_ = (bitField0_ & ~0x00000200);
+          normalComplete_ = 0;
+          onChanged();
+          return this;
+        }
+
         // @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats)
       }
 
@@ -3936,24 +4300,26 @@ public final class CubeVisitProtos {
     java.lang.String[] descriptorData = {
       "\npstorage-hbase/src/main/java/org/apache" +
       "/kylin/storage/hbase/cube/v2/coprocessor" +
-      "/endpoint/protobuf/CubeVisit.proto\"\273\001\n\020C" +
+      "/endpoint/protobuf/CubeVisit.proto\"\337\001\n\020C" +
       "ubeVisitRequest\022\020\n\010behavior\030\001 \002(\t\022\025\n\rgtS" +
       "canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 \002(\014\022\032" +
       "\n\022rowkeyPreambleSize\030\004 \002(\005\0223\n\020hbaseColum" +
-      "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\032" +
-      "\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\271\002\n\021CubeVisitRe" +
-      "sponse\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030" +
-      "\002 \002(\0132\030.CubeVisitResponse.Stats\032\342\001\n\005Stat",
-      "s\022\030\n\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEn" +
-      "dTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022" +
-      "aggregatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoa" +
-      "d\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022" +
-      "\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010" +
-      " \001(\t\022\016\n\006etcMsg\030\t \001(\t2F\n\020CubeVisitService" +
-      "\0222\n\tvisitCube\022\021.CubeVisitRequest\032\022.CubeV" +
-      "isitResponseB`\nEorg.apache.kylin.storage" +
-      ".hbase.cube.v2.coprocessor.endpoint.gene" +
-      "ratedB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+      "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\022" +
+      "\021\n\tstartTime\030\006 \002(\003\022\017\n\007timeout\030\007 \002(\003\032\027\n\007I" +
+      "ntList\022\014\n\004ints\030\001 \003(\005\"\321\002\n\021CubeVisitRespon" +
+      "se\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(",
+      "\0132\030.CubeVisitResponse.Stats\032\372\001\n\005Stats\022\030\n" +
+      "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
+      "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022aggr" +
+      "egatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoad\030\005 " +
+      "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" +
+      "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" +
+      "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005" +
+      "2F\n\020CubeVisitService\0222\n\tvisitCube\022\021.Cube" +
+      "VisitRequest\032\022.CubeVisitResponseB`\nEorg." +
+      "apache.kylin.storage.hbase.cube.v2.copro",
+      "cessor.endpoint.generatedB\017CubeVisitProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3965,7 +4331,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitRequest_descriptor,
-              new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", });
+              new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "StartTime", "Timeout", });
           internal_static_CubeVisitRequest_IntList_descriptor =
             internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
           internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new
@@ -3983,7 +4349,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitResponse_Stats_descriptor,
-              new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", });
+              new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/kylin/blob/9513b8e4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index 5b66a56..ecaad35 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -35,6 +35,8 @@ message CubeVisitRequest {
     required bytes hbaseRawScan = 3;
     required int32 rowkeyPreambleSize = 4;
     repeated IntList hbaseColumnsToGT = 5;
+    required int64 startTime = 6;//when the client starts the request
+    required int64 timeout = 7;//how long the client will wait
     message IntList {
         repeated int32 ints = 1;
     }
@@ -51,6 +53,7 @@ message CubeVisitResponse {
         optional double freeSwapSpaceSize = 7;
         optional string hostname = 8;
         optional string etcMsg = 9;
+        optional int32 normalComplete =10;//when the request times out, normalComplete will be false (field is int32, so presumably 0 means false - confirm against the service code)
     }
     required bytes compressedRows = 1;
     required Stats stats = 2;