You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/25 10:04:15 UTC

[1/5] incubator-kylin git commit: KYLIN-844 control observer behavior

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 c1f252495 -> c2bb3a763


KYLIN-844 control observer behavior


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8048fa4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8048fa4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8048fa4a

Branch: refs/heads/0.8
Commit: 8048fa4ab93e0dfbdaa3c773ab33ecc82ad20e7f
Parents: c1f2524
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:44:54 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:45:00 2015 +0800

----------------------------------------------------------------------
 .../observer/AggregateRegionObserver.java       |  17 +-
 .../observer/AggregationScanner.java            | 333 ++++++++++---------
 .../coprocessor/observer/ObserverBehavior.java  |   9 +
 .../coprocessor/observer/ObserverEnabler.java   |  11 +-
 .../observer/AggregateRegionObserverTest.java   |  44 +--
 5 files changed, 214 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index d893621..aea2cea 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -18,11 +18,6 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import java.io.IOException;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.Scan;
@@ -32,6 +27,11 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+
+import java.io.IOException;
 
 /**
  * @author yangli9
@@ -46,6 +46,8 @@ public class AggregateRegionObserver extends BaseRegionObserver {
     static final String PROJECTOR = "_Projector";
     static final String AGGREGATORS = "_Aggregators";
     static final String FILTER = "_Filter";
+    static final String BEHAVIOR = "_Behavior";
+
 
     @Override
     public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
@@ -83,6 +85,9 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         byte[] filterBytes = scan.getAttribute(FILTER);
         CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
 
+        byte[] behavior = scan.getAttribute(BEHAVIOR);
+        ObserverBehavior observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+
         // start/end region operation & sync on scanner is suggested by the
         // javadoc of RegionScanner.nextRaw()
         // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
@@ -90,7 +95,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {
-                return new AggregationScanner(type, filter, projector, aggregators, innerScanner);
+                return new AggregationScanner(type, filter, projector, aggregators, innerScanner,observerBehavior);
             }
         } finally {
             region.closeRegionOperation();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index d9ddeda..e72444e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -1,163 +1,170 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.coprocessor.observer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-
-/**
- * @author yangli9
- * 
- */
-public class AggregationScanner implements RegionScanner {
-
-    private RegionScanner outerScanner;
-
-    public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner) throws IOException {
-
-        AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
-
-        ObserverAggregationCache aggCache;
-        Stats stats = new Stats();
-
-        aggCache = buildAggrCache(innerScanner, type, groupBy, aggrs, filter, stats);
-        stats.countOutputRow(aggCache.getSize());
-        this.outerScanner = aggCache.getScanner(innerScanner);
-
-        AggregateRegionObserver.LOG.info("Kylin Coprocessor aggregation done: " + stats);
-    }
-
-    @SuppressWarnings("rawtypes")
-    ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException {
-
-        ObserverAggregationCache aggCache = new ObserverAggregationCache(aggregators);
-
-        ObserverTuple tuple = new ObserverTuple(type);
-        boolean hasMore = true;
-        List<Cell> results = new ArrayList<Cell>();
-        while (hasMore) {
-            results.clear();
-            hasMore = innerScanner.nextRaw(results);
-            if (results.isEmpty())
-                continue;
-
-            if (stats != null)
-                stats.countInputRow(results);
-
-            Cell cell = results.get(0);
-            tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-            if (filter != null && filter.evaluate(tuple) == false)
-                continue;
-
-            AggrKey aggKey = projector.getAggrKey(results);
-            MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
-            aggregators.aggregate(bufs, results);
-
-            aggCache.checkMemoryUsage();
-        }
-        return aggCache;
-    }
-
-    @Override
-    public boolean next(List<Cell> results) throws IOException {
-        return outerScanner.next(results);
-    }
-
-    @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
-        return outerScanner.next(result, limit);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result) throws IOException {
-        return outerScanner.nextRaw(result);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-        return outerScanner.nextRaw(result, limit);
-    }
-
-    @Override
-    public void close() throws IOException {
-        outerScanner.close();
-    }
-
-    @Override
-    public HRegionInfo getRegionInfo() {
-        return outerScanner.getRegionInfo();
-    }
-
-    @Override
-    public boolean isFilterDone() throws IOException {
-        return outerScanner.isFilterDone();
-    }
-
-    @Override
-    public boolean reseek(byte[] row) throws IOException {
-        return outerScanner.reseek(row);
-    }
-
-    @Override
-    public long getMaxResultSize() {
-        return outerScanner.getMaxResultSize();
-    }
-
-    @Override
-    public long getMvccReadPoint() {
-        return outerScanner.getMvccReadPoint();
-    }
-
-    private static class Stats {
-        long inputRows = 0;
-        long inputBytes = 0;
-        long outputRows = 0;
-
-        // have no outputBytes because that requires actual serialize all the
-        // aggregator buffers
-
-        public void countInputRow(List<Cell> row) {
-            inputRows++;
-            inputBytes += row.get(0).getRowLength();
-            for (int i = 0, n = row.size(); i < n; i++) {
-                inputBytes += row.get(i).getValueLength();
-            }
-        }
-
-        public void countOutputRow(long rowCount) {
-            outputRows += rowCount;
-        }
-
-        public String toString() {
-            double percent = (double) outputRows / inputRows * 100;
-            return Math.round(percent) + "% = " + outputRows + " (out rows) / " + inputRows + " (in rows); in bytes = " + inputBytes + "; est. out bytes = " + Math.round(inputBytes * percent / 100);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.coprocessor.observer;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class AggregationScanner implements RegionScanner {
+
+    private RegionScanner outerScanner;
+    private ObserverBehavior behavior;
+
+    public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, ObserverBehavior behavior) throws IOException {
+
+        AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
+
+        this.behavior = behavior;
+
+        ObserverAggregationCache aggCache;
+        Stats stats = new Stats();
+
+        aggCache = buildAggrCache(innerScanner, type, groupBy, aggrs, filter, stats);
+        stats.countOutputRow(aggCache.getSize());
+        this.outerScanner = aggCache.getScanner(innerScanner);
+
+        AggregateRegionObserver.LOG.info("Kylin Coprocessor aggregation done: " + stats);
+    }
+
+    @SuppressWarnings("rawtypes")
+    ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException {
+
+        ObserverAggregationCache aggCache = new ObserverAggregationCache(aggregators);
+
+        ObserverTuple tuple = new ObserverTuple(type);
+        boolean hasMore = true;
+        List<Cell> results = new ArrayList<Cell>();
+        while (hasMore) {
+            results.clear();
+            hasMore = innerScanner.nextRaw(results);
+            if (results.isEmpty())
+                continue;
+
+            if (stats != null)
+                stats.countInputRow(results);
+
+            Cell cell = results.get(0);
+            tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+
+            if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER.ordinal()) {
+                if (filter != null && filter.evaluate(tuple) == false)
+                    continue;
+
+                if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR.ordinal()) {
+                    AggrKey aggKey = projector.getAggrKey(results);
+                    MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
+                    aggregators.aggregate(bufs, results);
+
+                    aggCache.checkMemoryUsage();
+                }
+            }
+        }
+        return aggCache;
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+        return outerScanner.next(results);
+    }
+
+    @Override
+    public boolean next(List<Cell> result, int limit) throws IOException {
+        return outerScanner.next(result, limit);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> result) throws IOException {
+        return outerScanner.nextRaw(result);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+        return outerScanner.nextRaw(result, limit);
+    }
+
+    @Override
+    public void close() throws IOException {
+        outerScanner.close();
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return outerScanner.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() throws IOException {
+        return outerScanner.isFilterDone();
+    }
+
+    @Override
+    public boolean reseek(byte[] row) throws IOException {
+        return outerScanner.reseek(row);
+    }
+
+    @Override
+    public long getMaxResultSize() {
+        return outerScanner.getMaxResultSize();
+    }
+
+    @Override
+    public long getMvccReadPoint() {
+        return outerScanner.getMvccReadPoint();
+    }
+
+    private static class Stats {
+        long inputRows = 0;
+        long inputBytes = 0;
+        long outputRows = 0;
+
+        // have no outputBytes because that requires actual serialize all the
+        // aggregator buffers
+
+        public void countInputRow(List<Cell> row) {
+            inputRows++;
+            inputBytes += row.get(0).getRowLength();
+            for (int i = 0, n = row.size(); i < n; i++) {
+                inputBytes += row.get(i).getValueLength();
+            }
+        }
+
+        public void countOutputRow(long rowCount) {
+            outputRows += rowCount;
+        }
+
+        public String toString() {
+            double percent = (double) outputRows / inputRows * 100;
+            return Math.round(percent) + "% = " + outputRows + " (out rows) / " + inputRows + " (in rows); in bytes = " + inputBytes + "; est. out bytes = " + Math.round(inputBytes * percent / 100);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
new file mode 100644
index 0000000..62c5bac
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
@@ -0,0 +1,9 @@
+package org.apache.kylin.storage.hbase.coprocessor.observer;
+
+/**
+ */
+public enum ObserverBehavior {
+    SCAN, //only scan data, used for profiling tuple scan speed
+    SCAN_FILTER, //only scan+filter used,used for profiling filter speed
+    SCAN_FILTER_AGGR, //default normal behavior
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 420e4c4..1c94fe5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -24,14 +24,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.FilterDecorator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
@@ -81,10 +73,11 @@ public class ObserverEnabler {
 
         if (localCoprocessor) {
             RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
-            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner);
+            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
             return new ResultScannerAdapter(aggrScanner);
         } else {
             scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 });
+            scan.setAttribute(AggregateRegionObserver.BEHAVIOR, ObserverBehavior.SCAN_FILTER_AGGR.toString().getBytes());
             scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type));
             scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector));
             scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index b6ee99a..b9f1f6a 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -18,36 +18,36 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.HCol;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.HCol;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * @author yangli9
