You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/11 01:26:49 UTC
[03/50] [abbrv] incubator-kylin git commit: KYLIN-844 control
observer behavior
KYLIN-844 control observer behavior
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ad4a28af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ad4a28af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ad4a28af
Branch: refs/heads/0.7
Commit: ad4a28af5e07fb75ec5d9a5eda11cd7409b033b1
Parents: 198be9e
Author: honma <ho...@ebay.com>
Authored: Fri Jun 19 18:29:59 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 24 12:02:25 2015 +0800
----------------------------------------------------------------------
.../observer/AggregateRegionObserver.java | 17 +++++---
.../observer/AggregationScanner.java | 37 +++++++++-------
.../coprocessor/observer/ObserverBehavior.java | 9 ++++
.../coprocessor/observer/ObserverEnabler.java | 33 +++++++--------
.../observer/AggregateRegionObserverTest.java | 44 ++++++++++----------
5 files changed, 80 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index d893621..aea2cea 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -18,11 +18,6 @@
package org.apache.kylin.storage.hbase.coprocessor.observer;
-import java.io.IOException;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Scan;
@@ -32,6 +27,11 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+
+import java.io.IOException;
/**
* @author yangli9
@@ -46,6 +46,8 @@ public class AggregateRegionObserver extends BaseRegionObserver {
static final String PROJECTOR = "_Projector";
static final String AGGREGATORS = "_Aggregators";
static final String FILTER = "_Filter";
+ static final String BEHAVIOR = "_Behavior";
+
@Override
public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
@@ -83,6 +85,9 @@ public class AggregateRegionObserver extends BaseRegionObserver {
byte[] filterBytes = scan.getAttribute(FILTER);
CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
+ byte[] behavior = scan.getAttribute(BEHAVIOR);
+ ObserverBehavior observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+
// start/end region operation & sync on scanner is suggested by the
// javadoc of RegionScanner.nextRaw()
// FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
@@ -90,7 +95,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
region.startRegionOperation();
try {
synchronized (innerScanner) {
- return new AggregationScanner(type, filter, projector, aggregators, innerScanner);
+ return new AggregationScanner(type, filter, projector, aggregators, innerScanner,observerBehavior);
}
} finally {
region.closeRegionOperation();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index 6bdb53a..85f3f6e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -18,18 +18,17 @@
package org.apache.kylin.storage.hbase.coprocessor.observer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
/**
* @author yangli9
@@ -38,11 +37,14 @@ import org.apache.kylin.metadata.measure.MeasureAggregator;
public class AggregationScanner implements RegionScanner {
private RegionScanner outerScanner;
+ private ObserverBehavior behavior;
- public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner) throws IOException {
+ public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, ObserverBehavior behavior) throws IOException {
AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
+ this.behavior = behavior;
+
ObserverAggregationCache aggCache;
Stats stats = new Stats();
@@ -72,14 +74,19 @@ public class AggregationScanner implements RegionScanner {
Cell cell = results.get(0);
tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- if (filter != null && filter.evaluate(tuple) == false)
- continue;
- CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(results);
- MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
- aggregators.aggregate(bufs, results);
+ if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER.ordinal()) {
+ if (filter != null && filter.evaluate(tuple) == false)
+ continue;
- aggCache.checkMemoryUsage();
+ if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR.ordinal()) {
+ CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(results);
+ MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
+ aggregators.aggregate(bufs, results);
+
+ aggCache.checkMemoryUsage();
+ }
+ }
}
return aggCache;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
new file mode 100644
index 0000000..62c5bac
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
@@ -0,0 +1,9 @@
+package org.apache.kylin.storage.hbase.coprocessor.observer;
+
+/**
+ */
+public enum ObserverBehavior {
+ SCAN, //only scan data, used for profiling tuple scan speed
+ SCAN_FILTER, //only scan+filter used,used for profiling filter speed
+ SCAN_FILTER_AGGR, //default normal behavior
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index eff0eee..ce9ba9d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -18,32 +18,31 @@
package org.apache.kylin.storage.hbase.coprocessor.observer;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.storage.StorageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowValueDecoder;
-import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.RegionScannerAdapter;
import org.apache.kylin.storage.hbase.ResultScannerAdapter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
/**
* @author yangli9
@@ -70,10 +69,11 @@ public class ObserverEnabler {
if (DEBUG_LOCAL_COPROCESSOR) {
RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
- AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner);
+ AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
return new ResultScannerAdapter(aggrScanner);
} else {
scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 });
+ scan.setAttribute(AggregateRegionObserver.BEHAVIOR, ObserverBehavior.SCAN_FILTER_AGGR.toString().getBytes());
scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type));
scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector));
scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs));
@@ -90,7 +90,6 @@ public class ObserverEnabler {
private static boolean isCoprocessorBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
-
String forceFlag = System.getProperty(FORCE_COPROCESSOR);
if (forceFlag != null) {
return Boolean.parseBoolean(forceFlag);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad4a28af/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index 6c789d3..9bed0ca 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -18,36 +18,36 @@
package org.apache.kylin.storage.hbase.coprocessor.observer;
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.Bytes;
import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.HCol;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.Lists;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.HCol;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* @author yangli9
@@ -122,7 +122,7 @@ public class AggregateRegionObserverTest {
MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
- RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
+ RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
ArrayList<Cell> result = Lists.newArrayList();
boolean hasMore = true;
while (hasMore) {
@@ -171,7 +171,7 @@ public class AggregateRegionObserverTest {
MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
- RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
+ RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner,ObserverBehavior.SCAN_FILTER_AGGR);
ArrayList<Cell> result = Lists.newArrayList();
boolean hasMore = true;
while (hasMore) {