You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/25 10:04:15 UTC
[1/5] incubator-kylin git commit: KYLIN-844 control observer behavior
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 c1f252495 -> c2bb3a763
KYLIN-844 control observer behavior
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8048fa4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8048fa4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8048fa4a
Branch: refs/heads/0.8
Commit: 8048fa4ab93e0dfbdaa3c773ab33ecc82ad20e7f
Parents: c1f2524
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:44:54 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:45:00 2015 +0800
----------------------------------------------------------------------
.../observer/AggregateRegionObserver.java | 17 +-
.../observer/AggregationScanner.java | 333 ++++++++++---------
.../coprocessor/observer/ObserverBehavior.java | 9 +
.../coprocessor/observer/ObserverEnabler.java | 11 +-
.../observer/AggregateRegionObserverTest.java | 44 +--
5 files changed, 214 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index d893621..aea2cea 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -18,11 +18,6 @@
package org.apache.kylin.storage.hbase.coprocessor.observer;
-import java.io.IOException;
-
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Scan;
@@ -32,6 +27,11 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+
+import java.io.IOException;
/**
* @author yangli9
@@ -46,6 +46,8 @@ public class AggregateRegionObserver extends BaseRegionObserver {
static final String PROJECTOR = "_Projector";
static final String AGGREGATORS = "_Aggregators";
static final String FILTER = "_Filter";
+ static final String BEHAVIOR = "_Behavior";
+
@Override
public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
@@ -83,6 +85,9 @@ public class AggregateRegionObserver extends BaseRegionObserver {
byte[] filterBytes = scan.getAttribute(FILTER);
CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
+ byte[] behavior = scan.getAttribute(BEHAVIOR);
+ ObserverBehavior observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+
// start/end region operation & sync on scanner is suggested by the
// javadoc of RegionScanner.nextRaw()
// FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
@@ -90,7 +95,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
region.startRegionOperation();
try {
synchronized (innerScanner) {
- return new AggregationScanner(type, filter, projector, aggregators, innerScanner);
+ return new AggregationScanner(type, filter, projector, aggregators, innerScanner,observerBehavior);
}
} finally {
region.closeRegionOperation();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index d9ddeda..e72444e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -1,163 +1,170 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.coprocessor.observer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-
-/**
- * @author yangli9
- *
- */
-public class AggregationScanner implements RegionScanner {
-
- private RegionScanner outerScanner;
-
- public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner) throws IOException {
-
- AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
-
- ObserverAggregationCache aggCache;
- Stats stats = new Stats();
-
- aggCache = buildAggrCache(innerScanner, type, groupBy, aggrs, filter, stats);
- stats.countOutputRow(aggCache.getSize());
- this.outerScanner = aggCache.getScanner(innerScanner);
-
- AggregateRegionObserver.LOG.info("Kylin Coprocessor aggregation done: " + stats);
- }
-
- @SuppressWarnings("rawtypes")
- ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException {
-
- ObserverAggregationCache aggCache = new ObserverAggregationCache(aggregators);
-
- ObserverTuple tuple = new ObserverTuple(type);
- boolean hasMore = true;
- List<Cell> results = new ArrayList<Cell>();
- while (hasMore) {
- results.clear();
- hasMore = innerScanner.nextRaw(results);
- if (results.isEmpty())
- continue;
-
- if (stats != null)
- stats.countInputRow(results);
-
- Cell cell = results.get(0);
- tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- if (filter != null && filter.evaluate(tuple) == false)
- continue;
-
- AggrKey aggKey = projector.getAggrKey(results);
- MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
- aggregators.aggregate(bufs, results);
-
- aggCache.checkMemoryUsage();
- }
- return aggCache;
- }
-
- @Override
- public boolean next(List<Cell> results) throws IOException {
- return outerScanner.next(results);
- }
-
- @Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- return outerScanner.next(result, limit);
- }
-
- @Override
- public boolean nextRaw(List<Cell> result) throws IOException {
- return outerScanner.nextRaw(result);
- }
-
- @Override
- public boolean nextRaw(List<Cell> result, int limit) throws IOException {
- return outerScanner.nextRaw(result, limit);
- }
-
- @Override
- public void close() throws IOException {
- outerScanner.close();
- }
-
- @Override
- public HRegionInfo getRegionInfo() {
- return outerScanner.getRegionInfo();
- }
-
- @Override
- public boolean isFilterDone() throws IOException {
- return outerScanner.isFilterDone();
- }
-
- @Override
- public boolean reseek(byte[] row) throws IOException {
- return outerScanner.reseek(row);
- }
-
- @Override
- public long getMaxResultSize() {
- return outerScanner.getMaxResultSize();
- }
-
- @Override
- public long getMvccReadPoint() {
- return outerScanner.getMvccReadPoint();
- }
-
- private static class Stats {
- long inputRows = 0;
- long inputBytes = 0;
- long outputRows = 0;
-
- // have no outputBytes because that requires actual serialize all the
- // aggregator buffers
-
- public void countInputRow(List<Cell> row) {
- inputRows++;
- inputBytes += row.get(0).getRowLength();
- for (int i = 0, n = row.size(); i < n; i++) {
- inputBytes += row.get(i).getValueLength();
- }
- }
-
- public void countOutputRow(long rowCount) {
- outputRows += rowCount;
- }
-
- public String toString() {
- double percent = (double) outputRows / inputRows * 100;
- return Math.round(percent) + "% = " + outputRows + " (out rows) / " + inputRows + " (in rows); in bytes = " + inputBytes + "; est. out bytes = " + Math.round(inputBytes * percent / 100);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.coprocessor.observer;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author yangli9
+ *
+ */
+public class AggregationScanner implements RegionScanner {
+
+ private RegionScanner outerScanner;
+ private ObserverBehavior behavior;
+
+ public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, ObserverBehavior behavior) throws IOException {
+
+ AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
+
+ this.behavior = behavior;
+
+ ObserverAggregationCache aggCache;
+ Stats stats = new Stats();
+
+ aggCache = buildAggrCache(innerScanner, type, groupBy, aggrs, filter, stats);
+ stats.countOutputRow(aggCache.getSize());
+ this.outerScanner = aggCache.getScanner(innerScanner);
+
+ AggregateRegionObserver.LOG.info("Kylin Coprocessor aggregation done: " + stats);
+ }
+
+ @SuppressWarnings("rawtypes")
+ ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException {
+
+ ObserverAggregationCache aggCache = new ObserverAggregationCache(aggregators);
+
+ ObserverTuple tuple = new ObserverTuple(type);
+ boolean hasMore = true;
+ List<Cell> results = new ArrayList<Cell>();
+ while (hasMore) {
+ results.clear();
+ hasMore = innerScanner.nextRaw(results);
+ if (results.isEmpty())
+ continue;
+
+ if (stats != null)
+ stats.countInputRow(results);
+
+ Cell cell = results.get(0);
+ tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+
+ if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER.ordinal()) {
+ if (filter != null && filter.evaluate(tuple) == false)
+ continue;
+
+ if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR.ordinal()) {
+ AggrKey aggKey = projector.getAggrKey(results);
+ MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
+ aggregators.aggregate(bufs, results);
+
+ aggCache.checkMemoryUsage();
+ }
+ }
+ }
+ return aggCache;
+ }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ return outerScanner.next(results);
+ }
+
+ @Override
+ public boolean next(List<Cell> result, int limit) throws IOException {
+ return outerScanner.next(result, limit);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result) throws IOException {
+ return outerScanner.nextRaw(result);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+ return outerScanner.nextRaw(result, limit);
+ }
+
+ @Override
+ public void close() throws IOException {
+ outerScanner.close();
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return outerScanner.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() throws IOException {
+ return outerScanner.isFilterDone();
+ }
+
+ @Override
+ public boolean reseek(byte[] row) throws IOException {
+ return outerScanner.reseek(row);
+ }
+
+ @Override
+ public long getMaxResultSize() {
+ return outerScanner.getMaxResultSize();
+ }
+
+ @Override
+ public long getMvccReadPoint() {
+ return outerScanner.getMvccReadPoint();
+ }
+
+ private static class Stats {
+ long inputRows = 0;
+ long inputBytes = 0;
+ long outputRows = 0;
+
+ // have no outputBytes because that requires actual serialize all the
+ // aggregator buffers
+
+ public void countInputRow(List<Cell> row) {
+ inputRows++;
+ inputBytes += row.get(0).getRowLength();
+ for (int i = 0, n = row.size(); i < n; i++) {
+ inputBytes += row.get(i).getValueLength();
+ }
+ }
+
+ public void countOutputRow(long rowCount) {
+ outputRows += rowCount;
+ }
+
+ public String toString() {
+ double percent = (double) outputRows / inputRows * 100;
+ return Math.round(percent) + "% = " + outputRows + " (out rows) / " + inputRows + " (in rows); in bytes = " + inputBytes + "; est. out bytes = " + Math.round(inputBytes * percent / 100);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
new file mode 100644
index 0000000..62c5bac
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
@@ -0,0 +1,9 @@
+package org.apache.kylin.storage.hbase.coprocessor.observer;
+
+/**
+ */
+public enum ObserverBehavior {
+ SCAN, //only scan data, used for profiling tuple scan speed
+ SCAN_FILTER, //only scan+filter used,used for profiling filter speed
+ SCAN_FILTER_AGGR, //default normal behavior
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 420e4c4..1c94fe5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -24,14 +24,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.FilterDecorator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeInstance;
@@ -81,10 +73,11 @@ public class ObserverEnabler {
if (localCoprocessor) {
RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
- AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner);
+ AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
return new ResultScannerAdapter(aggrScanner);
} else {
scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 });
+ scan.setAttribute(AggregateRegionObserver.BEHAVIOR, ObserverBehavior.SCAN_FILTER_AGGR.toString().getBytes());
scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type));
scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector));
scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8048fa4a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index b6ee99a..b9f1f6a 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -18,36 +18,36 @@
package org.apache.kylin.storage.hbase.coprocessor.observer;
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.Bytes;
import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.HCol;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.Lists;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.HCol;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* @author yangli9
@@ -122,7 +122,7 @@ public class AggregateRegionObserverTest {
MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
- RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
+ RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
ArrayList<Cell> result = Lists.newArrayList();
boolean hasMore = true;
while (hasMore) {
@@ -171,7 +171,7 @@ public class AggregateRegionObserverTest {
MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
- RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
+ RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner,ObserverBehavior.SCAN_FILTER_AGGR);
ArrayList<Cell> result = Lists.newArrayList();
boolean hasMore = true;
while (hasMore) {
[5/5] incubator-kylin git commit: KYLIN-860 add
ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM
Posted by ma...@apache.org.
KYLIN-860 add ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c2bb3a76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c2bb3a76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c2bb3a76
Branch: refs/heads/0.8
Commit: c2bb3a763aa9a826b15aff027e7b4c8aa36fa653
Parents: b8962c1
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 16:03:54 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 16:03:54 2015 +0800
----------------------------------------------------------------------
.../hbase/coprocessor/observer/AggregateRegionObserver.java | 2 +-
.../storage/hbase/coprocessor/observer/AggregationScanner.java | 4 +++-
.../storage/hbase/coprocessor/observer/ObserverBehavior.java | 3 ++-
.../storage/hbase/coprocessor/observer/ObserverEnabler.java | 4 ++--
.../hbase/coprocessor/observer/AggregateRegionObserverTest.java | 4 ++--
5 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index 8771bf4..cf5b8d1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -84,7 +84,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
byte[] filterBytes = scan.getAttribute(FILTER);
CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
- ObserverBehavior observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR;
+ ObserverBehavior observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM;
byte[] behavior = scan.getAttribute(BEHAVIOR);
if (behavior != null && behavior.length != 0) {
observerBehavior = ObserverBehavior.valueOf(new String(behavior));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index e72444e..30c590b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -85,7 +85,9 @@ public class AggregationScanner implements RegionScanner {
MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
aggregators.aggregate(bufs, results);
- aggCache.checkMemoryUsage();
+ if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+ aggCache.checkMemoryUsage();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
index 62c5bac..ff6060d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverBehavior.java
@@ -5,5 +5,6 @@ package org.apache.kylin.storage.hbase.coprocessor.observer;
public enum ObserverBehavior {
SCAN, //only scan data, used for profiling tuple scan speed
SCAN_FILTER, //only scan+filter used,used for profiling filter speed
- SCAN_FILTER_AGGR, //default normal behavior
+ SCAN_FILTER_AGGR, //aggregate the result
+ SCAN_FILTER_AGGR_CHECKMEM, //default full operations
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index d9933d6..c1ef5f8 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -73,14 +73,14 @@ public class ObserverEnabler {
if (localCoprocessor) {
RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
- AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
+ AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
return new ResultScannerAdapter(aggrScanner);
} else {
// debug/profiling purpose
String toggle = BackdoorToggles.getObserverBehavior();
if (toggle == null) {
- toggle = ObserverBehavior.SCAN_FILTER_AGGR.toString(); //default behavior
+ toggle = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
} else {
logger.info("The execution of this query will use " + toggle + " as observer's behavior");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2bb3a76/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index b9f1f6a..ba09b51 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -122,7 +122,7 @@ public class AggregateRegionObserverTest {
MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
- RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
+ RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
ArrayList<Cell> result = Lists.newArrayList();
boolean hasMore = true;
while (hasMore) {
@@ -171,7 +171,7 @@ public class AggregateRegionObserverTest {
MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
- RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner,ObserverBehavior.SCAN_FILTER_AGGR);
+ RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner,ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
ArrayList<Cell> result = Lists.newArrayList();
boolean hasMore = true;
while (hasMore) {
[4/5] incubator-kylin git commit: bug fix: after cherry pick
Posted by ma...@apache.org.
bug fix: after cherry pick
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b8962c15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b8962c15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b8962c15
Branch: refs/heads/0.8
Commit: b8962c150fbfd40f0f347c92d7c5459c81b1d362
Parents: dfe3a50
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:55:25 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:55:25 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/rest/controller/QueryController.java | 3 +++
.../org/apache/kylin/storage/hbase/HBaseKeyRange.java | 11 ++++-------
.../hbase/coprocessor/observer/ObserverEnabler.java | 9 +++++----
3 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b8962c15/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 55e3946..5f4b985 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -40,6 +40,8 @@ import org.apache.kylin.rest.request.SaveSqlRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.service.QueryService;
import org.apache.kylin.rest.util.QueryUtil;
+import org.apache.kylin.storage.cache.AbstractCacheFledgedStorageEngine;
+import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,6 +53,7 @@ import org.supercsv.io.CsvListWriter;
import org.supercsv.io.ICsvListWriter;
import org.supercsv.prefs.CsvPreference;
+import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.sql.SQLException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b8962c15/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
index d2853bc..51789ce 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
@@ -158,13 +158,10 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
private List<Pair<byte[], byte[]>> buildFuzzyKeys(Map<TblColRef, Set<String>> fuzzyValueSet) {
ArrayList<Pair<byte[], byte[]>> result = new ArrayList<Pair<byte[], byte[]>>();
- //debug/profiling purpose
- String toggle;
- if ((toggle = BackdoorToggles.getToggle(BackdoorToggles.DEBUG_TOGGLE_DISABLE_FUZZY_KEY)) != null) {
- if (Boolean.valueOf(toggle)) {
- logger.info("The execution of this query will not use fuzzy key");
- return result;
- }
+ // debug/profiling purpose
+ if (BackdoorToggles.getDisableFuzzyKey()) {
+ logger.info("The execution of this query will not use fuzzy key");
+ return result;
}
FuzzyKeyEncoder fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b8962c15/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 25b198a..d9933d6 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
@@ -76,10 +77,10 @@ public class ObserverEnabler {
return new ResultScannerAdapter(aggrScanner);
} else {
- //debug/profiling purpose
- String toggle;
- if ((toggle = BackdoorToggles.getToggle(BackdoorToggles.DEBUG_TOGGLE_OBSERVER_BEHAVIOR)) == null) {
- toggle = ObserverBehavior.SCAN_FILTER_AGGR.toString();//default behavior
+ // debug/profiling purpose
+ String toggle = BackdoorToggles.getObserverBehavior();
+ if (toggle == null) {
+ toggle = ObserverBehavior.SCAN_FILTER_AGGR.toString(); //default behavior
} else {
logger.info("The execution of this query will use " + toggle + " as observer's behavior");
}
[2/5] incubator-kylin git commit: KYLIN-844 add BackdoorToggles to
control fuzzy key
Posted by ma...@apache.org.
KYLIN-844 add BackdoorToggles to control fuzzy key
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2d4aea8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2d4aea8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2d4aea8d
Branch: refs/heads/0.8
Commit: 2d4aea8df378829508768b1a2c09335ae0e4d1dd
Parents: 8048fa4
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:50:16 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:50:16 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BytesUtil.java | 12 ++--
.../apache/kylin/common/util/BytesUtilTest.java | 10 +++
.../apache/kylin/job/tools/RowCounterCLI.java | 67 ++++++++++++++++++++
.../kylin/rest/controller/QueryController.java | 25 ++++++--
.../apache/kylin/rest/request/SQLRequest.java | 13 ++++
.../kylin/storage/hbase/HBaseKeyRange.java | 15 +++++
.../coprocessor/observer/ObserverEnabler.java | 12 +++-
7 files changed, 143 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index bb1bab4..c9f1e08 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -18,16 +18,15 @@
package org.apache.kylin.common.util;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.io.Writable;
-
public class BytesUtil {
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
@@ -394,6 +393,10 @@ public class BytesUtil {
return toHex(array);
}
+
+ /**
+ * this method only works for hex strings
+ */
public static byte[] fromReadableText(String text) {
String[] tokens = text.split("\\\\x");
byte[] ret = new byte[tokens.length - 1];
@@ -422,6 +425,7 @@ public class BytesUtil {
return sb.toString();
}
+
public static void main(String[] args) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
index 6d554bb..7d4dea9 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BytesUtilTest.java
@@ -78,4 +78,14 @@ public class BytesUtilTest extends TestCase {
}
+
+ public void testReadable()
+ {
+ String x = "\\x00\\x00\\x00\\x00\\x00\\x01\\xFC\\xA8";
+ byte[] bytes = BytesUtil.fromReadableText(x);
+ String y = BytesUtil.toHex(bytes);
+ assertEquals(x,y);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
new file mode 100644
index 0000000..b80063d
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
@@ -0,0 +1,67 @@
+package org.apache.kylin.job.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ */
+public class RowCounterCLI {
+ private static final Logger logger = LoggerFactory.getLogger(RowCounterCLI.class);
+
+ public static void main(String[] args) throws IOException {
+
+ if (args == null || args.length != 3) {
+ System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-latest.jar org.apache.kylin.job.tools.RowCounterCLI [HTABLE_NAME] [STARTKEY] [ENDKEY]");
+ }
+
+ System.out.println(args[0]);
+ String htableName = args[0];
+ System.out.println(args[1]);
+ byte[] startKey = BytesUtil.fromReadableText(args[1]);
+ System.out.println(args[2]);
+ byte[] endKey = BytesUtil.fromReadableText(args[2]);
+
+ if (startKey == null) {
+ System.out.println("startkey is null ");
+ } else {
+ System.out.println("startkey lenght: " + startKey.length);
+ }
+
+ System.out.println("start key in binary: " + Bytes.toStringBinary(startKey));
+ System.out.println("end key in binary: " + Bytes.toStringBinary(endKey));
+
+ Configuration conf = HBaseConfiguration.create();
+
+ Scan scan = new Scan();
+ scan.setCaching(1024);
+ scan.setCacheBlocks(true);
+ scan.setStartRow(startKey);
+ scan.setStopRow(endKey);
+
+ logger.info("My Scan " + scan.toString());
+
+ HConnection conn = HConnectionManager.createConnection(conf);
+ HTableInterface tableInterface = conn.getTable(htableName);
+
+ Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
+ int counter = 0;
+ while (iterator.hasNext()) {
+ iterator.next();
+ counter++;
+ if (counter % 1000 == 1) {
+ System.out.println("number of rows: " + counter);
+ }
+ }
+ System.out.println("number of rows: " + counter);
+ tableInterface.close();
+ conn.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index e3cfc95..55e3946 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -25,6 +25,7 @@ import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.ForbiddenException;
@@ -39,8 +40,6 @@ import org.apache.kylin.rest.request.SaveSqlRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.service.QueryService;
import org.apache.kylin.rest.util.QueryUtil;
-import org.apache.kylin.storage.cache.AbstractCacheFledgedStorageEngine;
-import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -52,13 +51,13 @@ import org.supercsv.io.CsvListWriter;
import org.supercsv.io.ICsvListWriter;
import org.supercsv.prefs.CsvPreference;
-import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.Map;
/**
* Handle query requests.
@@ -88,13 +87,15 @@ public class QueryController extends BasicController {
@ResponseBody
@Timed(name = "query")
public SQLResponse query(@RequestBody SQLRequest sqlRequest) {
- long startTimestamp = System.currentTimeMillis();
+ initDebugToggles(sqlRequest);
+ long startTimestamp = System.currentTimeMillis();
SQLResponse response = doQuery(sqlRequest);
-
response.setDuration(System.currentTimeMillis() - startTimestamp);
queryService.logQuery(sqlRequest, response, new Date(startTimestamp), new Date(System.currentTimeMillis()));
+ cleanupDebugToggles();
+
return response;
}
@@ -257,4 +258,18 @@ public class QueryController extends BasicController {
this.cacheManager = cacheManager;
}
+ private void initDebugToggles(SQLRequest sqlRequest) {
+
+ Map<String, String> toggles = sqlRequest.getBackdoorToggles();
+ if (toggles == null || toggles.size() == 0) {
+ return;
+ }
+
+ BackdoorToggles.setToggles(toggles);
+ }
+
+ private void cleanupDebugToggles() {
+ BackdoorToggles.cleanToggles();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java b/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
index a75ddfb..901badd 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
@@ -18,6 +18,8 @@
package org.apache.kylin.rest.request;
+import java.util.Map;
+
public class SQLRequest {
private String sql;
@@ -26,9 +28,20 @@ public class SQLRequest {
private Integer limit = 0;
private boolean acceptPartial = true;
+ private Map<String,String> backdoorToggles;
+
public SQLRequest() {
}
+ public Map<String, String> getBackdoorToggles() {
+ return backdoorToggles;
+ }
+
+ public void setBackdoorToggles(Map<String, String> backdoorToggles) {
+ this.backdoorToggles = backdoorToggles;
+ }
+
+
public String getProject() {
return project;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
index 6d77fa2..d2853bc 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.DateFormat;
@@ -32,6 +33,8 @@ import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
@@ -42,6 +45,8 @@ import java.util.*;
*/
public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
+ private static final Logger logger = LoggerFactory.getLogger(HBaseKeyRange.class);
+
private static final int FUZZY_VALUE_CAP = 20;
private static final byte[] ZERO_TAIL_BYTES = new byte[] { 0 };
@@ -145,6 +150,7 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
buf.append(BytesUtil.toHex(fuzzyKey.getFirst()));
buf.append(" ");
buf.append(BytesUtil.toHex(fuzzyKey.getSecond()));
+ buf.append(System.lineSeparator());
}
this.fuzzyKeyString = buf.toString();
}
@@ -152,6 +158,15 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
private List<Pair<byte[], byte[]>> buildFuzzyKeys(Map<TblColRef, Set<String>> fuzzyValueSet) {
ArrayList<Pair<byte[], byte[]>> result = new ArrayList<Pair<byte[], byte[]>>();
+ //debug/profiling purpose
+ String toggle;
+ if ((toggle = BackdoorToggles.getToggle(BackdoorToggles.DEBUG_TOGGLE_DISABLE_FUZZY_KEY)) != null) {
+ if (Boolean.valueOf(toggle)) {
+ logger.info("The execution of this query will not use fuzzy key");
+ return result;
+ }
+ }
+
FuzzyKeyEncoder fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
FuzzyMaskEncoder fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2d4aea8d/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 1c94fe5..25b198a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
@@ -76,8 +75,17 @@ public class ObserverEnabler {
AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR);
return new ResultScannerAdapter(aggrScanner);
} else {
+
+ //debug/profiling purpose
+ String toggle;
+ if ((toggle = BackdoorToggles.getToggle(BackdoorToggles.DEBUG_TOGGLE_OBSERVER_BEHAVIOR)) == null) {
+ toggle = ObserverBehavior.SCAN_FILTER_AGGR.toString();//default behavior
+ } else {
+ logger.info("The execution of this query will use " + toggle + " as observer's behavior");
+ }
+
scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 });
- scan.setAttribute(AggregateRegionObserver.BEHAVIOR, ObserverBehavior.SCAN_FILTER_AGGR.toString().getBytes());
+ scan.setAttribute(AggregateRegionObserver.BEHAVIOR, toggle.getBytes());
scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type));
scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector));
scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs));
[3/5] incubator-kylin git commit: KYLIN-844 default observer behavior
if no attribute hint found
Posted by ma...@apache.org.
KYLIN-844 default observer behavior if no attribute hint found
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/dfe3a50a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/dfe3a50a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/dfe3a50a
Branch: refs/heads/0.8
Commit: dfe3a50abfd9dbae185a6a23f54c04cdd71d4c78
Parents: 2d4aea8
Author: honma <ho...@ebay.com>
Authored: Thu Jun 25 15:50:56 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 25 15:50:56 2015 +0800
----------------------------------------------------------------------
.../hbase/coprocessor/observer/AggregateRegionObserver.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dfe3a50a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index aea2cea..8771bf4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -48,7 +48,6 @@ public class AggregateRegionObserver extends BaseRegionObserver {
static final String FILTER = "_Filter";
static final String BEHAVIOR = "_Behavior";
-
@Override
public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
@@ -85,8 +84,11 @@ public class AggregateRegionObserver extends BaseRegionObserver {
byte[] filterBytes = scan.getAttribute(FILTER);
CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
+ ObserverBehavior observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR;
byte[] behavior = scan.getAttribute(BEHAVIOR);
- ObserverBehavior observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+ if (behavior != null && behavior.length != 0) {
+ observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+ }
// start/end region operation & sync on scanner is suggested by the
// javadoc of RegionScanner.nextRaw()
@@ -95,7 +97,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
region.startRegionOperation();
try {
synchronized (innerScanner) {
- return new AggregationScanner(type, filter, projector, aggregators, innerScanner,observerBehavior);
+ return new AggregationScanner(type, filter, projector, aggregators, innerScanner, observerBehavior);
}
} finally {
region.closeRegionOperation();