You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/11 01:26:49 UTC

[03/50] [abbrv] incubator-kylin git commit: KYLIN-844 control observer behavior

KYLIN-844 control observer behavior


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ad4a28af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ad4a28af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ad4a28af

Branch: refs/heads/0.7
Commit: ad4a28af5e07fb75ec5d9a5eda11cd7409b033b1
Parents: 198be9e
Author: honma <ho...@ebay.com>
Authored: Fri Jun 19 18:29:59 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 24 12:02:25 2015 +0800

----------------------------------------------------------------------
 .../observer/AggregateRegionObserver.java       | 17 +++++---
 .../observer/AggregationScanner.java            | 37 +++++++++-------
 .../coprocessor/observer/ObserverBehavior.java  |  9 ++++
 .../coprocessor/observer/ObserverEnabler.java   | 33 +++++++--------
 .../observer/AggregateRegionObserverTest.java   | 44 ++++++++++----------
 5 files changed, 80 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index d893621..aea2cea 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -18,11 +18,6 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import java.io.IOException;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.Scan;
@@ -32,6 +27,11 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+
+import java.io.IOException;
 
 /**
  * @author yangli9
@@ -46,6 +46,8 @@ public class AggregateRegionObserver extends BaseRegionObserver {
     static final String PROJECTOR = "_Projector";
     static final String AGGREGATORS = "_Aggregators";
     static final String FILTER = "_Filter";
+    static final String BEHAVIOR = "_Behavior";
+
 
     @Override
     public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
@@ -83,6 +85,9 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         byte[] filterBytes = scan.getAttribute(FILTER);
         CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
 
+        byte[] behavior = scan.getAttribute(BEHAVIOR);
+        ObserverBehavior observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+
         // start/end region operation & sync on scanner is suggested by the
         // javadoc of RegionScanner.nextRaw()
         // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
@@ -90,7 +95,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {
-                return new AggregationScanner(type, filter, projector, aggregators, innerScanner);
+                return new AggregationScanner(type, filter, projector, aggregators, innerScanner,observerBehavior);
             }
         } finally {
             region.closeRegionOperation();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index 6bdb53a..85f3f6e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -18,18 +18,17 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
 import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * @author yangli9
@@ -38,11 +37,14 @@ import org.apache.kylin.metadata.measure.MeasureAggregator;
 public class AggregationScanner implements RegionScanner {
 
     private RegionScanner outerScanner;
+    private ObserverBehavior behavior;
 
-    public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner) throws IOException {
+    public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, ObserverBehavior behavior) throws IOException {
 
         AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
 
+        this.behavior = behavior;
+
         ObserverAggregationCache aggCache;
         Stats stats = new Stats();
 
@@ -72,14 +74,19 @@ public class AggregationScanner implements RegionScanner {
 
             Cell cell = results.get(0);
             tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-            if (filter != null && filter.evaluate(tuple) == false)
-                continue;
 
-            CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(results);
-            MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
-            aggregators.aggregate(bufs, results);
+            if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER.ordinal()) {
+                if (filter != null && filter.evaluate(tuple) == false)
+                    continue;
 
-            aggCache.checkMemoryUsage();
+                if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR.ordinal()) {
+                    CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(results);
+                    MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
+                    aggregators.aggregate(bufs, results);
+
+                    aggCache.checkMemoryUsage();
+                }
+            }
         }
         return aggCache;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
new file mode 100644
index 0000000..62c5bac
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
@@ -0,0 +1,9 @@
+package org.apache.kylin.storage.hbase.coprocessor.observer;
+
+/**
+ */
+public enum ObserverBehavior {
+    SCAN, //only scan data, used for profiling tuple scan speed
+    SCAN_FILTER, //only scan+filter used,used for profiling filter speed
+    SCAN_FILTER_AGGR, //default normal behavior
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index eff0eee..ce9ba9d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -18,32 +18,31 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.storage.StorageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowValueDecoder;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.RegionScannerAdapter;
 import org.apache.kylin.storage.hbase.ResultScannerAdapter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * @author yangli9
@@ -70,10 +69,11 @@ public class ObserverEnabler {
 
         if (DEBUG_LOCAL_COPROCESSOR) {
             RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
-            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner);
+            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
             return new ResultScannerAdapter(aggrScanner);
         } else {
             scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 });
+            scan.setAttribute(AggregateRegionObserver.BEHAVIOR, ObserverBehavior.SCAN_FILTER_AGGR.toString().getBytes());
             scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type));
             scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector));
             scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs));
@@ -90,7 +90,6 @@ public class ObserverEnabler {
 
     private static boolean isCoprocessorBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
 
-
         String forceFlag = System.getProperty(FORCE_COPROCESSOR);
         if (forceFlag != null) {
             return Boolean.parseBoolean(forceFlag);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index 6c789d3..9bed0ca 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -18,36 +18,36 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.HCol;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.HCol;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * @author yangli9
@@ -122,7 +122,7 @@ public class AggregateRegionObserverTest {
 
         MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
 
-        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
+        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
         ArrayList<Cell> result = Lists.newArrayList();
         boolean hasMore = true;
         while (hasMore) {
@@ -171,7 +171,7 @@ public class AggregateRegionObserverTest {
 
         MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
 
-        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
+        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner,ObserverBehavior.SCAN_FILTER_AGGR);
         ArrayList<Cell> result = Lists.newArrayList();
         boolean hasMore = true;
         while (hasMore) {