You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/09/09 10:53:10 UTC

[2/5] kylin git commit: KYLIN-2005 Move all storage side behavior hints to GTScanRequest

KYLIN-2005 Move all storage side behavior hints to GTScanRequest


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d61bcbfd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d61bcbfd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d61bcbfd

Branch: refs/heads/1.5.4-rc1
Commit: d61bcbfd8bbe50d7cd08721234d9434f1140c861
Parents: dcd47f8
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Sep 9 16:46:22 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Sep 9 18:52:49 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/gridtable/GTScanRequest.java   |  33 +-
 .../kylin/gridtable/GTScanRequestBuilder.java   |  30 +-
 .../kylin/gridtable/StorageSideBehavior.java    |  30 +
 .../apache/kylin/query/ITKylinQueryTest.java    |   4 +-
 .../common/coprocessor/CoprocessorBehavior.java |  30 -
 .../observer/AggregateRegionObserver.java       |  10 +-
 .../observer/AggregationScanner.java            |  16 +-
 .../coprocessor/observer/ObserverEnabler.java   |   6 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  88 +--
 .../hbase/cube/v2/ExpectedSizeIterator.java     |   4 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  18 +-
 .../endpoint/generated/CubeVisitProtos.java     | 754 ++++---------------
 .../endpoint/protobuf/CubeVisit.proto           |  13 +-
 .../observer/AggregateRegionObserverTest.java   |   6 +-
 14 files changed, 332 insertions(+), 710 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 5d27028..3e57e86 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -59,6 +59,9 @@ public class GTScanRequest {
     private String[] aggrMetricsFuncs;//
 
     // hint to storage behavior
+    private String storageBehavior;
+    private long startTime;
+    private long timeout;
     private boolean allowStorageAggregation;
     private double aggCacheMemThreshold;
     private int storageScanRowNumThreshold;
@@ -69,7 +72,7 @@ public class GTScanRequest {
 
     GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
             ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, //
-            double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) {
+            double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit, String storageBehavior, long startTime, long timeout) {
         this.info = info;
         if (ranges == null) {
             this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
@@ -83,6 +86,9 @@ public class GTScanRequest {
         this.aggrMetrics = aggrMetrics;
         this.aggrMetricsFuncs = aggrMetricsFuncs;
 
+        this.storageBehavior = storageBehavior;
+        this.startTime = startTime;
+        this.timeout = timeout;
         this.allowStorageAggregation = allowStorageAggregation;
         this.aggCacheMemThreshold = aggCacheMemThreshold;
         this.storageScanRowNumThreshold = storageScanRowNumThreshold;
@@ -115,6 +121,10 @@ public class GTScanRequest {
         }
     }
 
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
     private void validateFilterPushDown(GTInfo info) {
         if (!hasFilterPushDown())
             return;
@@ -280,6 +290,18 @@ public class GTScanRequest {
         return this.storagePushDownLimit;
     }
 
+    public String getStorageBehavior() {
+        return storageBehavior;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
     @Override
     public String toString() {
         return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
@@ -320,6 +342,9 @@ public class GTScanRequest {
             out.putDouble(value.aggCacheMemThreshold);
             BytesUtil.writeVInt(value.storageScanRowNumThreshold, out);
             BytesUtil.writeVInt(value.storagePushDownLimit, out);
+            BytesUtil.writeVLong(value.startTime, out);
+            BytesUtil.writeVLong(value.timeout, out);
+            BytesUtil.writeUTFString(value.storageBehavior, out);
         }
 
         @Override
@@ -350,11 +375,15 @@ public class GTScanRequest {
             double sAggrCacheGB = in.getDouble();
             int storageScanRowNumThreshold = BytesUtil.readVInt(in);
             int storagePushDownLimit = BytesUtil.readVInt(in);
+            long startTime = BytesUtil.readVLong(in);
+            long timeout = BytesUtil.readVLong(in);
+            String storageBehavior = BytesUtil.readUTFString(in);
 
             return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).//
             setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).//
             setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
-            setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).createGTScanRequest();
+            setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).//
+            setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior).createGTScanRequest();
         }
 
         private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
index c4390cd..f542de1 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -21,6 +21,7 @@ package org.apache.kylin.gridtable;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.metadata.filter.TupleFilter;
 
@@ -36,6 +37,9 @@ public class GTScanRequestBuilder {
     private double aggCacheMemThreshold = 0;
     private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself when $storageScanRowNumThreshold cuboid rows are scanned, and throw exception.   
     private int storagePushDownLimit = Integer.MAX_VALUE;// storage can quit working when $toragePushDownLimit aggregated rows are produced. 
+    private long startTime = -1;
+    private long timeout = -1;
+    private String storageBehavior = null;
 
     public GTScanRequestBuilder setInfo(GTInfo info) {
         this.info = info;
@@ -92,6 +96,21 @@ public class GTScanRequestBuilder {
         return this;
     }
 
+    public GTScanRequestBuilder setStartTime(long startTime) {
+        this.startTime = startTime;
+        return this;
+    }
+
+    public GTScanRequestBuilder setTimeout(long timeout) {
+        this.timeout = timeout;
+        return this;
+    }
+
+    public GTScanRequestBuilder setStorageBehavior(String storageBehavior) {
+        this.storageBehavior = storageBehavior;
+        return this;
+    }
+
     public GTScanRequest createGTScanRequest() {
         if (aggrGroupBy == null) {
             aggrGroupBy = new ImmutableBitSet(new BitSet());
@@ -104,7 +123,14 @@ public class GTScanRequestBuilder {
         if (aggrMetricsFuncs == null) {
             aggrMetricsFuncs = new String[0];
         }
-        
-        return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit);
+
+        if (storageBehavior == null) {
+            storageBehavior = BackdoorToggles.getCoprocessorBehavior() == null ? StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
+        }
+
+        this.startTime = startTime == -1 ? System.currentTimeMillis() : startTime;
+        this.timeout = timeout == -1 ? 300000 : timeout;
+
+        return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
new file mode 100644
index 0000000..7fa93e7
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+/**
+ */
+public enum StorageSideBehavior {
+    RAW_SCAN, //on use RegionScanner to scan raw data, for testing hbase scan speed
+    SCAN, //only scan data, used for profiling tuple scan speed. Will not return any result
+    SCAN_FILTER, //only scan+filter used,used for profiling filter speed.  Will not return any result
+    SCAN_FILTER_AGGR, //aggregate the result.  Will return results
+    SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results
+    SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // on each scan operation, delay for 10s to simulate slow queries, for test use
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index fc2fd52..0efea64 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -40,7 +40,7 @@ import org.apache.kylin.query.routing.Candidate;
 import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.storage.hbase.HBaseStorage;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
+import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.dbunit.database.DatabaseConnection;
 import org.dbunit.database.IDatabaseConnection;
@@ -140,7 +140,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         });
 
         Map<String, String> toggles = Maps.newHashMap();
-        toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
+        toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
         BackdoorToggles.setToggles(toggles);
 
         KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.03");//set timeout to 9s

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
deleted file mode 100644
index 5f21351..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.common.coprocessor;
-
-/**
- */
-public enum CoprocessorBehavior {
-    RAW_SCAN, //on use RegionScanner to scan raw data, for testing hbase scan speed
-    SCAN, //only scan data, used for profiling tuple scan speed. Will not return any result
-    SCAN_FILTER, //only scan+filter used,used for profiling filter speed.  Will not return any result
-    SCAN_FILTER_AGGR, //aggregate the result.  Will return results
-    SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results
-    SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // on each scan operation, delay for 10s to simulate slow queries, for test use
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
index c7b650a..7139ca7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
+import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -85,15 +85,15 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         byte[] filterBytes = scan.getAttribute(FILTER);
         CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
 
-        CoprocessorBehavior coprocessorBehavior = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM;
+        StorageSideBehavior storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM;
         try {
             byte[] behavior = scan.getAttribute(BEHAVIOR);
             if (behavior != null && behavior.length != 0) {
-                coprocessorBehavior = CoprocessorBehavior.valueOf(new String(behavior));
+                storageSideBehavior = StorageSideBehavior.valueOf(new String(behavior));
             }
         } catch (Exception e) {
             LOG.error("failed to parse behavior,using default behavior SCAN_FILTER_AGGR_CHECKMEM", e);
-            coprocessorBehavior = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM;
+            storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM;
         }
 
         // start/end region operation & sync on scanner is suggested by the
@@ -103,7 +103,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {
-                return new AggregationScanner(type, filter, projector, aggregators, innerScanner, coprocessorBehavior);
+                return new AggregationScanner(type, filter, projector, aggregators, innerScanner, storageSideBehavior);
             }
         } finally {
             region.closeRegionOperation();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
index be26142..a77f988 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
+import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -39,9 +39,9 @@ import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
 public class AggregationScanner implements RegionScanner {
 
     private RegionScanner outerScanner;
-    private CoprocessorBehavior behavior;
+    private StorageSideBehavior behavior;
 
-    public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, CoprocessorBehavior behavior) throws IOException {
+    public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, StorageSideBehavior behavior) throws IOException {
 
         AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
 
@@ -79,23 +79,23 @@ public class AggregationScanner implements RegionScanner {
             Cell cell = results.get(0);
             tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
 
-            if (behavior == CoprocessorBehavior.SCAN) {
+            if (behavior == StorageSideBehavior.SCAN) {
                 //touch every byte of the cell so that the cost of scanning will be trully reflected
                 int endIndex = cell.getRowOffset() + cell.getRowLength();
                 for (int i = cell.getRowOffset(); i < endIndex; ++i) {
                     meaninglessByte += cell.getRowArray()[i];
                 }
             } else {
-                if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal()) {
+                if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER.ordinal()) {
                     if (filter != null && filter.evaluate(tuple) == false)
                         continue;
 
-                    if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()) {
+                    if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR.ordinal()) {
                         AggrKey aggKey = projector.getAggrKey(results);
                         MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
                         aggregators.aggregate(bufs, results);
 
-                        if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+                        if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
                             aggCache.checkMemoryUsage();
                         }
                     }
@@ -103,7 +103,7 @@ public class AggregationScanner implements RegionScanner {
             }
         }
 
-        if (behavior == CoprocessorBehavior.SCAN) {
+        if (behavior == StorageSideBehavior.SCAN) {
             System.out.println("meaningless byte is now " + meaninglessByte);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
index f0e9bed..394b3e2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
@@ -35,7 +35,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
+import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -75,14 +75,14 @@ public class ObserverEnabler {
 
         if (localCoprocessor) {
             RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
-            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM);
+            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM);
             return new ResultScannerAdapter(aggrScanner);
         } else {
 
             // debug/profiling purpose
             String toggle = BackdoorToggles.getCoprocessorBehavior();
             if (toggle == null) {
-                toggle = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
+                toggle = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
             } else {
                 logger.info("The execution of this query will use " + toggle + " as observer's behavior");
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 5b48351..573951b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -47,7 +46,6 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
@@ -104,10 +102,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
     @Override
     public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
 
-        final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
-
-        logger.info("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);
-
         Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
         short shardNum = shardNumAndBaseShard.getFirst();
         short cuboidBaseShard = shardNumAndBaseShard.getSecond();
@@ -130,39 +124,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         //TODO: raw scan can be constructed at region side to reduce traffic
         List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
-        int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
-        while (true) {
-            try {
-                ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
-                BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
-                for (RawScan rs : rawScans) {
-                    RawScan.serializer.serialize(rs, rawScanBuffer);
-                }
-                rawScanBuffer.flip();
-                rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
-                break;
-            } catch (BufferOverflowException boe) {
-                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
-                rawScanBufferSize *= 4;
-            }
-        }
+        rawScanByteString = serializeRawScans(rawScans);
+        
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
-
-        int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
-        while (true) {
-            try {
-                ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
-                GTScanRequest.serializer.serialize(scanRequest, buffer);
-                buffer.flip();
-                scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
-                break;
-            } catch (BufferOverflowException boe) {
-                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
-                scanRequestBufferSize *= 4;
-            }
-        }
-
-        logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
+        scanRequest.setTimeout(epResultItr.getRpcTimeout());
+        scanRequestByteString = serializeGTScanReq(scanRequest);
+        
+        logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
         logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
         for (RawScan rs : rawScans) {
@@ -172,7 +141,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size());
 
         final AtomicLong totalScannedCount = new AtomicLong(0);
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
 
         // KylinConfig: use env instance instead of CubeSegment, because KylinConfig will share among queries
         // for different cubes until redeployment of coprocessor jar.
@@ -184,9 +152,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             builder.addHbaseColumnsToGT(intList);
         }
         builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
-        builder.setBehavior(toggle);
-        builder.setStartTime(System.currentTimeMillis());
-        builder.setTimeout(epResultItr.getRpcTimeout());
         builder.setKylinProperties(kylinConfig.getConfigAsString());
 
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
@@ -260,6 +225,45 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
     }
 
+    private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
+        ByteString scanRequestByteString;
+        int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
+                GTScanRequest.serializer.serialize(scanRequest, buffer);
+                buffer.flip();
+                scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
+                break;
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+                scanRequestBufferSize *= 4;
+            }
+        }
+        return scanRequestByteString;
+    }
+
+    private ByteString serializeRawScans(List<RawScan> rawScans) {
+        ByteString rawScanByteString;
+        int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
+                BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
+                for (RawScan rs : rawScans) {
+                    RawScan.serializer.serialize(rs, rawScanBuffer);
+                }
+                rawScanBuffer.flip();
+                rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
+                break;
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
+                rawScanBufferSize *= 4;
+            }
+        }
+        return rawScanByteString;
+    }
+
     private String getStatsString(byte[] region, CubeVisitResponse result) {
         StringBuilder sb = new StringBuilder();
         Stats stats = result.getStats();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 442963f..f4729a3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -92,8 +92,8 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
                 if (coprocException instanceof GTScanSelfTerminatedException)
                     throw (GTScanSelfTerminatedException) coprocException;
                 else
-                    throw new RuntimeException("Error in coprocessor",coprocException);
-                
+                    throw new RuntimeException("Error in coprocessor", coprocException);
+
             } else if (ret == null) {
                 throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
                         GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?");

http://git-wip-us.apache.org/repos/asf/kylin/blob/d61bcbfd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 064d100..36adca1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -54,11 +54,11 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTScanTimeoutException;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.gridtable.StorageSideBehavior;
 import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
 import org.apache.kylin.storage.hbase.cube.v2.CellListIterator;
 import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
 import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore;
@@ -198,10 +198,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             for (IntList intList : request.getHbaseColumnsToGTList()) {
                 hbaseColumnsToGT.add(intList.getIntsList());
             }
-            CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
+            StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior());
             final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
 
-            appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - request.getStartTime()));
+            appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - scanReq.getStartTime()));
 
             MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
                 @Override
@@ -228,7 +228,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
             final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());
 
-            if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
+            if (behavior.ordinal() < StorageSideBehavior.SCAN.ordinal()) {
                 //this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed
                 List<Cell> temp = Lists.newArrayList();
                 int counter = 0;
@@ -240,12 +240,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 appendProfileInfo(sb, "scanned " + counter);
             }
 
-            if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+            if (behavior.ordinal() < StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
                 scanReq.disableAggCacheMemCheck(); // disable mem check if so told
             }
 
             final MutableBoolean scanNormalComplete = new MutableBoolean(true);
-            final long deadline = request.getTimeout() + this.serviceStartTime;
+            final long deadline = scanReq.getTimeout() + this.serviceStartTime;
             final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
 
             final CellListIterator cellListIterator = new CellListIterator() {
@@ -285,12 +285,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             };
 
             IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, //
-                    request.getRowkeyPreambleSize(), CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(request.getBehavior()));
+                    request.getRowkeyPreambleSize(), StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(scanReq.getStorageBehavior()));
 
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
-                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), //
-                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(), deadline);
+                    behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER.ordinal(), //
+                    behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR.ordinal(), deadline);
 
             ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);