You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/14 08:50:14 UTC
[27/50] [abbrv] kylin git commit: KYLIN-1922 refactors
KYLIN-1922 refactors
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4e8ed97d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4e8ed97d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4e8ed97d
Branch: refs/heads/KYLIN-1726
Commit: 4e8ed97d12c53c19d09f736be3baaa9112dcf413
Parents: a201c5b
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Sep 12 23:52:43 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Sep 12 23:53:48 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeInstance.java | 10 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 3 +-
.../apache/kylin/gridtable/GTScanRequest.java | 4 +-
.../metadata/realization/IRealization.java | 1 +
.../apache/kylin/storage/StorageContext.java | 34 ++++-
.../storage/gtrecord/CubeScanRangePlanner.java | 6 +-
.../gtrecord/GTCubeStorageQueryBase.java | 10 +-
.../gtrecord/SequentialCubeTupleIterator.java | 7 +-
.../gtrecord/StorageResponseGTScatter.java | 117 +++++++++++++++
.../kylin/storage/hybrid/HybridInstance.java | 5 +
.../apache/kylin/query/ITCombinationTest.java | 6 +-
.../apache/kylin/query/ITKylinQueryTest.java | 150 +++++++------------
.../org/apache/kylin/query/KylinTestBase.java | 56 +++++++
.../src/test/resources/query/sql/query45.sql | 23 +++
.../test/resources/query/sql_limit/query01.sql | 21 +++
.../test/resources/query/sql_limit/query02.sql | 24 +++
.../query/sql_optimize/enable-limit01.sql | 19 ---
.../resources/query/sql_timeout/query02.sql | 19 +++
.../rules/RemoveBlackoutRealizationsRule.java | 11 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 3 +-
.../hbase/cube/v2/ExpectedSizeIterator.java | 2 +-
.../storage/hbase/cube/v2/GTBlobScatter.java | 150 -------------------
.../coprocessor/endpoint/CubeVisitService.java | 1 +
23 files changed, 393 insertions(+), 289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 151e142..851b016 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -35,17 +35,17 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.CapabilityResult;
-import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonManagedReference;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
@@ -305,7 +305,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return result;
}
-
public CubeSegment getSegment(String name, SegmentStatusEnum status) {
for (CubeSegment segment : segments) {
@@ -404,6 +403,11 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return endTime;
}
+ @Override
+ public boolean supportsLimitPushDown() {
+ return getDescriptor().supportsLimitPushDown();
+ }
+
public int getRowKeyColumnCount() {
return getDescriptor().getRowkey().getRowKeyColumns().length;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2d9945a..e6b3d3f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -992,8 +992,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
}
public boolean supportsLimitPushDown() {
- //currently only ID_SHARDED_HBASE supports limit push down
- return getStorageType() == IStorageAware.ID_SHARDED_HBASE;
+ return getStorageType() != IStorageAware.ID_HBASE && getStorageType() != IStorageAware.ID_HYBRID;
}
public int getStorageType() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 4f68806..dc90ed6 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -42,8 +42,10 @@ import com.google.common.collect.Sets;
public class GTScanRequest {
private static final Logger logger = LoggerFactory.getLogger(GTScanRequest.class);
+
//it's not necessary to increase the checkInterval to very large because the check cost is not high
- public static final int terminateCheckInterval = 1000;
+ //changing it might break org.apache.kylin.query.ITKylinQueryTest.testTimeoutQuery()
+ public static final int terminateCheckInterval = 100;
private GTInfo info;
private List<GTScanRange> ranges;
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index fda05ce..040cdc5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -57,4 +57,5 @@ public interface IRealization extends IStorageAware {
public long getDateRangeEnd();
+ public boolean supportsLimitPushDown();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index acb4960..cc39918 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -20,7 +20,11 @@ package org.apache.kylin.storage;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Range;
@@ -28,6 +32,7 @@ import com.google.common.collect.Range;
* @author xjiang
*/
public class StorageContext {
+ private static final Logger logger = LoggerFactory.getLogger(StorageContext.class);
public static final int DEFAULT_THRESHOLD = 1000000;
@@ -35,6 +40,7 @@ public class StorageContext {
private int threshold;
private int limit;
private int offset;
+ private int finalPushDownLimit;
private boolean hasSort;
private boolean acceptPartialResult;
@@ -62,6 +68,7 @@ public class StorageContext {
this.acceptPartialResult = false;
this.partialResultReturned = false;
+ this.finalPushDownLimit = Integer.MAX_VALUE;
}
public String getConnUrl() {
@@ -104,10 +111,33 @@ public class StorageContext {
return this.enableLimit;
}
- public int getStoragePushDownLimit() {
+ private int getStoragePushDownLimit() {
return this.isLimitEnabled() ? this.getOffset() + this.getLimit() : Integer.MAX_VALUE;
}
-
+
+ public int getFinalPushDownLimit() {
+ return finalPushDownLimit;
+ }
+
+ public void setFinalPushDownLimit(IRealization realization) {
+
+ //decide the final limit push down
+ int tempPushDownLimit = this.getStoragePushDownLimit();
+ if (tempPushDownLimit == Integer.MAX_VALUE) {
+ return;
+ }
+
+ int pushDownLimitMax = KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax();
+ if (!realization.supportsLimitPushDown()) {
+ logger.info("Not enabling limit push down because cube storage type not supported");
+ } else if (tempPushDownLimit > pushDownLimitMax) {
+ logger.info("Not enabling limit push down because the limit(including offset) {} is larger than kylin.query.pushdown.limit.max {}", //
+ tempPushDownLimit, pushDownLimitMax);
+ } else {
+ this.finalPushDownLimit = tempPushDownLimit;
+ }
+ }
+
public void markSort() {
this.hasSort = true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 9f505f3..b011f40 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -152,9 +152,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()).//
setStorageScanRowNumThreshold(context.getThreshold());
- if (cubeDesc.supportsLimitPushDown()) {
- builder.setStoragePushDownLimit(context.getStoragePushDownLimit());
- }
+ if (context.getFinalPushDownLimit() != Integer.MAX_VALUE)
+ builder.setStoragePushDownLimit(context.getFinalPushDownLimit());
+
scanRequest = builder.createGTScanRequest();
} else {
scanRequest = null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index f0c2494..31663d0 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -113,9 +113,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
TupleFilter filterD = translateDerived(filter, groupsD);
+ //set whether to aggr at storage
context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
+ // set limit push down
enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
- setThresholdIfNecessary(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
+ context.setFinalPushDownLimit(cubeInstance);
+ // set cautious threshold to prevent out of memory
+ setThresholdIfNecessary(dimensionsD, metrics, context);
List<CubeSegmentScanner> scanners = Lists.newArrayList();
for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
@@ -135,7 +139,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
if (scanners.isEmpty())
return ITupleIterator.EMPTY_TUPLE_ITERATOR;
- return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context, cubeDesc.supportsLimitPushDown());
+ return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
}
protected boolean skipZeroInputSegment(CubeSegment cubeSegment) {
@@ -398,7 +402,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
boolean possible = true;
- boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+ boolean goodFilter = filter == null || TupleFilter.isEvaluableRecursively(filter);
if (!goodFilter) {
possible = false;
logger.info("Storage limit push down is impossible because the filter is unevaluatable");
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 7059473..bef0e88 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,7 +25,6 @@ import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -54,7 +53,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
private int scanCountDelta;
public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
- Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, boolean supportLimitPushDown) {
+ Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
this.context = context;
this.scanners = scanners;
@@ -63,8 +62,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
}
- this.storagePushDownLimit = context.getStoragePushDownLimit();
- if (!supportLimitPushDown || storagePushDownLimit > KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
+ this.storagePushDownLimit = context.getFinalPushDownLimit();
+ if (storagePushDownLimit == Integer.MAX_VALUE) {
//normal case
tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
new file mode 100644
index 0000000..fe1afd3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * scatter the blob returned from region server to a iterable of gtrecords
+ */
+public class StorageResponseGTScatter implements IGTScanner {
+
+ private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class);
+
+ private GTInfo info;
+ private Iterator<byte[]> blocks;
+ private ImmutableBitSet columns;
+ private long totalScannedCount;
+ private int storagePushDownLimit = -1;
+
+ public StorageResponseGTScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) {
+ this.info = info;
+ this.blocks = blocks;
+ this.columns = columns;
+ this.totalScannedCount = totalScannedCount;
+ this.storagePushDownLimit = storagePushDownLimit;
+ }
+
+ @Override
+ public GTInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ public long getScannedRowCount() {
+ return totalScannedCount;
+ }
+
+ @Override
+ public void close() throws IOException {
+ //do nothing
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc());
+ if (storagePushDownLimit != Integer.MAX_VALUE) {
+ return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
+ } else {
+ return Iterators.concat(shardSubsets);
+ }
+ }
+
+ class EndpointResponseGTScatterFunc implements Function<byte[], Iterator<GTRecord>> {
+ @Nullable
+ @Override
+ public Iterator<GTRecord> apply(@Nullable final byte[] input) {
+
+ return new Iterator<GTRecord>() {
+ private ByteBuffer inputBuffer = null;
+ //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
+ private GTRecord firstRecord = null;
+
+ @Override
+ public boolean hasNext() {
+ if (inputBuffer == null) {
+ inputBuffer = ByteBuffer.wrap(input);
+ firstRecord = new GTRecord(info);
+ }
+
+ return inputBuffer.position() < inputBuffer.limit();
+ }
+
+ @Override
+ public GTRecord next() {
+ firstRecord.loadColumns(columns, inputBuffer);
+ return firstRecord;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index a0262e3..9b3a0fc 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -257,6 +257,11 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
}
@Override
+ public boolean supportsLimitPushDown() {
+ return false;
+ }
+
+ @Override
public List<TblColRef> getAllDimensions() {
init();
return allDimensions;
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
index cbd4e44..f4667af 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
@@ -85,10 +85,10 @@ public class ITCombinationTest extends ITKylinQueryTest {
// unset
}
- RemoveBlackoutRealizationsRule.blackouts.clear();
+ RemoveBlackoutRealizationsRule.blackList.clear();
if (excludeViewCubes) {
- RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
- RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
+ RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
+ RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
}
if ("v1".equalsIgnoreCase(queryEngine))
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index c1c9767..b9895e8 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -21,31 +21,22 @@ package org.apache.kylin.query;
import static org.junit.Assert.assertTrue;
import java.io.File;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
+import org.apache.kylin.gridtable.GTScanTimeoutException;
import org.apache.kylin.gridtable.StorageSideBehavior;
-import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.enumerator.OLAPQuery;
-import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.routing.Candidate;
import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
-import org.apache.kylin.query.schema.OLAPSchemaFactory;
import org.apache.kylin.storage.hbase.HBaseStorage;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
import org.dbunit.database.DatabaseConnection;
import org.dbunit.database.IDatabaseConnection;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -73,8 +64,8 @@ public class ITKylinQueryTest extends KylinTestBase {
setupAll();
- RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
- RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
+ RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
+ RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
}
@AfterClass
@@ -84,105 +75,69 @@ public class ITKylinQueryTest extends KylinTestBase {
clean();
}
- protected static void setupAll() throws Exception {
- //setup env
- HBaseMetadataTestCase.staticCreateTestMetadata();
- config = KylinConfig.getInstanceFromEnv();
-
- //setup cube conn
- File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
- Properties props = new Properties();
- props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10001");
- cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
-
- //setup h2
- h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
- // Load H2 Tables (inner join)
- H2Database h2DB = new H2Database(h2Connection, config);
- h2DB.loadAllTables();
-
- }
-
- protected static void clean() {
- if (cubeConnection != null)
- closeConnection(cubeConnection);
- if (h2Connection != null)
- closeConnection(h2Connection);
-
- ObserverEnabler.forceCoprocessorUnset();
- HBaseMetadataTestCase.staticCleanupTestMetadata();
- RemoveBlackoutRealizationsRule.blackouts.clear();
-
- }
-
protected String getQueryFolderPrefix() {
return "";
}
+ protected Throwable findRoot(Throwable throwable) {
+ while (true) {
+ if (throwable.getCause() != null) {
+ throwable = throwable.getCause();
+ } else {
+ break;
+ }
+ }
+ return throwable;
+ }
+
@Test
public void testTimeoutQuery() throws Exception {
if (HBaseStorage.overwriteStorageQuery != null) {
//v1 engine does not suit
return;
}
-
- thrown.expect(SQLException.class);
-
- //should not break at table duplicate check, should fail at model duplicate check
- thrown.expect(new BaseMatcher<Throwable>() {
- @Override
- public boolean matches(Object item) {
-
- //find the "root"
- Throwable throwable = (Throwable) item;
- while (true) {
- if (throwable.getCause() != null) {
- throwable = throwable.getCause();
- } else {
- break;
- }
- }
-
- if (throwable instanceof GTScanSelfTerminatedException) {
- return true;
- }
- return false;
- }
-
- @Override
- public void describeTo(Description description) {
- }
- });
-
- runTimetoutQueries();
-
- }
-
- protected void runTimetoutQueries() throws Exception {
try {
Map<String, String> toggles = Maps.newHashMap();
toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan
BackdoorToggles.setToggles(toggles);
- KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.03");//set timeout to 9s
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.01");//set timeout to 3s
//these two cubes has RAW measure, will disturb limit push down
- RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
- RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
+ RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
+ RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
- execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_timeout", null, true);
+ runTimeoutQueries();
} finally {
//these two cubes has RAW measure, will disturb limit push down
- RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
- RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
+ RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]");
+ RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]");
KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "1");//set timeout to 9s
BackdoorToggles.cleanToggles();
}
}
+ protected void runTimeoutQueries() throws Exception {
+ List<File> sqlFiles = getFilesFromFolder(new File(getQueryFolderPrefix() + "src/test/resources/query/sql_timeout"), ".sql");
+ for (File sqlFile : sqlFiles) {
+ try {
+ runSQL(sqlFile, false, false);
+ } catch (SQLException e) {
+
+ System.out.println(e.getMessage());
+
+ if (findRoot(e) instanceof GTScanSelfTerminatedException) {
+ //expected
+ continue;
+ }
+ }
+ throw new RuntimeException("Expecting GTScanTimeoutException");
+ }
+ }
+
//don't try to ignore this test, try to clean your "temp" folder
@Test
public void testTempQuery() throws Exception {
@@ -346,11 +301,25 @@ public class ITKylinQueryTest extends KylinTestBase {
execAndCompDynamicQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_dynamic", null, true);
}
- @Ignore("simple query will be supported by ii")
@Test
public void testLimitEnabled() throws Exception {
- runSqlFile(getQueryFolderPrefix() + "src/test/resources/query/sql_optimize/enable-limit01.sql");
- assertLimitWasEnabled();
+ if (HBaseStorage.overwriteStorageQuery == null) {//v1 query engine will not work
+
+ try {
+ //other cubes have strange aggregation groups
+ RemoveBlackoutRealizationsRule.whiteList.add("CUBE[name=test_kylin_cube_with_slr_empty]");
+
+ List<File> sqlFiles = getFilesFromFolder(new File(getQueryFolderPrefix() + "src/test/resources/query/sql_limit"), ".sql");
+ for (File sqlFile : sqlFiles) {
+ runSQL(sqlFile, false, false);
+ assertTrue(checkLimitEnabled());
+ assertTrue(checkFinalPushDownLimit());
+ }
+
+ } finally {
+ RemoveBlackoutRealizationsRule.whiteList.remove("CUBE[name=test_kylin_cube_with_slr_empty]");
+ }
+ }
}
@Test
@@ -377,13 +346,4 @@ public class ITKylinQueryTest extends KylinTestBase {
this.batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_window");
}
- private void assertLimitWasEnabled() {
- OLAPContext context = getFirstOLAPContext();
- assertTrue(context.storageContext.isLimitEnabled());
- }
-
- private OLAPContext getFirstOLAPContext() {
- return OLAPContext.getThreadLocalContexts().iterator().next();
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 2ad1105..294750e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.sql.Connection;
+import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -38,12 +39,20 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.LogManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
+import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
import org.dbunit.Assertion;
import org.dbunit.database.DatabaseConfig;
import org.dbunit.database.DatabaseConnection;
@@ -568,4 +577,51 @@ public class KylinTestBase {
printInfo(sb.toString());
return count;
}
+
+ protected static void setupAll() throws Exception { // shared fixture setup: test metadata, cube (Calcite) connection and H2 connection
+ // set up env: create the HBase test metadata, then read the KylinConfig from the environment
+ HBaseMetadataTestCase.staticCreateTestMetadata();
+ config = KylinConfig.getInstanceFromEnv();
+
+ // set up cube conn: Calcite JDBC connection whose model is a temporary OLAP JSON built for the default project
+ File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
+ Properties props = new Properties();
+ props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10001"); // scan threshold is handed to the connection through props below
+ cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
+
+ // set up h2: in-memory database; the incrementing counter gives every call a different db name
+ h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", "");
+ // Load H2 Tables (inner join)
+ H2Database h2DB = new H2Database(h2Connection, config);
+ h2DB.loadAllTables();
+
+ }
+
+ protected static void clean() { // teardown counterpart of setupAll(): closes connections and resets static test state
+ if (cubeConnection != null) // null when the cube connection was never opened
+ closeConnection(cubeConnection);
+ if (h2Connection != null) // null when the H2 connection was never opened
+ closeConnection(h2Connection);
+
+ ObserverEnabler.forceCoprocessorUnset();
+ HBaseMetadataTestCase.staticCleanupTestMetadata();
+ RemoveBlackoutRealizationsRule.blackList.clear(); // NOTE(review): whiteList is not cleared here - confirm every test that fills it also empties it
+
+ }
+
+ protected boolean checkLimitEnabled() { // reports isLimitEnabled() of the storage context belonging to the first thread-local OLAPContext
+ OLAPContext context = getFirstOLAPContext();
+ return (context.storageContext.isLimitEnabled());
+ }
+
+ protected boolean checkFinalPushDownLimit() { // true when the first OLAPContext's final push-down limit differs from Integer.MAX_VALUE
+ OLAPContext context = getFirstOLAPContext();
+ return (context.storageContext.getFinalPushDownLimit() != Integer.MAX_VALUE); // presumably MAX_VALUE means "no limit pushed down" - TODO confirm against StorageContext
+
+ }
+
+ private OLAPContext getFirstOLAPContext() { // first element, in iteration order, of the current thread's OLAPContext collection
+ return OLAPContext.getThreadLocalContexts().iterator().next(); // NOTE(review): assumes at least one context was registered by the preceding query - confirm
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql/query45.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql/query45.sql b/kylin-it/src/test/resources/query/sql/query45.sql
new file mode 100644
index 0000000..0c78657
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql/query45.sql
@@ -0,0 +1,23 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+
+select seller_id, sum(price) from test_kylin_fact
+ where lstg_format_name='FP-GTC'
+ group by seller_id limit 20
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_limit/query01.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_limit/query01.sql b/kylin-it/src/test/resources/query/sql_limit/query01.sql
new file mode 100644
index 0000000..fca8175
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_limit/query01.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select * from test_kylin_fact
+ where lstg_format_name='FP-GTC'
+ limit 20
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_limit/query02.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_limit/query02.sql b/kylin-it/src/test/resources/query/sql_limit/query02.sql
new file mode 100644
index 0000000..53f7bd7
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_limit/query02.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+
+select seller_id, sum(price) from test_kylin_fact
+ where lstg_format_name='FP-GTC'
+ group by seller_id limit 20
+
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql b/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql
deleted file mode 100644
index 4a62d92..0000000
--- a/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql
+++ /dev/null
@@ -1,19 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements. See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership. The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-
-select * from test_kylin_fact limit 10
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_timeout/query02.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql_timeout/query02.sql b/kylin-it/src/test/resources/query/sql_timeout/query02.sql
new file mode 100644
index 0000000..2f187a4
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_timeout/query02.sql
@@ -0,0 +1,19 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select seller_id,lstg_format_name,sum(price) from test_kylin_fact group by seller_id,lstg_format_name
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java
index 9c3d7c9..f299d17 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java
@@ -31,15 +31,22 @@ import com.google.common.collect.Sets;
* for IT use, exclude some cubes
*/
public class RemoveBlackoutRealizationsRule extends RoutingRule {
- public static Set<String> blackouts = Sets.newHashSet();
+ public static Set<String> blackList = Sets.newHashSet();
+ public static Set<String> whiteList = Sets.newHashSet();
@Override
public void apply(List<Candidate> candidates) {
for (Iterator<Candidate> iterator = candidates.iterator(); iterator.hasNext();) {
Candidate candidate = iterator.next();
- if (blackouts.contains(candidate.getRealization().getCanonicalName())) {
+ if (blackList.contains(candidate.getRealization().getCanonicalName())) { // black-listed realizations are always dropped
iterator.remove();
+ continue; // already removed; skip the white-list check for this candidate
+ }
+
+ if (!whiteList.isEmpty() && !whiteList.contains(candidate.getRealization().getCanonicalName())) { // an empty whiteList filters nothing; otherwise only listed realizations survive
+ iterator.remove();
+ continue;
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 573951b..c7de287 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -45,6 +45,7 @@ import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanSelfTerminatedException;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
@@ -222,7 +223,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
});
}
- return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
+ return new StorageResponseGTScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
}
private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index f4729a3..c27e5fc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -85,7 +85,7 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
byte[] ret = null;
while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) {
- ret = queue.poll(5000, TimeUnit.MILLISECONDS);
+ ret = queue.poll(10000, TimeUnit.MILLISECONDS);
}
if (coprocException != null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
deleted file mode 100644
index 631510e..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.storage.gtrecord.SortedIteratorMergerWithLimit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-
-/**
- * scatter the blob returned from region server to a iterable of gtrecords
- */
-class GTBlobScatter implements IGTScanner {
-
- private static final Logger logger = LoggerFactory.getLogger(GTBlobScatter.class);
-
- private GTInfo info;
- private Iterator<byte[]> blocks;
- private ImmutableBitSet columns;
- private long totalScannedCount;
- private int storagePushDownLimit = -1;
-
- public GTBlobScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) {
- this.info = info;
- this.blocks = blocks;
- this.columns = columns;
- this.totalScannedCount = totalScannedCount;
- this.storagePushDownLimit = storagePushDownLimit;
- }
-
- @Override
- public GTInfo getInfo() {
- return info;
- }
-
- @Override
- public long getScannedRowCount() {
- return totalScannedCount;
- }
-
- @Override
- public void close() throws IOException {
- //do nothing
- }
-
- @Override
- public Iterator<GTRecord> iterator() {
- Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new GTBlobScatterFunc());
- if (storagePushDownLimit <= KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
- return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
- } else {
- return Iterators.concat(shardSubsets);
- }
- }
-
- class GTBlobScatterFunc implements Function<byte[], Iterator<GTRecord>> {
- @Nullable
- @Override
- public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
- return new Iterator<GTRecord>() {
- private ByteBuffer inputBuffer = null;
- //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
- private GTRecord firstRecord = null;
- private GTRecord secondRecord = null;
- private GTRecord thirdRecord = null;
- private GTRecord fourthRecord = null;
- private int counter = 0;
-
- @Override
- public boolean hasNext() {
- if (inputBuffer == null) {
- inputBuffer = ByteBuffer.wrap(input);
- firstRecord = new GTRecord(info);
- secondRecord = new GTRecord(info);
- thirdRecord = new GTRecord(info);
- fourthRecord = new GTRecord(info);
- }
-
- return inputBuffer.position() < inputBuffer.limit();
- }
-
- @Override
- public GTRecord next() {
- firstRecord.loadColumns(columns, inputBuffer);
- //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord));
- return firstRecord;
- // GTRecord temp = new GTRecord(info);
- // temp.loadColumns(columns, inputBuffer);
- // return temp;
-
- // counter++;
- // int index = counter % 4;
- // if (index == 1) {
- // firstRecord.loadColumns(columns, inputBuffer);
- // //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord));
- // return firstRecord;
- // } else if (index == 2) {
- // secondRecord.loadColumns(columns, inputBuffer);
- // //logger.info("B GTRecord: " + System.identityHashCode(this) + " " + secondRecord + " " + System.identityHashCode(secondRecord));
- // return secondRecord;
- // } else if (index == 3) {
- // thirdRecord.loadColumns(columns, inputBuffer);
- // //logger.info("C GTRecord: " + System.identityHashCode(this) + " " + thirdRecord + " " + System.identityHashCode(thirdRecord));
- // return thirdRecord;
- // } else {
- // fourthRecord.loadColumns(columns, inputBuffer);
- // //logger.info("D GTRecord: " + System.identityHashCode(this) + " " + fourthRecord + " " + System.identityHashCode(fourthRecord));
- // return fourthRecord;
- // }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index ffe41c5..13a7b53 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -246,6 +246,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
final MutableBoolean scanNormalComplete = new MutableBoolean(true);
final long deadline = scanReq.getTimeout() + this.serviceStartTime;
+ logger.info("deadline is " + deadline);
final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
final CellListIterator cellListIterator = new CellListIterator() {