@@ -122,7 +122,7 @@ public class AggregateRegionObserverTest {
 
         MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
 
-        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
+        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
         ArrayList<Cell> result = Lists.newArrayList();
         boolean hasMore = true;
         while (hasMore) {
@@ -171,7 +171,7 @@ public class AggregateRegionObserverTest {
 
         MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
 
-        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
+        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner,ObserverBehavior.SCAN_FILTER_AGGR);
         ArrayList<Cell> result = Lists.newArrayList();
         boolean hasMore = true;
         while (hasMore) {


[5/5] incubator-kylin git commit: KYLIN-860 add ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM

Posted by ma...@apache.org.
KYLIN-860 add ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c2bb3a76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c2bb3a76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c2bb3a76

Branch: refs/heads/0.8
Commit: c2bb3a763aa9a826b15aff027e7b4c8aa36fa653
Parents: b8962c1
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 16:03:54 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 16:03:54 2015 +0800

----------------------------------------------------------------------
 .../hbase/coprocessor/observer/AggregateRegionObserver.java      | 2 +-
 .../storage/hbase/coprocessor/observer/AggregationScanner.java   | 4 +++-
 .../storage/hbase/coprocessor/observer/ObserverBehavior.java     | 3 ++-
 .../storage/hbase/coprocessor/observer/ObserverEnabler.java      | 4 ++--
 .../hbase/coprocessor/observer/AggregateRegionObserverTest.java  | 4 ++--
 5 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index 8771bf4..cf5b8d1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -84,7 +84,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         byte[] filterBytes = scan.getAttribute(FILTER);
         CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
 
-        ObserverBehavior observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR;
+        ObserverBehavior observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM;
         byte[] behavior = scan.getAttribute(BEHAVIOR);
         if (behavior != null && behavior.length != 0) {
             observerBehavior = ObserverBehavior.valueOf(new String(behavior));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index e72444e..30c590b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -85,7 +85,9 @@ public class AggregationScanner implements RegionScanner {
                     MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
                     aggregators.aggregate(bufs, results);
 
-                    aggCache.checkMemoryUsage();
+                    if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+                        aggCache.checkMemoryUsage();
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
index 62c5bac..ff6060d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
@@ -5,5 +5,6 @@ package org.apache.kylin.storage.hbase.coprocessor.observer;
 public enum ObserverBehavior {
     SCAN, //only scan data, used for profiling tuple scan speed
     SCAN_FILTER, //only scan+filter used,used for profiling filter speed
-    SCAN_FILTER_AGGR, //default normal behavior
+    SCAN_FILTER_AGGR, //aggregate the result
+    SCAN_FILTER_AGGR_CHECKMEM, //default full operations
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index d9933d6..c1ef5f8 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -73,14 +73,14 @@ public class ObserverEnabler {
 
         if (localCoprocessor) {
             RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
-            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
+            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
             return new ResultScannerAdapter(aggrScanner);
         } else {
 
             // debug/profiling purpose
             String toggle = BackdoorToggles.getObserverBehavior();
             if (toggle == null) {
-                toggle = ObserverBehavior.SCAN_FILTER_AGGR.toString(); //default behavior
+                toggle = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
             } else {
                 logger.info("The execution of this query will use " + toggle + " as observer's behavior");
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index b9f1f6a..ba09b51 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -122,7 +122,7 @@ public class AggregateRegionObserverTest {
 
         MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
 
-        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
+        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
         ArrayList<Cell> result = Lists.newArrayList();
         boolean hasMore = true;
         while (hasMore) {
@@ -171,7 +171,7 @@ public class AggregateRegionObserverTest {
 
         MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
 
-        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner,ObserverBehavior.SCAN_FILTER_AGGR);
+        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner,ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
         ArrayList<Cell> result = Lists.newArrayList();
         boolean hasMore = true;
         while (hasMore) {


[4/5] incubator-kylin git commit: bug fix: after cherry pick

Posted by ma...@apache.org.
bug fix: after cherry pick


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b8962c15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b8962c15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b8962c15

Branch: refs/heads/0.8
Commit: b8962c150fbfd40f0f347c92d7c5459c81b1d362
Parents: dfe3a50
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:55:25 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:55:25 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/rest/controller/QueryController.java    |  3 +++
 .../org/apache/kylin/storage/hbase/HBaseKeyRange.java    | 11 ++++-------
 .../hbase/coprocessor/observer/ObserverEnabler.java      |  9 +++++----
 3 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b8962c15/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 55e3946..5f4b985 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -40,6 +40,8 @@ import org.apache.kylin.rest.request.SaveSqlRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.QueryService;
 import org.apache.kylin.rest.util.QueryUtil;
+import org.apache.kylin.storage.cache.AbstractCacheFledgedStorageEngine;
+import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -51,6 +53,7 @@ import org.supercsv.io.CsvListWriter;
 import org.supercsv.io.ICsvListWriter;
 import org.supercsv.prefs.CsvPreference;
 
+import javax.annotation.PostConstruct;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.sql.SQLException;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b8962c15/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
index d2853bc..51789ce 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
@@ -158,13 +158,10 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
     private List<Pair<byte[], byte[]>> buildFuzzyKeys(Map<TblColRef, Set<String>> fuzzyValueSet) {
         ArrayList<Pair<byte[], byte[]>> result = new ArrayList<Pair<byte[], byte[]>>();
 
-        //debug/profiling purpose
-        String toggle;
-        if ((toggle = BackdoorToggles.getToggle(BackdoorToggles.DEBUG_TOGGLE_DISABLE_FUZZY_KEY)) != null) {
-            if (Boolean.valueOf(toggle)) {
-                logger.info("The execution of this query will not use fuzzy key");
-                return result;
-            }
+        // debug/profiling purpose
+        if (BackdoorToggles.getDisableFuzzyKey()) {
+            logger.info("The execution of this query will not use fuzzy key");
+            return result;
         }
 
         FuzzyKeyEncoder fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b8962c15/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 25b198a..d9933d6 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
@@ -76,10 +77,10 @@ public class ObserverEnabler {
             return new ResultScannerAdapter(aggrScanner);
         } else {
 
-            //debug/profiling purpose
-            String toggle;
-            if ((toggle = BackdoorToggles.getToggle(BackdoorToggles.DEBUG_TOGGLE_OBSERVER_BEHAVIOR)) == null) {
-                toggle = ObserverBehavior.SCAN_FILTER_AGGR.toString();//default behavior
+            // debug/profiling purpose
+            String toggle = BackdoorToggles.getObserverBehavior();
+            if (toggle == null) {
+                toggle = ObserverBehavior.SCAN_FILTER_AGGR.toString(); //default behavior
             } else {
                 logger.info("The execution of this query will use " + toggle + " as observer's behavior");
             }


[2/5] incubator-kylin git commit: KYLIN-844 add BackdoorToggles to control fuzzy key

Posted by ma...@apache.org.
KYLIN-844 add BackdoorToggles to control fuzzy key


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2d4aea8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2d4aea8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2d4aea8d

Branch: refs/heads/0.8
Commit: 2d4aea8df378829508768b1a2c09335ae0e4d1dd
Parents: 8048fa4
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:50:16 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:50:16 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BytesUtil.java | 12 ++--
 .../apache/kylin/common/util/BytesUtilTest.java | 10 +++
 .../apache/kylin/job/tools/RowCounterCLI.java   | 67 ++++++++++++++++++++
 .../kylin/rest/controller/QueryController.java  | 25 ++++++--
 .../apache/kylin/rest/request/SQLRequest.java   | 13 ++++
 .../kylin/storage/hbase/HBaseKeyRange.java      | 15 +++++
 .../coprocessor/observer/ObserverEnabler.java   | 12 +++-
 7 files changed, 143 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index bb1bab4..c9f1e08 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -18,16 +18,15 @@
 
 package org.apache.kylin.common.util;
 
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.io.Writable;
-
 public class BytesUtil {
 
     public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
@@ -394,6 +393,10 @@ public class BytesUtil {
         return toHex(array);
     }
 
+
+    /**
+     * this method only works for hex strings
+     */
     public static byte[] fromReadableText(String text) {
         String[] tokens = text.split("\\\\x");
         byte[] ret = new byte[tokens.length - 1];
@@ -422,6 +425,7 @@ public class BytesUtil {
         return sb.toString();
     }
 
+
     public static void main(String[] args) throws Exception {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
index 6d554bb..7d4dea9 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
@@ -78,4 +78,14 @@ public class BytesUtilTest extends TestCase {
     }
 
 
+
+    public void testReadable()
+    {
+        String x = "\\x00\\x00\\x00\\x00\\x00\\x01\\xFC\\xA8";
+        byte[] bytes = BytesUtil.fromReadableText(x);
+        String y = BytesUtil.toHex(bytes);
+        assertEquals(x,y);
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
new file mode 100644
index 0000000..b80063d
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
@@ -0,0 +1,67 @@
+package org.apache.kylin.job.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ */
+public class RowCounterCLI {
+    private static final Logger logger = LoggerFactory.getLogger(RowCounterCLI.class);
+
+    public static void main(String[] args) throws IOException {
+
+        if (args == null || args.length != 3) {
+            System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-latest.jar org.apache.kylin.job.tools.RowCounterCLI [HTABLE_NAME] [STARTKEY] [ENDKEY]");
+        }
+
+        System.out.println(args[0]);
+        String htableName = args[0];
+        System.out.println(args[1]);
+        byte[] startKey = BytesUtil.fromReadableText(args[1]);
+        System.out.println(args[2]);
+        byte[] endKey = BytesUtil.fromReadableText(args[2]);
+
+        if (startKey == null) {
+            System.out.println("startkey is null ");
+        } else {
+            System.out.println("startkey lenght: " + startKey.length);
+        }
+
+        System.out.println("start key in binary: " + Bytes.toStringBinary(startKey));
+        System.out.println("end key in binary: " + Bytes.toStringBinary(endKey));
+
+        Configuration conf = HBaseConfiguration.create();
+
+        Scan scan = new Scan();
+        scan.setCaching(1024);
+        scan.setCacheBlocks(true);
+        scan.setStartRow(startKey);
+        scan.setStopRow(endKey);
+
+        logger.info("My Scan " + scan.toString());
+
+        HConnection conn = HConnectionManager.createConnection(conf);
+        HTableInterface tableInterface = conn.getTable(htableName);
+
+        Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
+        int counter = 0;
+        while (iterator.hasNext()) {
+            iterator.next();
+            counter++;
+            if (counter % 1000 == 1) {
+                System.out.println("number of rows: " + counter);
+            }
+        }
+        System.out.println("number of rows: " + counter);
+        tableInterface.close();
+        conn.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index e3cfc95..55e3946 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -25,6 +25,7 @@ import net.sf.ehcache.CacheManager;
 import net.sf.ehcache.Element;
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.ForbiddenException;
@@ -39,8 +40,6 @@ import org.apache.kylin.rest.request.SaveSqlRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.QueryService;
 import org.apache.kylin.rest.util.QueryUtil;
-import org.apache.kylin.storage.cache.AbstractCacheFledgedStorageEngine;
-import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -52,13 +51,13 @@ import org.supercsv.io.CsvListWriter;
 import org.supercsv.io.ICsvListWriter;
 import org.supercsv.prefs.CsvPreference;
 
-import javax.annotation.PostConstruct;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Handle query requests.
@@ -88,13 +87,15 @@ public class QueryController extends BasicController {
     @ResponseBody
     @Timed(name = "query")
     public SQLResponse query(@RequestBody SQLRequest sqlRequest) {
-        long startTimestamp = System.currentTimeMillis();
+        initDebugToggles(sqlRequest);
 
+        long startTimestamp = System.currentTimeMillis();
         SQLResponse response = doQuery(sqlRequest);
-
         response.setDuration(System.currentTimeMillis() - startTimestamp);
         queryService.logQuery(sqlRequest, response, new Date(startTimestamp), new Date(System.currentTimeMillis()));
 
+        cleanupDebugToggles();
+
         return response;
     }
 
@@ -257,4 +258,18 @@ public class QueryController extends BasicController {
         this.cacheManager = cacheManager;
     }
 
+    private void initDebugToggles(SQLRequest sqlRequest) {
+
+        Map<String, String> toggles = sqlRequest.getBackdoorToggles();
+        if (toggles == null || toggles.size() == 0) {
+            return;
+        }
+
+        BackdoorToggles.setToggles(toggles);
+    }
+
+    private void cleanupDebugToggles() {
+        BackdoorToggles.cleanToggles();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java b/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
index a75ddfb..901badd 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.rest.request;
 
+import java.util.Map;
+
 public class SQLRequest {
 
     private String sql;
@@ -26,9 +28,20 @@ public class SQLRequest {
     private Integer limit = 0;
     private boolean acceptPartial = true;
 
+    private Map<String,String> backdoorToggles;
+
     public SQLRequest() {
     }
 
+    public Map<String, String> getBackdoorToggles() {
+        return backdoorToggles;
+    }
+
+    public void setBackdoorToggles(Map<String, String> backdoorToggles) {
+        this.backdoorToggles = backdoorToggles;
+    }
+
+
     public String getProject() {
         return project;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
index 6d77fa2..d2853bc 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.DateFormat;
@@ -32,6 +33,8 @@ import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
@@ -42,6 +45,8 @@ import java.util.*;
  */
 public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
 
+    private static final Logger logger = LoggerFactory.getLogger(HBaseKeyRange.class);
+
     private static final int FUZZY_VALUE_CAP = 20;
     private static final byte[] ZERO_TAIL_BYTES = new byte[] { 0 };
 
@@ -145,6 +150,7 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
             buf.append(BytesUtil.toHex(fuzzyKey.getFirst()));
             buf.append(" ");
             buf.append(BytesUtil.toHex(fuzzyKey.getSecond()));
+            buf.append(System.lineSeparator());
         }
         this.fuzzyKeyString = buf.toString();
     }
@@ -152,6 +158,15 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
     private List<Pair<byte[], byte[]>> buildFuzzyKeys(Map<TblColRef, Set<String>> fuzzyValueSet) {
         ArrayList<Pair<byte[], byte[]>> result = new ArrayList<Pair<byte[], byte[]>>();
 
+        //debug/profiling purpose
+        String toggle;
+        if ((toggle = BackdoorToggles.getToggle(BackdoorToggles.DEBUG_TOGGLE_DISABLE_FUZZY_KEY)) != null) {
+            if (Boolean.valueOf(toggle)) {
+                logger.info("The execution of this query will not use fuzzy key");
+                return result;
+            }
+        }
+
         FuzzyKeyEncoder fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
         FuzzyMaskEncoder fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 1c94fe5..25b198a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
@@ -76,8 +75,17 @@ public class ObserverEnabler {
             AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
             return new ResultScannerAdapter(aggrScanner);
         } else {
+
+            //debug/profiling purpose
+            String toggle;
+            if ((toggle = BackdoorToggles.getToggle(BackdoorToggles.DEBUG_TOGGLE_OBSERVER_BEHAVIOR)) == null) {
+                toggle = ObserverBehavior.SCAN_FILTER_AGGR.toString();//default behavior
+            } else {
+                logger.info("The execution of this query will use " + toggle + " as observer's behavior");
+            }
+
             scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 });
-            scan.setAttribute(AggregateRegionObserver.BEHAVIOR, ObserverBehavior.SCAN_FILTER_AGGR.toString().getBytes());
+            scan.setAttribute(AggregateRegionObserver.BEHAVIOR, toggle.getBytes());
             scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type));
             scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector));
             scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs));


[3/5] incubator-kylin git commit: KYLIN-844 default observer behavior if no attribute hint found

Posted by ma...@apache.org.
KYLIN-844 default observer behavior if no attribute hint found


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/dfe3a50a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/dfe3a50a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/dfe3a50a

Branch: refs/heads/0.8
Commit: dfe3a50abfd9dbae185a6a23f54c04cdd71d4c78
Parents: 2d4aea8
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:50:56 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:50:56 2015 +0800

----------------------------------------------------------------------
 .../hbase/coprocessor/observer/AggregateRegionObserver.java  | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dfe3a50a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index aea2cea..8771bf4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -48,7 +48,6 @@ public class AggregateRegionObserver extends BaseRegionObserver {
     static final String FILTER = "_Filter";
     static final String BEHAVIOR = "_Behavior";
 
-
     @Override
     public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
 
@@ -85,8 +84,11 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         byte[] filterBytes = scan.getAttribute(FILTER);
         CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
 
+        ObserverBehavior observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR;
         byte[] behavior = scan.getAttribute(BEHAVIOR);
-        ObserverBehavior observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+        if (behavior != null && behavior.length != 0) {
+            observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+        }
 
         // start/end region operation & sync on scanner is suggested by the
         // javadoc of RegionScanner.nextRaw()
@@ -95,7 +97,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {
-                return new AggregationScanner(type, filter, projector, aggregators, innerScanner,observerBehavior);
+                return new AggregationScanner(type, filter, projector, aggregators, innerScanner, observerBehavior);
             }
         } finally {
             region.closeRegionOperation();