You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/04/13 05:39:50 UTC
[1/8] kylin git commit: minor bug fix in removing ext filter from
project
Repository: kylin
Updated Branches:
refs/heads/master aaf3b870c -> 59cb57ca6
minor bug fix in removing ext filter from project
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b6c893db
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b6c893db
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b6c893db
Branch: refs/heads/master
Commit: b6c893dbeefead2523fc686055736526d4f3ca3e
Parents: b2a3861
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 16:01:45 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/metadata/MetadataManager.java | 12 ++++--------
.../apache/kylin/metadata/project/ProjectInstance.java | 9 +++++----
2 files changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b6c893db/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index aa249fd..3391ef4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -143,7 +143,7 @@ public class MetadataManager {
return Lists.newArrayList(srcTableMap.values());
}
- public List<ExternalFilterDesc> listAllExternalFilters(){
+ public List<ExternalFilterDesc> listAllExternalFilters() {
return Lists.newArrayList(extFilterMap.values());
}
@@ -217,27 +217,23 @@ public class MetadataManager {
}
public void saveExternalFilter(ExternalFilterDesc desc) throws IOException {
- if(desc.getUuid() == null){
+ if (desc.getUuid() == null) {
throw new IllegalArgumentException("UUID not set.");
}
String path = desc.getResourcePath();
- getStore().putResource(path,desc,EXTERNAL_FILTER_DESC_SERIALIZER);
+ getStore().putResource(path, desc, EXTERNAL_FILTER_DESC_SERIALIZER);
desc = reloadExternalFilterAt(path);
- extFilterMap.put(desc.getName(),desc);
+ extFilterMap.put(desc.getName(), desc);
}
public void removeExternalFilter(String name) throws IOException {
- if(name !=null ){
- name = name.toLowerCase();
- }
String path = ExternalFilterDesc.concatResourcePath(name);
getStore().deleteResource(path);
extFilterMap.remove(name);
}
-
private void init(KylinConfig config) throws IOException {
this.config = config;
this.srcTableMap = new CaseInsensitiveStringCache<>(config, Broadcaster.TYPE.TABLE);
http://git-wip-us.apache.org/repos/asf/kylin/blob/b6c893db/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 9567da3..74f843f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -210,9 +210,13 @@ public class ProjectInstance extends RootPersistentEntity {
public void removeTable(String tableName) {
tables.remove(tableName.toUpperCase());
}
+
+ public void addExtFilter(String extFilterName){
+ this.getExtFilters().add(extFilterName);
+ }
public void removeExtFilter(String filterName) {
- extFilters.remove(filterName.toUpperCase());
+ extFilters.remove(filterName);
}
public int getTablesCount() {
@@ -227,9 +231,6 @@ public class ProjectInstance extends RootPersistentEntity {
return tables;
}
- public void addExtFilter(String extFilterName){
- this.getExtFilters().add(extFilterName.toLowerCase());
- }
public Set<String> getExtFilters(){
return extFilters;
[5/8] kylin git commit: refactor
Posted by ma...@apache.org.
refactor
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b26b2489
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b26b2489
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b26b2489
Branch: refs/heads/master
Commit: b26b2489baddcfd148eaf8f17330878bbf349048
Parents: aaf3b87
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Apr 11 17:33:07 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800
----------------------------------------------------------------------
.../kylin/job/DeployLocalMetaToRemoteTest.java | 4 +-
.../java/org/apache/kylin/job/DeployUtil.java | 4 +-
.../org/apache/kylin/common/util/ByteArray.java | 11 +-
.../kylin/common/util/Log4jConfigurer.java | 2 +
.../common/util/AbstractKylinTestCase.java | 14 +-
.../common/util/HBaseMetadataTestCase.java | 4 +-
.../common/util/LocalFileMetadataTestCase.java | 8 +-
.../org/apache/kylin/cube/cuboid/Cuboid.java | 18 ++
.../apache/kylin/cube/kv/LazyRowKeyEncoder.java | 25 --
.../org/apache/kylin/cube/model/CubeDesc.java | 5 +
.../org/apache/kylin/gridtable/GTRecord.java | 10 +-
.../org/apache/kylin/gridtable/GTScanRange.java | 2 -
.../kylin/gridtable/GTScanRangePlanner.java | 294 ++++++++++++++++---
.../apache/kylin/gridtable/GTScanRequest.java | 68 +++--
.../org/apache/kylin/gridtable/IGTStorage.java | 27 ++
.../apache/kylin/gridtable/ScannerWorker.java | 72 +++++
.../kylin/cube/AggregationGroupRuleTest.java | 19 +-
.../apache/kylin/cube/RowKeyAttrRuleTest.java | 5 +-
.../cube/inmemcubing/DoggedCubeBuilderTest.java | 160 ----------
.../cube/inmemcubing/InMemCubeBuilderTest.java | 268 -----------------
.../kylin/gridtable/DictGridTableTest.java | 58 ++--
.../impl/threadpool/DefaultSchedulerTest.java | 151 ----------
.../filter/UDF/MassInValueProviderFactory.java | 1 -
.../apache/kylin/storage/StorageMockUtils.java | 189 ++++++++++++
.../kylin/storage/cache/StorageMockUtils.java | 157 ----------
kylin-it/pom.xml | 7 +
.../ITDoggedCubeBuilderStressTest.java | 8 +-
.../inmemcubing/ITDoggedCubeBuilderTest.java | 163 ++++++++++
.../inmemcubing/ITInMemCubeBuilderTest.java | 271 +++++++++++++++++
.../impl/threadpool/ITDefaultSchedulerTest.java | 154 ++++++++++
.../kylin/provision/BuildCubeWithEngine.java | 4 +-
.../kylin/provision/BuildCubeWithSpark.java | 6 +-
.../kylin/provision/BuildCubeWithStream.java | 4 +-
.../kylin/provision/BuildIIWithEngine.java | 4 +-
.../kylin/provision/BuildIIWithStream.java | 2 +-
.../org/apache/kylin/query/KylinTestBase.java | 11 -
.../kylin/storage/hbase/ITStorageTest.java | 2 +-
source-hive/pom.xml | 7 +
.../kylin/source/hive/HiveCmdBuilderTest.java | 6 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 111 ++++---
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 56 +---
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 94 ++++--
.../hbase/cube/v2/CubeSegmentScanner.java | 215 +-------------
.../storage/hbase/cube/v2/CubeStorageQuery.java | 30 +-
.../kylin/storage/hbase/cube/v2/RawScan.java | 10 +
.../coprocessor/endpoint/CubeVisitService.java | 88 ++++--
.../hbase/steps/SandboxMetastoreCLI.java | 2 +-
47 files changed, 1539 insertions(+), 1292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
index 1267ab7..8494607 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java
@@ -45,7 +45,7 @@ public class DeployLocalMetaToRemoteTest {
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
@@ -53,7 +53,7 @@ public class DeployLocalMetaToRemoteTest {
@Before
public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
DeployUtil.initCliWorkDir();
DeployUtil.deployMetadata();
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 2f51475..513f546 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -34,7 +34,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.ResourceTool;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeUpdate;
@@ -66,7 +66,7 @@ public class DeployUtil {
public static void deployMetadata() throws IOException {
// install metadata to hbase
ResourceTool.reset(config());
- ResourceTool.copy(KylinConfig.createInstanceFromUri(AbstractKylinTestCase.LOCALMETA_TEST_DATA), config());
+ ResourceTool.copy(KylinConfig.createInstanceFromUri(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA), config());
// update cube desc signature.
for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
index 5e35257..d850f8e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
@@ -100,7 +100,12 @@ public class ByteArray implements Comparable<ByteArray>, Serializable {
}
public ByteArray copy() {
- ByteArray copy = new ByteArray(length);
+ ByteArray copy;
+ if (data != null) {
+ copy = new ByteArray(length);
+ } else {
+ copy = new ByteArray(null);
+ }
copy.copyFrom(this);
return copy;
}
@@ -116,7 +121,9 @@ public class ByteArray implements Comparable<ByteArray>, Serializable {
}
public void copyFrom(ByteArray other) {
- System.arraycopy(other.array(), other.offset, data, offset, other.length);
+ if (other.data != null) {
+ System.arraycopy(other.array(), other.offset, data, offset, other.length);
+ }
this.length = other.length;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java b/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java
index fe0c55a..696aaff 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java
@@ -21,6 +21,7 @@ package org.apache.kylin.common.util;
import java.util.Enumeration;
import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
@@ -35,6 +36,7 @@ public class Log4jConfigurer {
public static void initLogger() {
if (!INITIALIZED && !isConfigured()) {
org.apache.log4j.BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(DEFAULT_PATTERN_LAYOUT)));
+ LogManager.getRootLogger().setLevel(Level.DEBUG);
}
INITIALIZED = true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
index d517930..3a4b7bb 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
@@ -28,16 +28,14 @@ import org.apache.kylin.common.KylinConfig;
*/
public abstract class AbstractKylinTestCase {
- public static final String LOCALMETA_TEST_DATA = "../examples/test_case_data/localmeta";
- public static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
-
- public static final String[] SERVICES_WITH_CACHE = {//
- "org.apache.kylin.cube.CubeManager",//
+ public static final String[] SERVICES_WITH_CACHE = { //
+ "org.apache.kylin.cube.CubeManager", //
"org.apache.kylin.cube.CubeDescManager", //
- "org.apache.kylin.invertedindex.IIDescManager",//
- "org.apache.kylin.invertedindex.IIManager",//
- "org.apache.kylin.storage.hybrid.HybridManager", "org.apache.kylin.metadata.realization.RealizationRegistry", //
+ "org.apache.kylin.invertedindex.IIDescManager", //
+ "org.apache.kylin.invertedindex.IIManager", //
+ "org.apache.kylin.storage.hybrid.HybridManager", //
+ "org.apache.kylin.metadata.realization.RealizationRegistry", //
"org.apache.kylin.metadata.project.ProjectManager", //
"org.apache.kylin.metadata.MetadataManager" //
};
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
index e0be1c2..fdf066b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
@@ -26,9 +26,11 @@ import org.apache.kylin.common.KylinConfig;
*/
public class HBaseMetadataTestCase extends AbstractKylinTestCase {
+ public static String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
+
static {
try {
- ClassUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath());
+ ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath());
} catch (Exception e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
index 70c849c..e2f2d8b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java
@@ -27,9 +27,15 @@ import org.apache.kylin.common.persistence.ResourceStore;
public class LocalFileMetadataTestCase extends AbstractKylinTestCase {
+ public static String LOCALMETA_TEST_DATA = "../examples/test_case_data/localmeta";
+
@Override
public void createTestMetadata() {
- staticCreateTestMetadata();
+ staticCreateTestMetadata(getLocalMetaTestData());
+ }
+
+ protected String getLocalMetaTestData() {
+ return LOCALMETA_TEST_DATA;
}
public static void staticCreateTestMetadata() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 7f8d2b8..e8a75d7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
@@ -33,6 +34,7 @@ import org.apache.kylin.cube.model.AggregationGroup;
import org.apache.kylin.cube.model.AggregationGroup.HierarchyMask;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.base.Function;
@@ -52,6 +54,20 @@ public class Cuboid implements Comparable<Cuboid> {
}
};
+ public static Cuboid identifyCuboid(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
+ for (FunctionDesc metric : metrics) {
+ if (metric.getMeasureType().onlyAggrInBaseCuboid())
+ return Cuboid.getBaseCuboid(cubeDesc);
+ }
+
+ long cuboidID = 0;
+ for (TblColRef column : dimensions) {
+ int index = cubeDesc.getRowkey().getColumnBitIndex(column);
+ cuboidID |= 1L << index;
+ }
+ return Cuboid.findById(cubeDesc, cuboidID);
+ }
+
public static Cuboid findById(CubeDesc cube, byte[] cuboidID) {
return findById(cube, Bytes.toLong(cuboidID));
}
@@ -397,6 +413,8 @@ public class Cuboid implements Comparable<Cuboid> {
return cuboidToGridTableMapping;
}
+
+
public static String getDisplayName(long cuboidID, int dimensionCount) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < dimensionCount; ++i) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
index c93f65e..9c3d037 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java
@@ -18,15 +18,9 @@
package org.apache.kylin.cube.kv;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import java.util.Arrays;
-import java.util.List;
-
/**
* A LazyRowKeyEncoder will not try to calculate shard
* It works for both enableSharding or non-enableSharding scenario
@@ -45,23 +39,4 @@ public class LazyRowKeyEncoder extends RowKeyEncoder {
}
}
-
- //for non-sharding cases it will only return one byte[] with not shard at beginning
- public List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
- final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
-
- if (!enableSharding) {
- return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked
- } else {
- List<byte[]> ret = Lists.newArrayList();
- for (short i = 0; i < cuboidShardNum; ++i) {
- short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
- byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length);
- BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
- ret.add(cookedKey);
- }
- return ret;
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 240cf52..65ba0a5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -900,6 +900,11 @@ public class CubeDesc extends RootPersistentEntity {
}
}
+ public TblColRef getColumnByBitIndex(int bitIndex) {
+ RowKeyColDesc[] rowKeyColumns = this.getRowkey().getRowKeyColumns();
+ return rowKeyColumns[rowKeyColumns.length - 1 - bitIndex].getColRef();
+ }
+
public boolean hasMemoryHungryMeasures() {
for (MeasureDesc measure : measures) {
if (measure.getFunction().getMeasureType().isMemoryHungry()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index d09ab9a..bccd0c5 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -47,7 +47,6 @@ public class GTRecord implements Comparable<GTRecord> {
}
this.info = info;
this.maskForEqualHashComp = maskForEqualHashComp;
-
}
public GTRecord(GTInfo info) {
@@ -58,6 +57,15 @@ public class GTRecord implements Comparable<GTRecord> {
this(info, info.colAll, cols);
}
+ public GTRecord(GTRecord other) {
+ this.info = other.info;
+ this.maskForEqualHashComp = other.maskForEqualHashComp;
+ this.cols = new ByteArray[info.getColumnCount()];
+ for (int i = 0; i < other.cols.length; i++) {
+ this.cols[i] = other.cols[i].copy();
+ }
+ }
+
public GTInfo getInfo() {
return info;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
index 56f55f6..e1b38dc 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
@@ -29,8 +29,6 @@ public class GTScanRange {
final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
final public List<GTRecord> fuzzyKeys; // partial matching primary keys
-
-
public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
this(pkStart, pkEnd, null);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 2307aaf..3f9bac0 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -18,69 +18,166 @@
package org.apache.kylin.gridtable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.FuzzyValueCombination;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
public class GTScanRangePlanner {
private static final Logger logger = LoggerFactory.getLogger(GTScanRangePlanner.class);
+ private static final int MAX_SCAN_RANGES = 200;
private static final int MAX_HBASE_FUZZY_KEYS = 100;
- final private GTInfo info;
- final private Pair<ByteArray, ByteArray> segmentStartAndEnd;
- final private TblColRef partitionColRef;
+ protected int maxScanRanges;
+
+ //non-GT
+ protected CubeSegment cubeSegment;
+ protected CubeDesc cubeDesc;
+ protected Cuboid cuboid;
+ protected TupleFilter filter;
+ protected Set<TblColRef> dimensions;
+ protected Set<TblColRef> groupbyDims;
+ protected Set<TblColRef> filterDims;
+ protected Collection<FunctionDesc> metrics;
+
+ //GT
+ protected TupleFilter gtFilter;
+ protected GTInfo gtInfo;
+ protected Pair<ByteArray, ByteArray> gtStartAndEnd;
+ protected TblColRef gtPartitionCol;
+ protected ImmutableBitSet gtDimensions;
+ protected ImmutableBitSet gtAggrGroups;
+ protected ImmutableBitSet gtAggrMetrics;
+ protected String[] gtAggrFuncs;
+ final protected RecordComparator rangeStartComparator;
+ final protected RecordComparator rangeEndComparator;
+ final protected RecordComparator rangeStartEndComparator;
+
+ public GTScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupbyDims, //
+ Collection<FunctionDesc> metrics) {
+
+ this.maxScanRanges = MAX_SCAN_RANGES;
+
+ this.cubeSegment = cubeSegment;
+ this.cubeDesc = cubeSegment.getCubeDesc();
+ this.cuboid = cuboid;
+ this.dimensions = dimensions;
+ this.groupbyDims = groupbyDims;
+ this.filter = filter;
+ this.metrics = metrics;
+ this.filterDims = Sets.newHashSet();
+ TupleFilter.collectColumns(filter, this.filterDims);
+
+ this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId());
+ CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+
+ IGTComparator comp = gtInfo.codeSystem.getComparator();
+ //start key GTRecord compare to start key GTRecord
+ this.rangeStartComparator = getRangeStartComparator(comp);
+ //stop key GTRecord compare to stop key GTRecord
+ this.rangeEndComparator = getRangeEndComparator(comp);
+ //start key GTRecord compare to stop key GTRecord
+ this.rangeStartEndComparator = getRangeStartEndComparator(comp);
+
+ //replace the constant values in filter to dictionary codes
+ this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), this.groupbyDims);
- final private RecordComparator rangeStartComparator;
- final private RecordComparator rangeEndComparator;
- final private RecordComparator rangeStartEndComparator;
+ this.gtDimensions = makeGridTableColumns(mapping, dimensions);
+ this.gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groupbyDims, cubeSegment.getCubeDesc()));
+ this.gtAggrMetrics = makeGridTableColumns(mapping, metrics);
+ this.gtAggrFuncs = makeAggrFuncs(mapping, metrics);
+
+ if (cubeSegment.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) {
+ int index = mapping.getIndexOf(cubeSegment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef());
+ if (index >= 0) {
+ this.gtStartAndEnd = getSegmentStartAndEnd(index);
+ this.gtPartitionCol = gtInfo.colRef(index);
+ }
+ }
+
+ }
/**
+ * constrcut GTScanRangePlanner with incomplete information. only be used for UT
* @param info
- * @param segmentStartAndEnd in GT encoding
- * @param partitionColRef the TblColRef in GT
+ * @param gtStartAndEnd
+ * @param gtPartitionCol
+ * @param gtFilter
*/
- public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> segmentStartAndEnd, TblColRef partitionColRef) {
-
- this.info = info;
- this.segmentStartAndEnd = segmentStartAndEnd;
- this.partitionColRef = partitionColRef;
+ public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
- IGTComparator comp = info.codeSystem.getComparator();
+ this.maxScanRanges = MAX_SCAN_RANGES;
+ this.gtInfo = info;
+ IGTComparator comp = gtInfo.codeSystem.getComparator();
//start key GTRecord compare to start key GTRecord
this.rangeStartComparator = getRangeStartComparator(comp);
//stop key GTRecord compare to stop key GTRecord
this.rangeEndComparator = getRangeEndComparator(comp);
//start key GTRecord compare to stop key GTRecord
this.rangeStartEndComparator = getRangeStartEndComparator(comp);
- }
- // return empty list meaning filter is always false
- public List<GTScanRange> planScanRanges(TupleFilter filter) {
- return planScanRanges(filter, Integer.MAX_VALUE);
+
+ this.gtFilter = gtFilter;
+ this.gtStartAndEnd = gtStartAndEnd;
+ this.gtPartitionCol = gtPartitionCol;
}
+
- // return empty list meaning filter is always false
- public List<GTScanRange> planScanRanges(TupleFilter filter, int maxRanges) {
+ public GTScanRequest planScanRequest(boolean allowPreAggregate) {
+ GTScanRequest scanRequest;
+ List<GTScanRange> scanRanges = this.planScanRanges();
+ if (scanRanges != null && scanRanges.size() != 0) {
+ scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+ } else {
+ scanRequest = null;
+ }
+ return scanRequest;
+ }
- TupleFilter flatFilter = flattenToOrAndFilter(filter);
+ /**
+ * Overwrite this method to provide smarter storage visit plans
+ * @return
+ */
+ public List<GTScanRange> planScanRanges() {
+ TupleFilter flatFilter = flattenToOrAndFilter(gtFilter);
List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter);
@@ -92,11 +189,108 @@ public class GTScanRangePlanner {
}
List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges);
- mergedRanges = mergeTooManyRanges(mergedRanges, maxRanges);
+ mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges);
return mergedRanges;
}
+ private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) {
+ ByteArray start;
+ if (cubeSegment.getDateRangeStart() != Long.MIN_VALUE) {
+ start = encodeTime(cubeSegment.getDateRangeStart(), index, 1);
+ } else {
+ start = new ByteArray();
+ }
+
+ ByteArray end;
+ if (cubeSegment.getDateRangeEnd() != Long.MAX_VALUE) {
+ end = encodeTime(cubeSegment.getDateRangeEnd(), index, -1);
+ } else {
+ end = new ByteArray();
+ }
+ return Pair.newPair(start, end);
+
+ }
+
+ private ByteArray encodeTime(long ts, int index, int roundingFlag) {
+ String value;
+ DataType partitionColType = gtInfo.getColumnType(index);
+ if (partitionColType.isDate()) {
+ value = DateFormat.formatToDateStr(ts);
+ } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+ value = DateFormat.formatToTimeWithoutMilliStr(ts);
+ } else if (partitionColType.isStringFamily()) {
+ String partitionDateFormat = cubeSegment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+ if (StringUtils.isEmpty(partitionDateFormat))
+ partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+ value = DateFormat.formatToDateStr(ts, partitionDateFormat);
+ } else {
+ throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+ }
+
+ ByteBuffer buffer = ByteBuffer.allocate(gtInfo.getMaxColumnLength());
+ gtInfo.getCodeSystem().encodeColumnValue(index, value, roundingFlag, buffer);
+
+ return ByteArray.copyOf(buffer.array(), 0, buffer.position());
+ }
+
+ private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
+ Set<TblColRef> ret = Sets.newHashSet();
+ for (TblColRef col : input) {
+ if (cubeDesc.hasHostColumn(col)) {
+ for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
+ ret.add(host);
+ }
+ } else {
+ ret.add(col);
+ }
+ }
+ return ret;
+ }
+
+ private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
+ BitSet result = new BitSet();
+ for (TblColRef dim : dimensions) {
+ int idx = mapping.getIndexOf(dim);
+ if (idx >= 0)
+ result.set(idx);
+ }
+ return new ImmutableBitSet(result);
+ }
+
+ private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+ BitSet result = new BitSet();
+ for (FunctionDesc metric : metrics) {
+ int idx = mapping.getIndexOf(metric);
+ if (idx < 0)
+ throw new IllegalStateException(metric + " not found in " + mapping);
+ result.set(idx);
+ }
+ return new ImmutableBitSet(result);
+ }
+
+ private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+
+ //metrics are represented in ImmutableBitSet, which loses order information
+ //sort the aggrFuns to align with metrics natural order
+ List<FunctionDesc> metricList = Lists.newArrayList(metrics);
+ Collections.sort(metricList, new Comparator<FunctionDesc>() {
+ @Override
+ public int compare(FunctionDesc o1, FunctionDesc o2) {
+ int a = mapping.getIndexOf(o1);
+ int b = mapping.getIndexOf(o2);
+ return a - b;
+ }
+ });
+
+ String[] result = new String[metricList.size()];
+ int i = 0;
+ for (FunctionDesc metric : metricList) {
+ result[i++] = metric.getExpression();
+ }
+ return result;
+ }
+
private String makeReadable(ByteArray byteArray) {
if (byteArray == null) {
return null;
@@ -105,29 +299,29 @@ public class GTScanRangePlanner {
}
}
- private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
- GTRecord pkStart = new GTRecord(info);
- GTRecord pkEnd = new GTRecord(info);
+ protected GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
+ GTRecord pkStart = new GTRecord(gtInfo);
+ GTRecord pkEnd = new GTRecord(gtInfo);
Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
List<GTRecord> fuzzyKeys;
for (ColumnRange range : andDimRanges) {
- if (partitionColRef != null && range.column.equals(partitionColRef)) {
- if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 //
- && (rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0 //
- || rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) == 0 //
- && (range.op == FilterOperatorEnum.EQ || range.op == FilterOperatorEnum.LTE || range.op == FilterOperatorEnum.GTE || range.op == FilterOperatorEnum.IN))) {
+ if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
+ if (rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end) <= 0 //
+ && (rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()) < 0 //
+ || rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()) == 0 //
+ && (range.op == FilterOperatorEnum.EQ || range.op == FilterOperatorEnum.LTE || range.op == FilterOperatorEnum.GTE || range.op == FilterOperatorEnum.IN))) {
//segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so use <= when has equals in condition.
} else {
- logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}",//
- new Object[] { partitionColRef, makeReadable(segmentStartAndEnd.getFirst()), makeReadable(segmentStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) });
+ logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
+ new Object[] { gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) });
return null;
}
}
int col = range.column.getColumnDesc().getZeroBasedIndex();
- if (!info.primaryKey.get(col))
+ if (!gtInfo.primaryKey.get(col))
continue;
pkStart.set(col, range.begin);
@@ -139,7 +333,6 @@ public class GTScanRangePlanner {
}
fuzzyKeys = buildFuzzyKeys(fuzzyValues);
-
return new GTScanRange(pkStart, pkEnd, fuzzyKeys);
}
@@ -154,17 +347,16 @@ public class GTScanRangePlanner {
logger.info("The execution of this query will not use fuzzy key");
return result;
}
-
List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS);
for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) {
- BitSet bitSet = new BitSet(info.getColumnCount());
+ BitSet bitSet = new BitSet(gtInfo.getColumnCount());
for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
bitSet.set(entry.getKey());
}
- GTRecord fuzzy = new GTRecord(info, new ImmutableBitSet(bitSet));
+ GTRecord fuzzy = new GTRecord(gtInfo, new ImmutableBitSet(bitSet));
for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) {
fuzzy.set(entry.getKey(), entry.getValue());
}
@@ -174,7 +366,7 @@ public class GTScanRangePlanner {
return result;
}
- private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
+ protected TupleFilter flattenToOrAndFilter(TupleFilter filter) {
if (filter == null)
return null;
@@ -193,7 +385,7 @@ public class GTScanRangePlanner {
return flatFilter;
}
- private List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) {
+ protected List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) {
List<Collection<ColumnRange>> result = Lists.newArrayList();
if (flatFilter == null) {
@@ -272,7 +464,7 @@ public class GTScanRangePlanner {
return orAndRanges;
}
- private List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
+ protected List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
if (ranges.size() <= 1) {
return ranges;
}
@@ -339,7 +531,7 @@ public class GTScanRangePlanner {
return new GTScanRange(start, end, newFuzzyKeys);
}
- private List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
+ protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
if (ranges.size() <= maxRanges) {
return ranges;
}
@@ -351,7 +543,15 @@ public class GTScanRangePlanner {
return result;
}
- private class ColumnRange {
+ public int getMaxScanRanges() {
+ return maxScanRanges;
+ }
+
+ public void setMaxScanRanges(int maxScanRanges) {
+ this.maxScanRanges = maxScanRanges;
+ }
+
+ protected class ColumnRange {
private TblColRef column;
private ByteArray begin = ByteArray.EMPTY;
private ByteArray end = ByteArray.EMPTY;
@@ -412,7 +612,7 @@ public class GTScanRangePlanner {
if (valueSet != null) {
return valueSet.isEmpty();
} else if (begin.array() != null && end.array() != null) {
- return info.codeSystem.getComparator().compare(begin, end) > 0;
+ return gtInfo.codeSystem.getComparator().compare(begin, end) > 0;
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 1edfb36..c4abb57 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -37,7 +37,7 @@ import com.google.common.collect.Sets;
public class GTScanRequest {
private GTInfo info;
- private GTScanRange range;
+ private List<GTScanRange> ranges;
private ImmutableBitSet columns;
private transient ImmutableBitSet selectedColBlocks;
@@ -53,18 +53,26 @@ public class GTScanRequest {
private boolean allowPreAggregation = true;
private double aggrCacheGB = 0; // no limit
- public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet columns, TupleFilter filterPushDown) {
+ public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) {
this.info = info;
- this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
+ if (ranges == null) {
+ this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
+ } else {
+ this.ranges = ranges;
+ }
this.columns = columns;
this.filterPushDown = filterPushDown;
validate(info);
}
- public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+ public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) {
this.info = info;
- this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
+ if (ranges == null) {
+ this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
+ } else {
+ this.ranges = ranges;
+ }
this.columns = dimensions;
this.filterPushDown = filterPushDown;
@@ -137,7 +145,7 @@ public class GTScanRequest {
/**
* doFilter,doAggr,doMemCheck are only for profiling use.
* in normal cases they are all true.
- *
+ * <p/>
* Refer to CoprocessorBehavior for explanation
*/
public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException {
@@ -195,16 +203,12 @@ public class GTScanRequest {
return info;
}
- public GTRecord getPkStart() {
- return range.pkStart;
+ public List<GTScanRange> getGTScanRanges() {
+ return ranges;
}
- public GTRecord getPkEnd() {
- return range.pkEnd;
- }
-
- public List<GTRecord> getFuzzyKeys() {
- return range.fuzzyKeys;
+ public void setGTScanRanges(List<GTScanRange> ranges) {
+ this.ranges = ranges;
}
public ImmutableBitSet getSelectedColBlocks() {
@@ -241,7 +245,7 @@ public class GTScanRequest {
@Override
public String toString() {
- return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
+ return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
}
public static final BytesSerializer<GTScanRequest> serializer = new BytesSerializer<GTScanRequest>() {
@@ -249,11 +253,14 @@ public class GTScanRequest {
public void serialize(GTScanRequest value, ByteBuffer out) {
GTInfo.serializer.serialize(value.info, out);
- serializeGTRecord(value.range.pkStart, out);
- serializeGTRecord(value.range.pkEnd, out);
- BytesUtil.writeVInt(value.range.fuzzyKeys.size(), out);
- for (GTRecord f : value.range.fuzzyKeys) {
- serializeGTRecord(f, out);
+ BytesUtil.writeVInt(value.ranges.size(), out);
+ for (GTScanRange range : value.ranges) {
+ serializeGTRecord(range.pkStart, out);
+ serializeGTRecord(range.pkEnd, out);
+ BytesUtil.writeVInt(range.fuzzyKeys.size(), out);
+ for (GTRecord f : range.fuzzyKeys) {
+ serializeGTRecord(f, out);
+ }
}
ImmutableBitSet.serializer.serialize(value.columns, out);
@@ -270,14 +277,19 @@ public class GTScanRequest {
public GTScanRequest deserialize(ByteBuffer in) {
GTInfo sInfo = GTInfo.serializer.deserialize(in);
- GTRecord sPkStart = deserializeGTRecord(in, sInfo);
- GTRecord sPkEnd = deserializeGTRecord(in, sInfo);
- List<GTRecord> sFuzzyKeys = Lists.newArrayList();
- int sFuzzyKeySize = BytesUtil.readVInt(in);
- for (int i = 0; i < sFuzzyKeySize; i++) {
- sFuzzyKeys.add(deserializeGTRecord(in, sInfo));
+ List<GTScanRange> sRanges = Lists.newArrayList();
+ int sRangesCount = BytesUtil.readVInt(in);
+ for (int rangeIdx = 0; rangeIdx < sRangesCount; rangeIdx++) {
+ GTRecord sPkStart = deserializeGTRecord(in, sInfo);
+ GTRecord sPkEnd = deserializeGTRecord(in, sInfo);
+ List<GTRecord> sFuzzyKeys = Lists.newArrayList();
+ int sFuzzyKeySize = BytesUtil.readVInt(in);
+ for (int i = 0; i < sFuzzyKeySize; i++) {
+ sFuzzyKeys.add(deserializeGTRecord(in, sInfo));
+ }
+ GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys);
+ sRanges.add(sRange);
}
- GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys);
ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in);
TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo);
@@ -288,7 +300,7 @@ public class GTScanRequest {
boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
double sAggrCacheGB = in.getDouble();
- return new GTScanRequest(sInfo, sRange, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
+ return new GTScanRequest(sInfo, sRanges, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
}
private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java
new file mode 100644
index 0000000..ff95743
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+
+public interface IGTStorage {
+ IGTScanner getGTScanner(final GTScanRequest scanRequests) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
new file mode 100644
index 0000000..75fa94b
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScannerWorker {
+
+ private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
+ private IGTScanner internal = null;
+
+ public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest) {
+ if (scanRequest == null) {
+ logger.info("Segment {} will be skipped", cubeSeg);
+ internal = new EmptyGTScanner(0);
+ return;
+ }
+
+ final GTInfo info = scanRequest.getInfo();
+
+ try {
+ IGTStorage rpc;
+ if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
+ rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // for local debug
+ } else {
+ rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // default behavior
+ }
+ internal = rpc.getGTScanner(scanRequest);
+ } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
+ throw new RuntimeException("error", e);
+ }
+ }
+
+ public Iterator<GTRecord> iterator() {
+ return internal.iterator();
+ }
+
+ public void close() throws IOException {
+ internal.close();
+ }
+
+ public int getScannedRowCount() {
+ return internal.getScannedRowCount();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
index 53952c4..6c3d544 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.Arrays;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.validation.IValidatorRule;
import org.apache.kylin.cube.model.validation.ValidateContext;
@@ -39,7 +40,7 @@ public class AggregationGroupRuleTest {
public void testGoodDesc() throws IOException {
AggregationGroupRule rule = getAggregationGroupRule();
- for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) {
+ for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) {
CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
ValidateContext vContext = new ValidateContext();
rule.validate(desc, vContext);
@@ -57,7 +58,7 @@ public class AggregationGroupRuleTest {
}
};
- for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) {
+ for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) {
System.out.println(f.getName());
CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
ValidateContext vContext = new ValidateContext();
@@ -72,9 +73,9 @@ public class AggregationGroupRuleTest {
public void testGoodDesc2() throws IOException {
ValidateContext vContext = new ValidateContext();
- CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
- desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] {//
- new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "CATEG_LVL2_NAME" } };
+ CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
+ desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] { //
+ new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "CATEG_LVL2_NAME" } };
IValidatorRule<CubeDesc> rule = getAggregationGroupRule();
rule.validate(desc, vContext);
@@ -86,7 +87,7 @@ public class AggregationGroupRuleTest {
public void testBadDesc1() throws IOException {
ValidateContext vContext = new ValidateContext();
- CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
+ CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
String[] temp = Arrays.asList(desc.getAggregationGroups().get(0).getIncludes()).subList(0, 3).toArray(new String[3]);
desc.getAggregationGroups().get(0).setIncludes(temp);
@@ -103,9 +104,9 @@ public class AggregationGroupRuleTest {
public void testBadDesc2() throws IOException {
ValidateContext vContext = new ValidateContext();
- CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
- desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] {//
- new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "META_CATEG_NAME", "CATEG_LVL2_NAME" } };
+ CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
+ desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] { //
+ new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "META_CATEG_NAME", "CATEG_LVL2_NAME" } };
IValidatorRule<CubeDesc> rule = getAggregationGroupRule();
rule.validate(desc, vContext);
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java
index b07d360..636102c 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.validation.IValidatorRule;
import org.apache.kylin.cube.model.validation.ValidateContext;
@@ -35,7 +36,7 @@ public class RowKeyAttrRuleTest {
@Test
public void testGoodDesc() throws IOException {
- for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) {
+ for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) {
CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
ValidateContext vContext = new ValidateContext();
IValidatorRule<CubeDesc> rule = new RowKeyAttrRule();
@@ -48,7 +49,7 @@ public class RowKeyAttrRuleTest {
@Test
public void testBadDesc() throws IOException {
ValidateContext vContext = new ValidateContext();
- CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
+ CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class);
desc.getRowkey().getRowKeyColumns()[2].setColumn("");
IValidatorRule<CubeDesc> rule = new RowKeyAttrRule();
rule.validate(desc, vContext);
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
deleted file mode 100644
index 80e3df1..0000000
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cube.inmemcubing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dimension.Dictionary;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
-
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class);
-
- private static final int INPUT_ROWS = 10000;
- private static final int SPLIT_ROWS = 5000;
- private static final int THREADS = 4;
-
- private static CubeInstance cube;
- private static String flatTable;
- private static Map<TblColRef, Dictionary<String>> dictionaryMap;
-
- @BeforeClass
- public static void before() throws IOException {
- staticCreateTestMetadata();
-
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
- cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
- flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
- dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
- }
-
- @AfterClass
- public static void after() throws Exception {
- staticCleanupTestMetadata();
- }
-
- @Test
- public void test() throws Exception {
-
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- long randSeed = System.currentTimeMillis();
-
- DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
- doggedBuilder.setConcurrentThreads(THREADS);
- doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
- FileRecordWriter doggedResult = new FileRecordWriter();
-
- {
- Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
- InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
- future.get();
- doggedResult.close();
- }
-
- InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
- inmemBuilder.setConcurrentThreads(THREADS);
- FileRecordWriter inmemResult = new FileRecordWriter();
-
- {
- Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
- InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
- future.get();
- inmemResult.close();
- }
-
- fileCompare(doggedResult.file, inmemResult.file);
- doggedResult.file.delete();
- inmemResult.file.delete();
- }
-
- private void fileCompare(File file, File file2) throws IOException {
- BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
- BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
-
- String line1, line2;
- do {
- line1 = r1.readLine();
- line2 = r2.readLine();
-
- assertEquals(line1, line2);
-
- } while (line1 != null || line2 != null);
-
- r1.close();
- r2.close();
- }
-
- class FileRecordWriter implements ICuboidWriter {
-
- File file;
- PrintWriter writer;
-
- FileRecordWriter() throws IOException {
- file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
- writer = new PrintWriter(file, "UTF-8");
- }
-
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
- writer.print(cuboidId);
- writer.print(", ");
- writer.print(record.toString());
- writer.println();
- }
-
- @Override
- public void flush() {
-
- }
-
- @Override
- public void close() {
- writer.close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
deleted file mode 100644
index 88573c6..0000000
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cube.inmemcubing;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-import org.apache.kylin.dimension.Dictionary;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class);
-
- private CubeInstance cube;
- private String flatTable;
- private Map<TblColRef, Dictionary<String>> dictionaryMap;
-
- private int nInpRows;
- private int nThreads;
-
- @Before
- public void before() throws IOException {
- createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- @Test
- public void testKylinCube() throws Exception {
- testBuild("test_kylin_cube_without_slr_left_join_empty", //
- "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv", 70000, 4);
- }
-
- @Test
- public void testSSBCube() throws Exception {
- testBuild("ssb", //
- "../examples/test_case_data/localmeta/data/kylin_intermediate_ssb_19920101000000_19920201000000.csv", 1000, 1);
- }
-
- public void testBuild(String cubeName, String flatTable, int nInpRows, int nThreads) throws Exception {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-
- this.nInpRows = nInpRows;
- this.nThreads = nThreads;
-
- this.cube = cubeManager.getCube(cubeName);
- this.flatTable = flatTable;
- this.dictionaryMap = getDictionaryMap(cube, flatTable);
-
- testBuildInner();
- }
-
- private void testBuildInner() throws Exception {
-
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
- //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
- cubeBuilder.setConcurrentThreads(nThreads);
-
- ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
-
- try {
- // round 1
- {
- Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
- feedData(cube, flatTable, queue, nInpRows);
- future.get();
- }
-
- // round 2, zero input
- {
- Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
- feedData(cube, flatTable, queue, 0);
- future.get();
- }
-
- // round 3
- {
- Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
- feedData(cube, flatTable, queue, nInpRows);
- future.get();
- }
-
- } catch (Exception e) {
- logger.error("stream build failed", e);
- throw new IOException("Failed to build cube ", e);
- }
- }
-
- static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
- feedData(cube, flatTable, queue, count, 0);
- }
-
- static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
- int nColumns = flatTableDesc.getColumnList().size();
-
- @SuppressWarnings("unchecked")
- Set<String>[] distinctSets = new Set[nColumns];
- for (int i = 0; i < nColumns; i++)
- distinctSets[i] = new TreeSet<String>();
-
- // get distinct values on each column
- List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
- for (String line : lines) {
- String[] row = line.trim().split(",");
- assert row.length == nColumns;
- for (int i = 0; i < nColumns; i++)
- distinctSets[i].add(row[i]);
- }
-
- List<String[]> distincts = new ArrayList<String[]>();
- for (int i = 0; i < nColumns; i++) {
- distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
- }
-
- Random rand = new Random();
- if (randSeed != 0)
- rand.setSeed(randSeed);
-
- // output with random data
- for (; count > 0; count--) {
- ArrayList<String> row = new ArrayList<String>(nColumns);
- for (int i = 0; i < nColumns; i++) {
- String[] candidates = distincts.get(i);
- row.add(candidates[rand.nextInt(candidates.length)]);
- }
- queue.put(row);
- }
- queue.put(new ArrayList<String>(0));
- }
-
- static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
- Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
- CubeDesc desc = cube.getDescriptor();
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
- int nColumns = flatTableDesc.getColumnList().size();
-
- List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
- for (int c = 0; c < columns.size(); c++) {
- TblColRef col = columns.get(c);
- if (desc.getRowkey().isUseDictionary(col)) {
- logger.info("Building dictionary for " + col);
- List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
- Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
- result.put(col, dict);
- }
- }
-
- for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) {
- MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx);
- FunctionDesc func = measureDesc.getFunction();
- List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func);
- if (dictCols.isEmpty())
- continue;
-
- int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
- List<TblColRef> paramCols = func.getParameter().getColRefs();
- for (int i = 0; i < paramCols.size(); i++) {
- TblColRef col = paramCols.get(i);
- if (dictCols.contains(col)) {
- int colIdxOnFlat = flatTableIdx[i];
- logger.info("Building dictionary for " + col);
- List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat);
- Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
-
- result.put(col, dict);
- }
- }
- }
-
- return result;
- }
-
- private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
- List<byte[]> result = Lists.newArrayList();
- List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
- for (String line : lines) {
- String[] row = line.trim().split(",");
- if (row.length != nColumns) {
- throw new IllegalStateException();
- }
- if (row[c] != null) {
- result.add(Bytes.toBytes(row[c]));
- }
- }
- return result;
- }
-
- class ConsoleGTRecordWriter implements ICuboidWriter {
-
- boolean verbose = false;
-
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
- if (verbose)
- System.out.println(record.toString());
- }
-
- @Override
- public void flush() {
- if (verbose) {
- System.out.println("flush");
- }
- }
-
- @Override
- public void close() {
- if (verbose) {
- System.out.println("close");
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index b1b5ee9..b411e96 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -101,11 +101,11 @@ public class DictGridTableTest {
ByteArray segmentEnd = enc(info, 0, "2015-01-15");
assertEquals(segmentStart, segmentStartX);
- GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0));
{
LogicalTupleFilter filter = and(timeComp0, ageComp1);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());//scan range are [close,close]
assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
assertEquals(1, r.get(0).fuzzyKeys.size());
@@ -113,29 +113,34 @@ public class DictGridTableTest {
}
{
LogicalTupleFilter filter = and(timeComp2, ageComp1);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());
}
{
LogicalTupleFilter filter = and(timeComp4, ageComp1);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
}
{
LogicalTupleFilter filter = and(timeComp5, ageComp1);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());
}
{
LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1));
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
assertEquals("[[10], [1421193600000, 10]]", r.get(0).fuzzyKeys.toString());
}
{
LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
assertEquals(0, r.get(0).fuzzyKeys.size());
@@ -143,20 +148,23 @@ public class DictGridTableTest {
{
//skip FALSE filter
LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());
}
{
//TRUE or FALSE filter
LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[null, null]-[null, null]", r.get(0).toString());
}
{
//TRUE or other filter
LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[null, null]-[null, null]", r.get(0).toString());
}
@@ -165,11 +173,11 @@ public class DictGridTableTest {
@Test
public void verifySegmentSkipping2() {
ByteArray segmentEnd = enc(info, 0, "2015-01-15");
- GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0));
{
LogicalTupleFilter filter = and(timeComp0, ageComp1);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());//scan range are [close,close]
assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
assertEquals(1, r.get(0).fuzzyKeys.size());
@@ -178,7 +186,8 @@ public class DictGridTableTest {
{
LogicalTupleFilter filter = and(timeComp5, ageComp1);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0),filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());//scan range are [close,close]
}
}
@@ -186,12 +195,12 @@ public class DictGridTableTest {
@Test
public void verifyScanRangePlanner() {
- GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null);
// flatten or-and & hbase fuzzy value
{
LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
assertEquals("[[10], [20]]", r.get(0).fuzzyKeys.toString());
@@ -200,33 +209,38 @@ public class DictGridTableTest {
// pre-evaluate ever false
{
LogicalTupleFilter filter = and(timeComp1, timeComp2);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());
}
// pre-evaluate ever true
{
LogicalTupleFilter filter = or(timeComp1, ageComp4);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals("[[null, null]-[null, null]]", r.toString());
}
// merge overlap range
{
LogicalTupleFilter filter = or(timeComp1, timeComp3);
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals("[[null, null]-[null, null]]", r.toString());
}
// merge too many ranges
{
LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
- List<GTScanRange> r = planner.planScanRanges(filter);
+ GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter);
+ List<GTScanRange> r = planner.planScanRanges();
assertEquals(3, r.size());
assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
- List<GTScanRange> r2 = planner.planScanRanges(filter, 2);
+ planner.setMaxScanRanges(2);
+ List<GTScanRange> r2 = planner.planScanRanges();
assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
}
}
@@ -269,7 +283,7 @@ public class DictGridTableTest {
GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0);
// note the unEvaluatable column 1 in filter is added to group by
- assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+ assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
}
@@ -284,7 +298,7 @@ public class DictGridTableTest {
GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0);
// note the evaluatable column 1 in filter is added to returned columns but not in group by
- assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
+ assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
}
[2/8] kylin git commit: KYLIN-1579 revise logging
Posted by ma...@apache.org.
KYLIN-1579 revise logging
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b2a38612
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b2a38612
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b2a38612
Branch: refs/heads/master
Commit: b2a386126e075ac6cc2d15d0c1a42d720abed06a
Parents: 71d3f30
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 15:27:45 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800
----------------------------------------------------------------------
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 23 +++++++++++++-------
1 file changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b2a38612/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 38041b3..81d5baa 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -342,28 +342,35 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
builder.setBehavior(toggle);
builder.setStartTime(System.currentTimeMillis());
builder.setTimeout(epResultItr.getTimeout());
+ String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";
Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
try {
results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
} catch (Throwable throwable) {
- throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + "Error when visiting cubes by endpoint", throwable);
+ throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
}
+ boolean abnormalFinish = false;
for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
- logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
+ logger.info(logHeader + getStatsString(result));
if (result.getValue().getStats().getNormalComplete() != 1) {
- throw new RuntimeException("The coprocessor thread stopped itself due to scan timeout.");
+ abnormalFinish = true;
}
-
- try {
- epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
- } catch (IOException | DataFormatException e) {
- throw new RuntimeException("Error when decompressing", e);
+ else {
+ try {
+ epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
+ } catch (IOException | DataFormatException e) {
+ throw new RuntimeException(logHeader + "Error when decompressing", e);
+ }
}
}
+
+ if (abnormalFinish) {
+ throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
+ }
}
});
}
[7/8] kylin git commit: KYLIN-1579 IT preparation classes like
BuildCubeWithEngine should exit with status code upon build exception
Posted by ma...@apache.org.
KYLIN-1579 IT preparation classes like BuildCubeWithEngine should exit with status code upon build exception
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/71d3f303
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/71d3f303
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/71d3f303
Branch: refs/heads/master
Commit: 71d3f303058afa66608440ae3db91e32e921740c
Parents: 11be1e3
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 11:35:54 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800
----------------------------------------------------------------------
.../kylin/provision/BuildCubeWithEngine.java | 26 ++++++++++--------
.../kylin/provision/BuildCubeWithStream.java | 28 ++++++++++++--------
.../kylin/provision/BuildIIWithStream.java | 22 ++++++++-------
.../coprocessor/endpoint/CubeVisitService.java | 4 +--
4 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/71d3f303/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index e1cbe1f..71bb34f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.Pair;
@@ -76,15 +75,20 @@ public class BuildCubeWithEngine {
private static final Log logger = LogFactory.getLog(BuildCubeWithEngine.class);
public static void main(String[] args) throws Exception {
- beforeClass();
-
- BuildCubeWithEngine buildCubeWithEngine = new BuildCubeWithEngine();
- buildCubeWithEngine.before();
- buildCubeWithEngine.build();
- logger.info("Build is done");
- afterClass();
- logger.info("Going to exit");
- System.exit(0);
+ try {
+ beforeClass();
+
+ BuildCubeWithEngine buildCubeWithEngine = new BuildCubeWithEngine();
+ buildCubeWithEngine.before();
+ buildCubeWithEngine.build();
+ logger.info("Build is done");
+ afterClass();
+ logger.info("Going to exit");
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("error", e);
+ System.exit(1);
+ }
}
public static void beforeClass() throws Exception {
@@ -99,7 +103,7 @@ public class BuildCubeWithEngine {
logger.info("Will not use fast build mode");
}
- System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+ System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/71d3f303/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 99da26f..d8a9c21 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -23,7 +23,6 @@ import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -53,21 +52,28 @@ public class BuildCubeWithStream {
private KylinConfig kylinConfig;
public static void main(String[] args) throws Exception {
- beforeClass();
- BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
- buildCubeWithStream.before();
- buildCubeWithStream.build();
- logger.info("Build is done");
- afterClass();
- logger.info("Going to exit");
- System.exit(0);
-
+
+ try {
+ beforeClass();
+
+ BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+ buildCubeWithStream.before();
+ buildCubeWithStream.build();
+ logger.info("Build is done");
+ afterClass();
+ logger.info("Going to exit");
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("error", e);
+ System.exit(1);
+ }
+
}
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+ System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/71d3f303/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
index ace1a2f..6d077e7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -93,14 +92,19 @@ public class BuildIIWithStream {
private KylinConfig kylinConfig;
public static void main(String[] args) throws Exception {
- beforeClass();
- BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream();
- buildCubeWithEngine.before();
- buildCubeWithEngine.build();
- logger.info("Build is done");
- afterClass();
- logger.info("Going to exit");
- System.exit(0);
+ try {
+ beforeClass();
+ BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream();
+ buildCubeWithEngine.before();
+ buildCubeWithEngine.build();
+ logger.info("Build is done");
+ afterClass();
+ logger.info("Going to exit");
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("error", e);
+ System.exit(1);
+ }
}
public static void beforeClass() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/71d3f303/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5158b33..2f1bb9b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -227,7 +227,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
final MutableBoolean normalComplete = new MutableBoolean(true);
- final long startTime = request.getStartTime();
+ final long startTime = this.serviceStartTime;//request.getStartTime();
final long timeout = (long) (request.getTimeout() * 0.95);
final CellListIterator cellListIterator = new CellListIterator() {
@@ -294,7 +294,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
allRows = new byte[0];
}
compressedAllRows = CompressionUtils.compress(allRows);
-
+
appendProfileInfo(sb, "compress done");
OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
[6/8] kylin git commit: KYLIN-1578 Coprocessor thread voluntarily
stop itself when it reaches timeout
Posted by ma...@apache.org.
KYLIN-1578 Coprocessor thread voluntarily stop itself when it reaches timeout
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/11be1e38
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/11be1e38
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/11be1e38
Branch: refs/heads/master
Commit: 11be1e3826cdea8db8df8975ebdff5cf1d93444f
Parents: b26b248
Author: Hongbin Ma <ma...@apache.org>
Authored: Tue Apr 12 09:47:52 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:11:15 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/gridtable/GTScanRequest.java | 2 +
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 16 +-
.../coprocessor/endpoint/CubeVisitService.java | 58 ++-
.../endpoint/generated/CubeVisitProtos.java | 436 +++++++++++++++++--
.../endpoint/protobuf/CubeVisit.proto | 3 +
5 files changed, 463 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index c4abb57..5681057 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
@@ -187,6 +188,7 @@ public class GTScanRequest {
}
}
System.out.println("Meaningless byte is " + meaninglessByte);
+ IOUtils.closeQuietly(scanner);
return scanned;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1d3da28..38041b3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -162,6 +162,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
throw new RuntimeException("error when waiting queue", e);
}
}
+
+ public long getTimeout() {
+ return timeout;
+ }
}
static class EndpointResultsAsGTScanner implements IGTScanner {
@@ -313,7 +317,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
-
logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg);
for (RawScan rs : rawScans) {
logScan(rs, cubeSeg.getStorageLocationIdentifier());
@@ -323,7 +326,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
final AtomicInteger totalScannedCount = new AtomicInteger(0);
final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
- final String currentThreadName = Thread.currentThread().getName();
for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
final ByteString finalScanRequestByteString = scanRequestByteString;
@@ -338,6 +340,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
builder.setBehavior(toggle);
+ builder.setStartTime(System.currentTimeMillis());
+ builder.setTimeout(epResultItr.getTimeout());
Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
try {
@@ -348,7 +352,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
- logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
+ logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
+
+ if (result.getValue().getStats().getNormalComplete() != 1) {
+ throw new RuntimeException("The coprocessor thread stopped itself due to scan timeout.");
+ }
+
try {
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
} catch (IOException | DataFormatException e) {
@@ -371,6 +380,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
+ sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 9e8e251..5158b33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -152,17 +153,17 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
private void appendProfileInfo(StringBuilder sb, String info) {
- sb.append(System.currentTimeMillis() - this.serviceStartTime);
if (info != null) {
- sb.append(":").append(info);
+ sb.append(info);
}
+ sb.append("@" + (System.currentTimeMillis() - this.serviceStartTime));
sb.append(",");
}
@Override
public void visitCube(RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
- RegionScanner innerScanner = null;
+ List<RegionScanner> regionScanners = Lists.newArrayList();
HRegion region = null;
StringBuilder sb = new StringBuilder();
@@ -182,6 +183,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+ appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - request.getStartTime()));
+
MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
@Override
public DimensionEncoding getDimEnc(TblColRef col) {
@@ -190,6 +193,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
});
final List<InnerScannerAsIterator> cellListsForeachRawScan = Lists.newArrayList();
+
for (RawScan hbaseRawScan : hbaseRawScans) {
if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
//if has shard, fill region shard to raw scan start/end
@@ -197,20 +201,23 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
- innerScanner = region.getScanner(scan);
+ RegionScanner innerScanner = region.getScanner(scan);
+ regionScanners.add(innerScanner);
InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
cellListsForeachRawScan.add(cellListIterator);
}
-
+
final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());
if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
//this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed
List<Cell> temp = Lists.newArrayList();
int counter = 0;
- while (innerScanner.nextRaw(temp)) {
- counter++;
+ for (RegionScanner innerScanner : regionScanners) {
+ while (innerScanner.nextRaw(temp)) {
+ counter++;
+ }
}
appendProfileInfo(sb, "scanned " + counter);
}
@@ -219,7 +226,14 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
scanReq.setAggrCacheGB(0); // disable mem check if so told
}
- IGTStore store = new HBaseReadonlyStore(new CellListIterator() {
+ final MutableBoolean normalComplete = new MutableBoolean(true);
+ final long startTime = request.getStartTime();
+ final long timeout = (long) (request.getTimeout() * 0.95);
+
+ final CellListIterator cellListIterator = new CellListIterator() {
+
+ int counter = 0;
+
@Override
public void close() throws IOException {
for (CellListIterator closeable : cellListsForeachRawScan) {
@@ -229,6 +243,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
@Override
public boolean hasNext() {
+ if (counter++ % 1000 == 1) {
+ if (System.currentTimeMillis() - startTime > timeout) {
+ normalComplete.setValue(false);
+ return false;
+ }
+ }
return allCellLists.hasNext();
}
@@ -241,7 +261,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
public void remove() {
throw new UnsupportedOperationException();
}
- }, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
+ };
+
+ IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
IGTScanner rawScanner = store.scan(scanReq);
IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
@@ -260,13 +282,19 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
finalRowCount++;
}
+ finalScanner.close();
appendProfileInfo(sb, "agg done");
//outputStream.close() is not necessary
- allRows = outputStream.toByteArray();
- byte[] compressedAllRows = CompressionUtils.compress(allRows);
-
+ byte[] compressedAllRows;
+ if (normalComplete.booleanValue()) {
+ allRows = outputStream.toByteArray();
+ } else {
+ allRows = new byte[0];
+ }
+ compressedAllRows = CompressionUtils.compress(allRows);
+
appendProfileInfo(sb, "compress done");
OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
@@ -289,7 +317,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
setFreeSwapSpaceSize(freeSwapSpaceSize).//
setHostname(InetAddress.getLocalHost().getHostName()).//
setEtcMsg(sb.toString()).//
- build())
+ setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build())
.//
build());
@@ -297,7 +325,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
logger.error(ioe.toString());
ResponseConverter.setControllerException(controller, ioe);
} finally {
- IOUtils.closeQuietly(innerScanner);
+ for (RegionScanner innerScanner : regionScanners) {
+ IOUtils.closeQuietly(innerScanner);
+ }
if (region != null) {
try {
region.closeRegionOperation();
http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 6e3e2bb..53393e8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -98,6 +80,42 @@ public final class CubeVisitProtos {
*/
org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntListOrBuilder getHbaseColumnsToGTOrBuilder(
int index);
+
+ // required int64 startTime = 6;
+ /**
+ * <code>required int64 startTime = 6;</code>
+ *
+ * <pre>
+ *when client start the request
+ * </pre>
+ */
+ boolean hasStartTime();
+ /**
+ * <code>required int64 startTime = 6;</code>
+ *
+ * <pre>
+ *when client start the request
+ * </pre>
+ */
+ long getStartTime();
+
+ // required int64 timeout = 7;
+ /**
+ * <code>required int64 timeout = 7;</code>
+ *
+ * <pre>
+ *how long client will wait
+ * </pre>
+ */
+ boolean hasTimeout();
+ /**
+ * <code>required int64 timeout = 7;</code>
+ *
+ * <pre>
+ *how long client will wait
+ * </pre>
+ */
+ long getTimeout();
}
/**
* Protobuf type {@code CubeVisitRequest}
@@ -178,6 +196,16 @@ public final class CubeVisitProtos {
hbaseColumnsToGT_.add(input.readMessage(org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList.PARSER, extensionRegistry));
break;
}
+ case 48: {
+ bitField0_ |= 0x00000010;
+ startTime_ = input.readInt64();
+ break;
+ }
+ case 56: {
+ bitField0_ |= 0x00000020;
+ timeout_ = input.readInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -852,12 +880,62 @@ public final class CubeVisitProtos {
return hbaseColumnsToGT_.get(index);
}
+ // required int64 startTime = 6;
+ public static final int STARTTIME_FIELD_NUMBER = 6;
+ private long startTime_;
+ /**
+ * <code>required int64 startTime = 6;</code>
+ *
+ * <pre>
+ *when client start the request
+ * </pre>
+ */
+ public boolean hasStartTime() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>required int64 startTime = 6;</code>
+ *
+ * <pre>
+ *when client start the request
+ * </pre>
+ */
+ public long getStartTime() {
+ return startTime_;
+ }
+
+ // required int64 timeout = 7;
+ public static final int TIMEOUT_FIELD_NUMBER = 7;
+ private long timeout_;
+ /**
+ * <code>required int64 timeout = 7;</code>
+ *
+ * <pre>
+ *how long client will wait
+ * </pre>
+ */
+ public boolean hasTimeout() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>required int64 timeout = 7;</code>
+ *
+ * <pre>
+ *how long client will wait
+ * </pre>
+ */
+ public long getTimeout() {
+ return timeout_;
+ }
+
private void initFields() {
behavior_ = "";
gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
rowkeyPreambleSize_ = 0;
hbaseColumnsToGT_ = java.util.Collections.emptyList();
+ startTime_ = 0L;
+ timeout_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -880,6 +958,14 @@ public final class CubeVisitProtos {
memoizedIsInitialized = 0;
return false;
}
+ if (!hasStartTime()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasTimeout()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -902,6 +988,12 @@ public final class CubeVisitProtos {
for (int i = 0; i < hbaseColumnsToGT_.size(); i++) {
output.writeMessage(5, hbaseColumnsToGT_.get(i));
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeInt64(6, startTime_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeInt64(7, timeout_);
+ }
getUnknownFields().writeTo(output);
}
@@ -931,6 +1023,14 @@ public final class CubeVisitProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, hbaseColumnsToGT_.get(i));
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(6, startTime_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(7, timeout_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -976,6 +1076,16 @@ public final class CubeVisitProtos {
}
result = result && getHbaseColumnsToGTList()
.equals(other.getHbaseColumnsToGTList());
+ result = result && (hasStartTime() == other.hasStartTime());
+ if (hasStartTime()) {
+ result = result && (getStartTime()
+ == other.getStartTime());
+ }
+ result = result && (hasTimeout() == other.hasTimeout());
+ if (hasTimeout()) {
+ result = result && (getTimeout()
+ == other.getTimeout());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -1009,6 +1119,14 @@ public final class CubeVisitProtos {
hash = (37 * hash) + HBASECOLUMNSTOGT_FIELD_NUMBER;
hash = (53 * hash) + getHbaseColumnsToGTList().hashCode();
}
+ if (hasStartTime()) {
+ hash = (37 * hash) + STARTTIME_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getStartTime());
+ }
+ if (hasTimeout()) {
+ hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getTimeout());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -1133,6 +1251,10 @@ public final class CubeVisitProtos {
} else {
hbaseColumnsToGTBuilder_.clear();
}
+ startTime_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ timeout_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -1186,6 +1308,14 @@ public final class CubeVisitProtos {
} else {
result.hbaseColumnsToGT_ = hbaseColumnsToGTBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.startTime_ = startTime_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.timeout_ = timeout_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1242,6 +1372,12 @@ public final class CubeVisitProtos {
}
}
}
+ if (other.hasStartTime()) {
+ setStartTime(other.getStartTime());
+ }
+ if (other.hasTimeout()) {
+ setTimeout(other.getTimeout());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1263,6 +1399,14 @@ public final class CubeVisitProtos {
return false;
}
+ if (!hasStartTime()) {
+
+ return false;
+ }
+ if (!hasTimeout()) {
+
+ return false;
+ }
return true;
}
@@ -1704,6 +1848,104 @@ public final class CubeVisitProtos {
return hbaseColumnsToGTBuilder_;
}
+ // required int64 startTime = 6;
+ private long startTime_ ;
+ /**
+ * <code>required int64 startTime = 6;</code>
+ *
+ * <pre>
+ *when client start the request
+ * </pre>
+ */
+ public boolean hasStartTime() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>required int64 startTime = 6;</code>
+ *
+ * <pre>
+ *when client start the request
+ * </pre>
+ */
+ public long getStartTime() {
+ return startTime_;
+ }
+ /**
+ * <code>required int64 startTime = 6;</code>
+ *
+ * <pre>
+ *when client start the request
+ * </pre>
+ */
+ public Builder setStartTime(long value) {
+ bitField0_ |= 0x00000020;
+ startTime_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required int64 startTime = 6;</code>
+ *
+ * <pre>
+ *when client start the request
+ * </pre>
+ */
+ public Builder clearStartTime() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ startTime_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // required int64 timeout = 7;
+ private long timeout_ ;
+ /**
+ * <code>required int64 timeout = 7;</code>
+ *
+ * <pre>
+ *how long client will wait
+ * </pre>
+ */
+ public boolean hasTimeout() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>required int64 timeout = 7;</code>
+ *
+ * <pre>
+ *how long client will wait
+ * </pre>
+ */
+ public long getTimeout() {
+ return timeout_;
+ }
+ /**
+ * <code>required int64 timeout = 7;</code>
+ *
+ * <pre>
+ *how long client will wait
+ * </pre>
+ */
+ public Builder setTimeout(long value) {
+ bitField0_ |= 0x00000040;
+ timeout_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required int64 timeout = 7;</code>
+ *
+ * <pre>
+ *how long client will wait
+ * </pre>
+ */
+ public Builder clearTimeout() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ timeout_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:CubeVisitRequest)
}
@@ -1952,6 +2194,24 @@ public final class CubeVisitProtos {
*/
com.google.protobuf.ByteString
getEtcMsgBytes();
+
+ // optional int32 normalComplete = 10;
+ /**
+ * <code>optional int32 normalComplete = 10;</code>
+ *
+ * <pre>
+ *when time outs, normalComplete will be false
+ * </pre>
+ */
+ boolean hasNormalComplete();
+ /**
+ * <code>optional int32 normalComplete = 10;</code>
+ *
+ * <pre>
+ *when time outs, normalComplete will be false
+ * </pre>
+ */
+ int getNormalComplete();
}
/**
* Protobuf type {@code CubeVisitResponse.Stats}
@@ -2049,6 +2309,11 @@ public final class CubeVisitProtos {
etcMsg_ = input.readBytes();
break;
}
+ case 80: {
+ bitField0_ |= 0x00000200;
+ normalComplete_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2287,6 +2552,30 @@ public final class CubeVisitProtos {
}
}
+ // optional int32 normalComplete = 10;
+ public static final int NORMALCOMPLETE_FIELD_NUMBER = 10;
+ private int normalComplete_;
+ /**
+ * <code>optional int32 normalComplete = 10;</code>
+ *
+ * <pre>
+ *when time outs, normalComplete will be false
+ * </pre>
+ */
+ public boolean hasNormalComplete() {
+ return ((bitField0_ & 0x00000200) == 0x00000200);
+ }
+ /**
+ * <code>optional int32 normalComplete = 10;</code>
+ *
+ * <pre>
+ *when time outs, normalComplete will be false
+ * </pre>
+ */
+ public int getNormalComplete() {
+ return normalComplete_;
+ }
+
private void initFields() {
serviceStartTime_ = 0L;
serviceEndTime_ = 0L;
@@ -2297,6 +2586,7 @@ public final class CubeVisitProtos {
freeSwapSpaceSize_ = 0D;
hostname_ = "";
etcMsg_ = "";
+ normalComplete_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -2337,6 +2627,9 @@ public final class CubeVisitProtos {
if (((bitField0_ & 0x00000100) == 0x00000100)) {
output.writeBytes(9, getEtcMsgBytes());
}
+ if (((bitField0_ & 0x00000200) == 0x00000200)) {
+ output.writeInt32(10, normalComplete_);
+ }
getUnknownFields().writeTo(output);
}
@@ -2382,6 +2675,10 @@ public final class CubeVisitProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(9, getEtcMsgBytes());
}
+ if (((bitField0_ & 0x00000200) == 0x00000200)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(10, normalComplete_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -2447,6 +2744,11 @@ public final class CubeVisitProtos {
result = result && getEtcMsg()
.equals(other.getEtcMsg());
}
+ result = result && (hasNormalComplete() == other.hasNormalComplete());
+ if (hasNormalComplete()) {
+ result = result && (getNormalComplete()
+ == other.getNormalComplete());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -2499,6 +2801,10 @@ public final class CubeVisitProtos {
hash = (37 * hash) + ETCMSG_FIELD_NUMBER;
hash = (53 * hash) + getEtcMsg().hashCode();
}
+ if (hasNormalComplete()) {
+ hash = (37 * hash) + NORMALCOMPLETE_FIELD_NUMBER;
+ hash = (53 * hash) + getNormalComplete();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -2626,6 +2932,8 @@ public final class CubeVisitProtos {
bitField0_ = (bitField0_ & ~0x00000080);
etcMsg_ = "";
bitField0_ = (bitField0_ & ~0x00000100);
+ normalComplete_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000200);
return this;
}
@@ -2690,6 +2998,10 @@ public final class CubeVisitProtos {
to_bitField0_ |= 0x00000100;
}
result.etcMsg_ = etcMsg_;
+ if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+ to_bitField0_ |= 0x00000200;
+ }
+ result.normalComplete_ = normalComplete_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -2737,6 +3049,9 @@ public final class CubeVisitProtos {
etcMsg_ = other.etcMsg_;
onChanged();
}
+ if (other.hasNormalComplete()) {
+ setNormalComplete(other.getNormalComplete());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3143,6 +3458,55 @@ public final class CubeVisitProtos {
return this;
}
+ // optional int32 normalComplete = 10;
+ private int normalComplete_ ;
+ /**
+ * <code>optional int32 normalComplete = 10;</code>
+ *
+ * <pre>
+ *when time outs, normalComplete will be false
+ * </pre>
+ */
+ public boolean hasNormalComplete() {
+ return ((bitField0_ & 0x00000200) == 0x00000200);
+ }
+ /**
+ * <code>optional int32 normalComplete = 10;</code>
+ *
+ * <pre>
+ *when time outs, normalComplete will be false
+ * </pre>
+ */
+ public int getNormalComplete() {
+ return normalComplete_;
+ }
+ /**
+ * <code>optional int32 normalComplete = 10;</code>
+ *
+ * <pre>
+ *when time outs, normalComplete will be false
+ * </pre>
+ */
+ public Builder setNormalComplete(int value) {
+ bitField0_ |= 0x00000200;
+ normalComplete_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 normalComplete = 10;</code>
+ *
+ * <pre>
+ *when time outs, normalComplete will be false
+ * </pre>
+ */
+ public Builder clearNormalComplete() {
+ bitField0_ = (bitField0_ & ~0x00000200);
+ normalComplete_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats)
}
@@ -3936,24 +4300,26 @@ public final class CubeVisitProtos {
java.lang.String[] descriptorData = {
"\npstorage-hbase/src/main/java/org/apache" +
"/kylin/storage/hbase/cube/v2/coprocessor" +
- "/endpoint/protobuf/CubeVisit.proto\"\273\001\n\020C" +
+ "/endpoint/protobuf/CubeVisit.proto\"\337\001\n\020C" +
"ubeVisitRequest\022\020\n\010behavior\030\001 \002(\t\022\025\n\rgtS" +
"canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 \002(\014\022\032" +
"\n\022rowkeyPreambleSize\030\004 \002(\005\0223\n\020hbaseColum" +
- "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\032" +
- "\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\271\002\n\021CubeVisitRe" +
- "sponse\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030" +
- "\002 \002(\0132\030.CubeVisitResponse.Stats\032\342\001\n\005Stat",
- "s\022\030\n\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEn" +
- "dTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022" +
- "aggregatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoa" +
- "d\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022" +
- "\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010" +
- " \001(\t\022\016\n\006etcMsg\030\t \001(\t2F\n\020CubeVisitService" +
- "\0222\n\tvisitCube\022\021.CubeVisitRequest\032\022.CubeV" +
- "isitResponseB`\nEorg.apache.kylin.storage" +
- ".hbase.cube.v2.coprocessor.endpoint.gene" +
- "ratedB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+ "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\022" +
+ "\021\n\tstartTime\030\006 \002(\003\022\017\n\007timeout\030\007 \002(\003\032\027\n\007I" +
+ "ntList\022\014\n\004ints\030\001 \003(\005\"\321\002\n\021CubeVisitRespon" +
+ "se\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(",
+ "\0132\030.CubeVisitResponse.Stats\032\372\001\n\005Stats\022\030\n" +
+ "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
+ "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022aggr" +
+ "egatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoad\030\005 " +
+ "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" +
+ "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" +
+ "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005" +
+ "2F\n\020CubeVisitService\0222\n\tvisitCube\022\021.Cube" +
+ "VisitRequest\032\022.CubeVisitResponseB`\nEorg." +
+ "apache.kylin.storage.hbase.cube.v2.copro",
+ "cessor.endpoint.generatedB\017CubeVisitProt" +
+ "osH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3965,7 +4331,7 @@ public final class CubeVisitProtos {
internal_static_CubeVisitRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CubeVisitRequest_descriptor,
- new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", });
+ new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "StartTime", "Timeout", });
internal_static_CubeVisitRequest_IntList_descriptor =
internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new
@@ -3983,7 +4349,7 @@ public final class CubeVisitProtos {
internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CubeVisitResponse_Stats_descriptor,
- new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", });
+ new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", });
return null;
}
};
http://git-wip-us.apache.org/repos/asf/kylin/blob/11be1e38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index 5b66a56..ecaad35 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -35,6 +35,8 @@ message CubeVisitRequest {
required bytes hbaseRawScan = 3;
required int32 rowkeyPreambleSize = 4;
repeated IntList hbaseColumnsToGT = 5;
+ required int64 startTime = 6;//when client start the request
+ required int64 timeout = 7;//how long client will wait
message IntList {
repeated int32 ints = 1;
}
@@ -51,6 +53,7 @@ message CubeVisitResponse {
optional double freeSwapSpaceSize = 7;
optional string hostname = 8;
optional string etcMsg = 9;
+ optional int32 normalComplete =10;//when time outs, normalComplete will be false
}
required bytes compressedRows = 1;
required Stats stats = 2;
[3/8] kylin git commit: refactor
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index ef53cb7..938145b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -19,30 +19,35 @@
package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import javax.annotation.Nullable;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -84,53 +89,77 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
}
}
- public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
+ public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, final GTInfo fullGTInfo) {
super(cubeSeg, cuboid, fullGTInfo);
+ MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
+ @Override
+ public DimensionEncoding getDimEnc(TblColRef col) {
+ return fullGTInfo.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
+ }
+ });
}
@Override
- public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException {
- final List<IGTScanner> scanners = Lists.newArrayList();
- for (GTScanRequest request : scanRequests) {
- scanners.add(getGTScanner(request));
- }
+ public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
+ final IGTScanner scanner = getGTScannerInternal(scanRequest);
return new IGTScanner() {
@Override
public GTInfo getInfo() {
- return scanners.get(0).getInfo();
+ return scanner.getInfo();
}
@Override
public int getScannedRowCount() {
int sum = 0;
- for (IGTScanner s : scanners) {
- sum += s.getScannedRowCount();
- }
+ sum += scanner.getScannedRowCount();
return sum;
}
@Override
public void close() throws IOException {
- for (IGTScanner s : scanners) {
- s.close();
- }
+ scanner.close();
}
@Override
public Iterator<GTRecord> iterator() {
- return Iterators.concat(Iterators.transform(scanners.iterator(), new Function<IGTScanner, Iterator<GTRecord>>() {
- @Nullable
- @Override
- public Iterator<GTRecord> apply(IGTScanner input) {
- return input.iterator();
- }
- }));
+ return scanner.iterator();
}
};
}
- private IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
+ //for non-sharding cases it will only return one byte[] with not shard at beginning
+ private List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
+ final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+
+ if (!cubeSeg.isEnableSharding()) {
+ return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked
+ } else {
+ List<byte[]> ret = Lists.newArrayList();
+ for (short i = 0; i < cuboidShardNum; ++i) {
+ short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
+ byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length);
+ BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ ret.add(cookedKey);
+ }
+ return ret;
+ }
+ }
+
+ private List<RawScan> spawnRawScansForAllShards(RawScan rawScan) {
+ List<RawScan> ret = Lists.newArrayList();
+ List<byte[]> startKeys = getRowKeysDifferentShards(rawScan.startKey);
+ List<byte[]> endKeys = getRowKeysDifferentShards(rawScan.endKey);
+ for (int i = 0; i < startKeys.size(); i++) {
+ RawScan temp = new RawScan(rawScan);
+ temp.startKey = startKeys.get(i);
+ temp.endKey = endKeys.get(i);
+ ret.add(temp);
+ }
+ return ret;
+ }
+
+ private IGTScanner getGTScannerInternal(final GTScanRequest scanRequest) throws IOException {
// primary key (also the 0th column block) is always selected
final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
@@ -138,22 +167,23 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
- List<RawScan> rawScans = preparedHBaseScans(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+ List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
final List<ResultScanner> scanners = Lists.newArrayList();
final List<Iterator<Result>> resultIterators = Lists.newArrayList();
for (RawScan rawScan : rawScans) {
+ for (RawScan rawScanWithShard : spawnRawScansForAllShards(rawScan)) {
+ logScan(rawScanWithShard, cubeSeg.getStorageLocationIdentifier());
+ Scan hbaseScan = buildScan(rawScanWithShard);
- logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
- Scan hbaseScan = buildScan(rawScan);
-
- final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
- final Iterator<Result> iterator = scanner.iterator();
+ final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
+ final Iterator<Result> iterator = scanner.iterator();
- scanners.add(scanner);
- resultIterators.add(iterator);
+ scanners.add(scanner);
+ resultIterators.add(iterator);
+ }
}
final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 6bbb9c0..9ed914a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -19,204 +19,48 @@
package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.Iterator;
-import java.util.List;
import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.BuildInFunctionTransformer;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.gridtable.EmptyGTScanner;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.GTScanRangePlanner;
import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTUtil;
import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.gridtable.ScannerWorker;
import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
public class CubeSegmentScanner implements IGTScanner {
private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class);
- private static final int MAX_SCAN_RANGES = 200;
-
final CubeSegment cubeSeg;
- final GTInfo info;
- final List<GTScanRequest> scanRequests;
- final Scanner scanner;
+ final ScannerWorker scanner;
final Cuboid cuboid;
+ final GTScanRequest scanRequest;
+
public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
this.cuboid = cuboid;
this.cubeSeg = cubeSeg;
- this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
-
- CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
// translate FunctionTupleFilter to IN clause
ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
filter = translator.transform(filter);
- //replace the constant values in filter to dictionary codes
- TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);
-
- ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions);
- ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc()));
- ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
- String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
-
- GTScanRangePlanner scanRangePlanner;
- if (cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) {
- TblColRef tblColRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
- TblColRef partitionColOfGT = null;
- Pair<ByteArray, ByteArray> segmentStartAndEnd = null;
- int index = mapping.getIndexOf(tblColRef);
- if (index >= 0) {
- segmentStartAndEnd = getSegmentStartAndEnd(index);
- partitionColOfGT = info.colRef(index);
- }
- scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, partitionColOfGT);
- } else {
- scanRangePlanner = new GTScanRangePlanner(info, null, null);
- }
- List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
-
- scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
-
- KylinConfig config = cubeSeg.getCubeInstance().getConfig();
- for (GTScanRange range : scanRanges) {
- GTScanRequest req = new GTScanRequest(info, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, config.getQueryCoprocessorMemGB());
- scanRequests.add(req);
- }
-
- scanner = new Scanner();
- }
-
- private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) {
- ByteArray start;
- if (cubeSeg.getDateRangeStart() != Long.MIN_VALUE) {
- start = encodeTime(cubeSeg.getDateRangeStart(), index, 1);
- } else {
- start = new ByteArray();
- }
-
- ByteArray end;
- if (cubeSeg.getDateRangeEnd() != Long.MAX_VALUE) {
- end = encodeTime(cubeSeg.getDateRangeEnd(), index, -1);
- } else {
- end = new ByteArray();
- }
- return Pair.newPair(start, end);
-
- }
-
- private ByteArray encodeTime(long ts, int index, int roundingFlag) {
- String value;
- DataType partitionColType = info.getColumnType(index);
- if (partitionColType.isDate()) {
- value = DateFormat.formatToDateStr(ts);
- } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
- value = DateFormat.formatToTimeWithoutMilliStr(ts);
- } else if (partitionColType.isStringFamily()) {
- String partitionDateFormat = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
- if (StringUtils.isEmpty(partitionDateFormat))
- partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
- value = DateFormat.formatToDateStr(ts, partitionDateFormat);
- } else {
- throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
- }
-
- ByteBuffer buffer = ByteBuffer.allocate(info.getMaxColumnLength());
- info.getCodeSystem().encodeColumnValue(index, value, roundingFlag, buffer);
-
- return ByteArray.copyOf(buffer.array(), 0, buffer.position());
- }
-
- private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
- Set<TblColRef> ret = Sets.newHashSet();
- for (TblColRef col : input) {
- if (cubeDesc.hasHostColumn(col)) {
- for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
- ret.add(host);
- }
- } else {
- ret.add(col);
- }
- }
- return ret;
- }
-
- private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
- BitSet result = new BitSet();
- for (TblColRef dim : dimensions) {
- int idx = mapping.getIndexOf(dim);
- if (idx >= 0)
- result.set(idx);
- }
- return new ImmutableBitSet(result);
- }
-
- private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
- BitSet result = new BitSet();
- for (FunctionDesc metric : metrics) {
- int idx = mapping.getIndexOf(metric);
- if (idx < 0)
- throw new IllegalStateException(metric + " not found in " + mapping);
- result.set(idx);
- }
- return new ImmutableBitSet(result);
- }
-
- private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
-
- //metrics are represented in ImmutableBitSet, which loses order information
- //sort the aggrFuns to align with metrics natural order
- List<FunctionDesc> metricList = Lists.newArrayList(metrics);
- Collections.sort(metricList, new Comparator<FunctionDesc>() {
- @Override
- public int compare(FunctionDesc o1, FunctionDesc o2) {
- int a = mapping.getIndexOf(o1);
- int b = mapping.getIndexOf(o2);
- return a - b;
- }
- });
-
- String[] result = new String[metricList.size()];
- int i = 0;
- for (FunctionDesc metric : metricList) {
- result[i++] = metric.getExpression();
- }
- return result;
+ GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics);
+ scanRequest = scanRangePlanner.planScanRequest(allowPreAggregate);
+ scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest);
}
@Override
@@ -231,7 +75,7 @@ public class CubeSegmentScanner implements IGTScanner {
@Override
public GTInfo getInfo() {
- return info;
+ return scanRequest == null ? null : scanRequest.getInfo();
}
@Override
@@ -239,47 +83,4 @@ public class CubeSegmentScanner implements IGTScanner {
return scanner.getScannedRowCount();
}
- private class Scanner {
- IGTScanner internal = null;
-
- public Scanner() {
- CubeHBaseRPC rpc;
- if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
- MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
- @Override
- public DimensionEncoding getDimEnc(TblColRef col) {
- return info.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
- }
- });
- rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); // for local debug
- } else {
- rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info); // default behavior
- }
-
- try {
- if (scanRequests.size() == 0) {
- logger.info("Segment {} will be skipped", cubeSeg);
- internal = new EmptyGTScanner(0);
- } else {
- internal = rpc.getGTScanner(scanRequests);
- }
- } catch (IOException e) {
- throw new RuntimeException("error", e);
- }
- }
-
- public Iterator<GTRecord> iterator() {
- return internal.iterator();
- }
-
- public void close() throws IOException {
- internal.close();
- }
-
- public int getScannedRowCount() {
- return internal.getScannedRowCount();
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 863dd67..e0e6d83 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -79,26 +79,26 @@ public class CubeStorageQuery implements IStorageQuery {
Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
- // all dimensions = groups + filter dimensions
- Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
- filterDims.removeAll(groups);
+ // all dimensions = groups + other(like filter) dimensions
+ Set<TblColRef> otherDims = Sets.newHashSet(dimensions);
+ otherDims.removeAll(groups);
// expand derived (xxxD means contains host columns only, derived columns were translated)
Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
- Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
- filterDimsD.removeAll(groupsD);
+ Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation);
+ otherDimsD.removeAll(groupsD);
// identify cuboid
Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
dimensionsD.addAll(groupsD);
- dimensionsD.addAll(filterDimsD);
- Cuboid cuboid = identifyCuboid(dimensionsD, metrics);
+ dimensionsD.addAll(otherDimsD);
+ Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc,dimensionsD, metrics);
context.setCuboid(cuboid);
// isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
- boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
+ boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
context.setExactAggregation(isExactAggregation);
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
@@ -169,19 +169,7 @@ public class CubeStorageQuery implements IStorageQuery {
return expanded;
}
- private Cuboid identifyCuboid(Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
- for (FunctionDesc metric : metrics) {
- if (metric.getMeasureType().onlyAggrInBaseCuboid())
- return Cuboid.getBaseCuboid(cubeDesc);
- }
-
- long cuboidID = 0;
- for (TblColRef column : dimensions) {
- int index = cubeDesc.getRowkey().getColumnBitIndex(column);
- cuboidID |= 1L << index;
- }
- return Cuboid.findById(cubeDesc, cuboidID);
- }
+
@SuppressWarnings("unchecked")
private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
index c2ffdba..c2eacc1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
@@ -47,6 +47,16 @@ public class RawScan {
this.hbaseMaxResultSize = hbaseMaxResultSize;
}
+ public RawScan(RawScan other) {
+
+ this.startKey = other.startKey;
+ this.endKey = other.endKey;
+ this.hbaseColumns = other.hbaseColumns;
+ this.fuzzyKeys = other.fuzzyKeys;
+ this.hbaseCaching = other.hbaseCaching;
+ this.hbaseMaxResultSize = other.hbaseMaxResultSize;
+ }
+
public String getStartKeyAsString() {
return BytesUtil.toHex(this.startKey);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 1e7b1b5..9e8e251 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dimension.DimensionEncoding;
@@ -58,6 +60,7 @@ import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryI
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
@@ -138,8 +141,21 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
}
- private void appendProfileInfo(StringBuilder sb) {
+ private List<RawScan> deserializeRawScans(ByteBuffer in) {
+ int rawScanCount = BytesUtil.readVInt(in);
+ List<RawScan> ret = Lists.newArrayList();
+ for (int i = 0; i < rawScanCount; i++) {
+ RawScan temp = RawScan.serializer.deserialize(in);
+ ret.add(temp);
+ }
+ return ret;
+ }
+
+ private void appendProfileInfo(StringBuilder sb, String info) {
sb.append(System.currentTimeMillis() - this.serviceStartTime);
+ if (info != null) {
+ sb.append(":").append(info);
+ }
sb.append(",");
}
@@ -159,7 +175,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
region.startRegionOperation();
final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
- final RawScan hbaseRawScan = RawScan.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
+ List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
+ for (IntList intList : request.getHbaseColumnsToGTList()) {
+ hbaseColumnsToGT.add(intList.getIntsList());
+ }
+ CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
+ final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));
MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
@Override
@@ -168,40 +189,61 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
}
});
- List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
- for (IntList intList : request.getHbaseColumnsToGTList()) {
- hbaseColumnsToGT.add(intList.getIntsList());
- }
-
- if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
- //if has shard, fill region shard to raw scan start/end
- updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
- }
-
- Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
+ final List<InnerScannerAsIterator> cellListsForeachRawScan = Lists.newArrayList();
+ for (RawScan hbaseRawScan : hbaseRawScans) {
+ if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) {
+ //if has shard, fill region shard to raw scan start/end
+ updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN);
+ }
- appendProfileInfo(sb);
+ Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
+ innerScanner = region.getScanner(scan);
- innerScanner = region.getScanner(scan);
- CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
+ InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
+ cellListsForeachRawScan.add(cellListIterator);
+ }
+
+ final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());
if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
+ //this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed
List<Cell> temp = Lists.newArrayList();
int counter = 0;
while (innerScanner.nextRaw(temp)) {
counter++;
}
- sb.append("Scanned " + counter + " rows in " + (System.currentTimeMillis() - serviceStartTime) + ",");
+ appendProfileInfo(sb, "scanned " + counter);
}
- InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
scanReq.setAggrCacheGB(0); // disable mem check if so told
}
- IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
- IGTScanner rawScanner = store.scan(scanReq);
+ IGTStore store = new HBaseReadonlyStore(new CellListIterator() {
+ @Override
+ public void close() throws IOException {
+ for (CellListIterator closeable : cellListsForeachRawScan) {
+ closeable.close();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return allCellLists.hasNext();
+ }
+
+ @Override
+ public List<Cell> next() {
+ return allCellLists.next();
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
+
+ IGTScanner rawScanner = store.scan(scanReq);
IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), //
behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
@@ -219,20 +261,20 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
finalRowCount++;
}
- appendProfileInfo(sb);
+ appendProfileInfo(sb, "agg done");
//outputStream.close() is not necessary
allRows = outputStream.toByteArray();
byte[] compressedAllRows = CompressionUtils.compress(allRows);
- appendProfileInfo(sb);
+ appendProfileInfo(sb, "compress done");
OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();
- appendProfileInfo(sb);
+ appendProfileInfo(sb, "server stats done");
CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
done.run(responseBuilder.//
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
index b12451a..fa1687b 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/SandboxMetastoreCLI.java
@@ -42,7 +42,7 @@ public class SandboxMetastoreCLI {
public static void main(String[] args) throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
[8/8] kylin git commit: update kylin metadata version from 1.5.1 to
1.5.2
Posted by ma...@apache.org.
update kylin metadata version from 1.5.1 to 1.5.2
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/59cb57ca
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/59cb57ca
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/59cb57ca
Branch: refs/heads/master
Commit: 59cb57ca6e1a545628d0057d5980c4fe7e789536
Parents: b6c893d
Author: Hongbin Ma <ma...@apache.org>
Authored: Wed Apr 13 11:39:42 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Apr 13 11:39:42 2016 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/common/KylinVersion.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/59cb57ca/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
index d55f969..0700968 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
@@ -59,7 +59,7 @@ public class KylinVersion {
/**
* Require MANUAL updating kylin version per ANY upgrading.
*/
- private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("1.5.1");
+ private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("1.5.2");
private static final Set<KylinVersion> SIGNATURE_INCOMPATIBLE_REVISIONS = new HashSet<KylinVersion>();
[4/8] kylin git commit: refactor
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
deleted file mode 100644
index d50baad..0000000
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.job.BaseTestExecutable;
-import org.apache.kylin.job.ErrorTestExecutable;
-import org.apache.kylin.job.FailedTestExecutable;
-import org.apache.kylin.job.SelfStopExecutable;
-import org.apache.kylin.job.SucceedTestExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.junit.Test;
-
-/**
- */
-public class DefaultSchedulerTest extends BaseSchedulerTest {
-
- @Test
- public void testSingleTaskJob() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- job.addTask(task1);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- }
-
- @Test
- public void testSucceed() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- BaseTestExecutable task2 = new SucceedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testSucceedAndFailed() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SucceedTestExecutable();
- BaseTestExecutable task2 = new FailedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testSucceedAndError() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new ErrorTestExecutable();
- BaseTestExecutable task2 = new SucceedTestExecutable();
- job.addTask(task1);
- job.addTask(task2);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
- assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
- }
-
- @Test
- public void testDiscard() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- BaseTestExecutable task1 = new SelfStopExecutable();
- job.addTask(task1);
- jobService.addJob(job);
- waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
- jobService.discardJob(job.getId());
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
- assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
- Thread.sleep(5000);
- System.out.println(job);
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testSchedulerPool() throws InterruptedException {
- ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
- final CountDownLatch countDownLatch = new CountDownLatch(3);
- ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- countDownLatch.countDown();
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
- assertTrue("future should still running", future.cancel(true));
-
- final CountDownLatch countDownLatch2 = new CountDownLatch(3);
- ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- countDownLatch2.countDown();
- throw new RuntimeException();
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
- assertFalse("future2 should has been stopped", future2.cancel(true));
-
- final CountDownLatch countDownLatch3 = new CountDownLatch(3);
- ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- countDownLatch3.countDown();
- throw new RuntimeException();
- } catch (Exception e) {
- }
- }
- }, 5, 5, TimeUnit.SECONDS);
- assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
- assertTrue("future3 should still running", future3.cancel(true));
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
index 0ae7e6a..18da37a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java
@@ -18,7 +18,6 @@
package org.apache.kylin.metadata.filter.UDF;
-import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.metadata.filter.function.Functions;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
new file mode 100644
index 0000000..3bb4540
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java
@@ -0,0 +1,189 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ */
+public class StorageMockUtils {
+ public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) {
+ TupleInfo info = new TupleInfo();
+ int idx = 0;
+
+ for (TblColRef col : groups) {
+ info.setField(col.getName(), col, idx++);
+ }
+
+ TableDesc sourceTable = groups.get(0).getColumnDesc().getTable();
+ for (FunctionDesc func : aggregations) {
+ TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable));
+ info.setField(col.getName(), col, idx++);
+ }
+
+ return info;
+ }
+
+ public static List<TblColRef> buildGroups() {
+ List<TblColRef> groups = new ArrayList<TblColRef>();
+
+ TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+ ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
+ TblColRef cf1 = new TblColRef(c1);
+ groups.add(cf1);
+
+ TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS");
+ ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string");
+ TblColRef cf2 = new TblColRef(c2);
+ groups.add(cf2);
+
+ return groups;
+ }
+
+ public static List<FunctionDesc> buildAggregations1() {
+ List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+ TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+ TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
+
+ FunctionDesc f1 = new FunctionDesc();
+ f1.setExpression("SUM");
+ ParameterDesc p1 = new ParameterDesc();
+ p1.setType("column");
+ p1.setValue("PRICE");
+ p1.setColRefs(ImmutableList.of(priceCol));
+ f1.setParameter(p1);
+ f1.setReturnType("decimal(19,4)");
+ functions.add(f1);
+
+
+ return functions;
+ }
+
+ public static List<FunctionDesc> buildAggregations() {
+ List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+ TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+ TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
+ TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint"));
+
+ FunctionDesc f1 = new FunctionDesc();
+ f1.setExpression("SUM");
+ ParameterDesc p1 = new ParameterDesc();
+ p1.setType("column");
+ p1.setValue("PRICE");
+ p1.setColRefs(ImmutableList.of(priceCol));
+ f1.setParameter(p1);
+ f1.setReturnType("decimal(19,4)");
+ functions.add(f1);
+
+ FunctionDesc f2 = new FunctionDesc();
+ f2.setExpression("COUNT_DISTINCT");
+ ParameterDesc p2 = new ParameterDesc();
+ p2.setType("column");
+ p2.setValue("SELLER_ID");
+ p2.setColRefs(ImmutableList.of(sellerCol));
+ f2.setParameter(p2);
+ f2.setReturnType("hllc(10)");
+ functions.add(f2);
+
+ return functions;
+ }
+
+ public static CompareTupleFilter buildTs2010Filter(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+ ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter1);
+ ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01");
+ compareFilter.addChild(constantFilter1);
+ return compareFilter;
+ }
+
+ public static CompareTupleFilter buildTs2011Filter(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+ ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter1);
+ ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01");
+ compareFilter.addChild(constantFilter1);
+ return compareFilter;
+ }
+
+ public static CompareTupleFilter buildFilter1(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
+ ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter1);
+ ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
+ compareFilter.addChild(constantFilter1);
+ return compareFilter;
+ }
+
+ public static CompareTupleFilter buildFilter2(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+ ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter2);
+ ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories");
+ compareFilter.addChild(constantFilter2);
+ return compareFilter;
+ }
+
+ public static CompareTupleFilter buildFilter3(TblColRef column) {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+ ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter1);
+ ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
+ compareFilter.addChild(constantFilter1);
+ return compareFilter;
+ }
+
+
+ public static TupleFilter buildAndFilter(List<TblColRef> columns) {
+ CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
+ CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
+ LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+ andFilter.addChild(compareFilter1);
+ andFilter.addChild(compareFilter2);
+ return andFilter;
+ }
+
+ public static TupleFilter buildOrFilter(List<TblColRef> columns) {
+ CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
+ CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
+ LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
+ logicFilter.addChild(compareFilter1);
+ logicFilter.addChild(compareFilter2);
+ return logicFilter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
deleted file mode 100644
index 5f8f08f..0000000
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.cache;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- */
-public class StorageMockUtils {
- public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) {
- TupleInfo info = new TupleInfo();
- int idx = 0;
-
- for (TblColRef col : groups) {
- info.setField(col.getName(), col, idx++);
- }
-
- TableDesc sourceTable = groups.get(0).getColumnDesc().getTable();
- for (FunctionDesc func : aggregations) {
- TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable));
- info.setField(col.getName(), col, idx++);
- }
-
- return info;
- }
-
- public static List<TblColRef> buildGroups() {
- List<TblColRef> groups = new ArrayList<TblColRef>();
-
- TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
- ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
- TblColRef cf1 = new TblColRef(c1);
- groups.add(cf1);
-
- TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS");
- ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string");
- TblColRef cf2 = new TblColRef(c2);
- groups.add(cf2);
-
- return groups;
- }
-
- public static List<FunctionDesc> buildAggregations() {
- List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
-
- TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
- TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)"));
- TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint"));
-
- FunctionDesc f1 = new FunctionDesc();
- f1.setExpression("SUM");
- ParameterDesc p1 = new ParameterDesc();
- p1.setType("column");
- p1.setValue("PRICE");
- p1.setColRefs(ImmutableList.of(priceCol));
- f1.setParameter(p1);
- f1.setReturnType("decimal(19,4)");
- functions.add(f1);
-
- FunctionDesc f2 = new FunctionDesc();
- f2.setExpression("COUNT_DISTINCT");
- ParameterDesc p2 = new ParameterDesc();
- p2.setType("column");
- p2.setValue("SELLER_ID");
- p2.setColRefs(ImmutableList.of(sellerCol));
- f2.setParameter(p2);
- f2.setReturnType("hllc(10)");
- functions.add(f2);
-
- return functions;
- }
-
- public static CompareTupleFilter buildTs2010Filter(TblColRef column) {
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
- ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter1);
- ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01");
- compareFilter.addChild(constantFilter1);
- return compareFilter;
- }
-
- public static CompareTupleFilter buildTs2011Filter(TblColRef column) {
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
- ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter1);
- ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01");
- compareFilter.addChild(constantFilter1);
- return compareFilter;
- }
-
- public static CompareTupleFilter buildFilter1(TblColRef column) {
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
- ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter1);
- ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23");
- compareFilter.addChild(constantFilter1);
- return compareFilter;
- }
-
- public static CompareTupleFilter buildFilter2(TblColRef column) {
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
- ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter2);
- ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories");
- compareFilter.addChild(constantFilter2);
- return compareFilter;
- }
-
- public static TupleFilter buildAndFilter(List<TblColRef> columns) {
- CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
- CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
- LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
- andFilter.addChild(compareFilter1);
- andFilter.addChild(compareFilter2);
- return andFilter;
- }
-
- public static TupleFilter buildOrFilter(List<TblColRef> columns) {
- CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
- CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
- LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
- logicFilter.addChild(compareFilter1);
- logicFilter.addChild(compareFilter2);
- return logicFilter;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 60815c7..1a3efb7 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -124,6 +124,13 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-job</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-storage-hbase</artifactId>
<type>test-jar</type>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index 4291d91..809cfb7 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -63,8 +63,8 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
- flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
- dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+ flatTable = LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv";
+ dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
}
@AfterClass
@@ -84,7 +84,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
{
Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
- InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
future.get();
}
}
@@ -101,7 +101,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
@Override
public void close() {
-
+
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
new file mode 100644
index 0000000..ab9ac63
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(ITDoggedCubeBuilderTest.class);
+
+ private static final int INPUT_ROWS = 10000;
+ private static final int SPLIT_ROWS = 5000;
+ private static final int THREADS = 4;
+
+ private static CubeInstance cube;
+ private static String flatTable;
+ private static Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+ @BeforeClass
+ public static void before() throws IOException {
+ staticCreateTestMetadata();
+
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+ cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+ flatTable = LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv";
+ dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ staticCleanupTestMetadata();
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ long randSeed = System.currentTimeMillis();
+
+ DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ doggedBuilder.setConcurrentThreads(THREADS);
+ doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
+ FileRecordWriter doggedResult = new FileRecordWriter();
+
+ {
+ Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ future.get();
+ doggedResult.close();
+ }
+
+ InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ inmemBuilder.setConcurrentThreads(THREADS);
+ FileRecordWriter inmemResult = new FileRecordWriter();
+
+ {
+ Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+ ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+ future.get();
+ inmemResult.close();
+ }
+
+ fileCompare(doggedResult.file, inmemResult.file);
+ doggedResult.file.delete();
+ inmemResult.file.delete();
+ }
+
+ private void fileCompare(File file, File file2) throws IOException {
+ BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+ BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8"));
+
+ String line1, line2;
+ do {
+ line1 = r1.readLine();
+ line2 = r2.readLine();
+
+ assertEquals(line1, line2);
+
+ } while (line1 != null || line2 != null);
+
+ r1.close();
+ r2.close();
+ }
+
+ class FileRecordWriter implements ICuboidWriter {
+
+ File file;
+ PrintWriter writer;
+
+ FileRecordWriter() throws IOException {
+ file = File.createTempFile("DoggedCubeBuilderTest_", ".data");
+ writer = new PrintWriter(file, "UTF-8");
+ }
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ writer.print(cuboidId);
+ writer.print(", ");
+ writer.print(record.toString());
+ writer.println();
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public void close() {
+ writer.close();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
new file mode 100644
index 0000000..ad02f2a
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -0,0 +1,271 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.dimension.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(ITInMemCubeBuilderTest.class);
+
+ private CubeInstance cube;
+ private String flatTable;
+ private Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+ private int nInpRows;
+ private int nThreads;
+
+ @Before
+ public void before() throws IOException {
+ createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void testKylinCube() throws Exception {
+ testBuild("test_kylin_cube_without_slr_left_join_empty", //
+ LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv", 70000, 4);
+ }
+
+ @Test
+ public void testSSBCube() throws Exception {
+ testBuild("ssb", //
+ LOCALMETA_TEST_DATA + "/data/kylin_intermediate_ssb_19920101000000_19920201000000.csv", 1000, 1);
+ }
+
+ public void testBuild(String cubeName, String flatTable, int nInpRows, int nThreads) throws Exception {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+ this.nInpRows = nInpRows;
+ this.nThreads = nThreads;
+
+ this.cube = cubeManager.getCube(cubeName);
+ this.flatTable = flatTable;
+ this.dictionaryMap = getDictionaryMap(cube, flatTable);
+
+ testBuildInner();
+ }
+
+ private void testBuildInner() throws Exception {
+
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ cubeBuilder.setConcurrentThreads(nThreads);
+
+ ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ try {
+ // round 1
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, nInpRows);
+ future.get();
+ }
+
+ // round 2, zero input
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, 0);
+ future.get();
+ }
+
+ // round 3
+ {
+ Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter()));
+ feedData(cube, flatTable, queue, nInpRows);
+ future.get();
+ }
+
+ } catch (Exception e) {
+ logger.error("stream build failed", e);
+ throw new IOException("Failed to build cube ", e);
+ }
+ }
+
+ static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
+ feedData(cube, flatTable, queue, count, 0);
+ }
+
+ static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
+ int nColumns = flatTableDesc.getColumnList().size();
+
+ @SuppressWarnings("unchecked")
+ Set<String>[] distinctSets = new Set[nColumns];
+ for (int i = 0; i < nColumns; i++)
+ distinctSets[i] = new TreeSet<String>();
+
+ // get distinct values on each column
+ List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+ for (String line : lines) {
+ String[] row = line.trim().split(",");
+ assert row.length == nColumns;
+ for (int i = 0; i < nColumns; i++)
+ distinctSets[i].add(row[i]);
+ }
+
+ List<String[]> distincts = new ArrayList<String[]>();
+ for (int i = 0; i < nColumns; i++) {
+ distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()]));
+ }
+
+ Random rand = new Random();
+ if (randSeed != 0)
+ rand.setSeed(randSeed);
+
+ // output with random data
+ for (; count > 0; count--) {
+ ArrayList<String> row = new ArrayList<String>(nColumns);
+ for (int i = 0; i < nColumns; i++) {
+ String[] candidates = distincts.get(i);
+ row.add(candidates[rand.nextInt(candidates.length)]);
+ }
+ queue.put(row);
+ }
+ queue.put(new ArrayList<String>(0));
+ }
+
+ static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+ Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+ CubeDesc desc = cube.getDescriptor();
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+ int nColumns = flatTableDesc.getColumnList().size();
+
+ List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
+ for (int c = 0; c < columns.size(); c++) {
+ TblColRef col = columns.get(c);
+ if (desc.getRowkey().isUseDictionary(col)) {
+ logger.info("Building dictionary for " + col);
+ List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+ Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+ result.put(col, dict);
+ }
+ }
+
+ for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) {
+ MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx);
+ FunctionDesc func = measureDesc.getFunction();
+ List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func);
+ if (dictCols.isEmpty())
+ continue;
+
+ int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
+ List<TblColRef> paramCols = func.getParameter().getColRefs();
+ for (int i = 0; i < paramCols.size(); i++) {
+ TblColRef col = paramCols.get(i);
+ if (dictCols.contains(col)) {
+ int colIdxOnFlat = flatTableIdx[i];
+ logger.info("Building dictionary for " + col);
+ List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat);
+ Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+
+ result.put(col, dict);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException {
+ List<byte[]> result = Lists.newArrayList();
+ List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8");
+ for (String line : lines) {
+ String[] row = line.trim().split(",");
+ if (row.length != nColumns) {
+ throw new IllegalStateException();
+ }
+ if (row[c] != null) {
+ result.add(Bytes.toBytes(row[c]));
+ }
+ }
+ return result;
+ }
+
+ class ConsoleGTRecordWriter implements ICuboidWriter {
+
+ boolean verbose = false;
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ if (verbose)
+ System.out.println(record.toString());
+ }
+
+ @Override
+ public void flush() {
+ if (verbose) {
+ System.out.println("flush");
+ }
+ }
+
+ @Override
+ public void close() {
+ if (verbose) {
+ System.out.println("close");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
new file mode 100644
index 0000000..ad1ddd3
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * /
+ */
+
+package org.apache.kylin.job.impl.threadpool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.job.BaseTestExecutable;
+import org.apache.kylin.job.ErrorTestExecutable;
+import org.apache.kylin.job.FailedTestExecutable;
+import org.apache.kylin.job.SelfStopExecutable;
+import org.apache.kylin.job.SucceedTestExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ */
+public class ITDefaultSchedulerTest extends BaseSchedulerTest {
+
+ @Test
+ public void testSingleTaskJob() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ job.addTask(task1);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ }
+
+ @Test
+ public void testSucceed() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new SucceedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+ }
+
+ @Test
+ public void testSucceedAndFailed() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new FailedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+ }
+
+ @Test
+ public void testSucceedAndError() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new ErrorTestExecutable();
+ BaseTestExecutable task2 = new SucceedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
+ }
+
+ @Test
+ public void testDiscard() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SelfStopExecutable();
+ job.addTask(task1);
+ jobService.addJob(job);
+ waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+ jobService.discardJob(job.getId());
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
+ Thread.sleep(5000);
+ System.out.println(job);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testSchedulerPool() throws InterruptedException {
+ ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1);
+ final CountDownLatch countDownLatch = new CountDownLatch(3);
+ ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ countDownLatch.countDown();
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS));
+ assertTrue("future should still running", future.cancel(true));
+
+ final CountDownLatch countDownLatch2 = new CountDownLatch(3);
+ ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ countDownLatch2.countDown();
+ throw new RuntimeException();
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS));
+ assertFalse("future2 should has been stopped", future2.cancel(true));
+
+ final CountDownLatch countDownLatch3 = new CountDownLatch(3);
+ ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ countDownLatch3.countDown();
+ throw new RuntimeException();
+ } catch (Exception e) {
+ }
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS));
+ assertTrue("future3 should still running", future3.cancel(true));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index c945485..e1cbe1f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -99,12 +99,12 @@ public class BuildCubeWithEngine {
logger.info("Will not use fast build mode");
}
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
try {
//check hdfs permission
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
index 5ab5e83..d862dbf 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
@@ -80,7 +80,7 @@ public class BuildCubeWithSpark {
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
@@ -89,7 +89,7 @@ public class BuildCubeWithSpark {
@Before
public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
DeployUtil.initCliWorkDir();
DeployUtil.deployMetadata();
@@ -117,7 +117,7 @@ public class BuildCubeWithSpark {
@Test
public void test() throws Exception {
final CubeSegment segment = createSegment();
- String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+ String confPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
logger.info("confPath location:" + confPath);
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index eeff999..99da26f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -67,11 +67,11 @@ public class BuildCubeWithStream {
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
public void before() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
index 4b8ce24..643b122 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
@@ -92,7 +92,7 @@ public class BuildIIWithEngine {
public static void beforeClass() throws Exception {
logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+ System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
@@ -100,7 +100,7 @@ public class BuildIIWithEngine {
@Before
public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
//DeployUtil.initCliWorkDir();
// DeployUtil.deployMetadata();
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
index 9b7cd14..ace1a2f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -109,7 +109,7 @@ public class BuildIIWithStream {
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
}
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+ HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
public void before() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index ec3e60f..46aa68d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -19,11 +19,9 @@
package org.apache.kylin.query;
import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
@@ -38,22 +36,18 @@ import java.sql.Types;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.LogManager;
-import com.google.common.base.Throwables;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionContext;
import org.apache.kylin.common.KylinConfig;
import org.dbunit.Assertion;
import org.dbunit.database.DatabaseConfig;
import org.dbunit.database.DatabaseConnection;
import org.dbunit.database.IDatabaseConnection;
import org.dbunit.dataset.DataSetException;
-import org.dbunit.dataset.DefaultTable;
import org.dbunit.dataset.ITable;
import org.dbunit.dataset.SortedTable;
import org.dbunit.dataset.datatype.DataType;
@@ -63,8 +57,6 @@ import org.dbunit.ext.h2.H2DataTypeFactory;
import org.junit.Assert;
import com.google.common.io.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
*/
@@ -287,7 +279,6 @@ public class KylinTestBase {
return ret;
}
-
protected void execQueryUsingH2(String queryFolder, boolean needSort) throws Exception {
printInfo("---------- Running H2 queries: " + queryFolder);
@@ -349,7 +340,6 @@ public class KylinTestBase {
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort);
-
try {
// compare the result
Assert.assertEquals(h2Table.getRowCount(), kylinTable.getRowCount());
@@ -427,7 +417,6 @@ public class KylinTestBase {
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
ITable h2Table = executeDynamicQuery(h2Conn, queryName, sql, parameters, needSort);
-
// compare the result
Assertion.assertEquals(h2Table, kylinTable);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index 15e435e..136342d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -38,7 +38,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.StorageFactory;
-import org.apache.kylin.storage.cache.StorageMockUtils;
+import org.apache.kylin.storage.StorageMockUtils;
import org.apache.kylin.storage.exception.ScanOutOfLimitException;
import org.junit.After;
import org.junit.AfterClass;
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index eb36eed..796b9c1 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -44,6 +44,13 @@
<!-- Env & Test -->
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
index 70c11b3..83c50c0 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
@@ -24,18 +24,16 @@ import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * Created by dongli on 2/22/16.
- */
public class HiveCmdBuilderTest {
@Before
public void setup() {
- System.setProperty("KYLIN_CONF", "../examples/test_case_data/localmeta");
+ System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA);
}
@After
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 6bbb0b7..1d3da28 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,6 +19,7 @@
package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
@@ -56,6 +57,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -250,20 +252,21 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
}
@Override
- public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException {
+ public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
+
logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);
short cuboidBaseShard = cubeSeg.getCuboidBaseShard(this.cuboid.getId());
short shardNum = cubeSeg.getCuboidShardNum(this.cuboid.getId());
int totalShards = cubeSeg.getTotalShards();
- final List<ByteString> scanRequestByteStrings = Lists.newArrayList();
- final List<ByteString> rawScanByteStrings = Lists.newArrayList();
+ ByteString scanRequestByteString = null;
+ ByteString rawScanByteString = null;
// primary key (also the 0th column block) is always selected
- final ImmutableBitSet selectedColBlocks = scanRequests.get(0).getSelectedColBlocks().set(0);
+ final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
// globally shared connection, does not require close
final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
@@ -274,65 +277,89 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
}
- for (GTScanRequest req : scanRequests) {
- ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
- GTScanRequest.serializer.serialize(req, buffer);
- buffer.flip();
- scanRequestByteStrings.add(HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()));
+ //TODO: raw scan can be constructed at region side to reduce traffic
+ List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
+ int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+ while (true) {
+ try {
+ ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
+ BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
+ for (RawScan rs : rawScans) {
+ RawScan.serializer.serialize(rs, rawScanBuffer);
+ }
+ rawScanBuffer.flip();
+ rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
+ break;
+ } catch (BufferOverflowException boe) {
+ logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
+ rawScanBufferSize *= 4;
+ }
+ }
+ scanRequest.setGTScanRanges(Lists.<GTScanRange> newArrayList());//since raw scans are sent to coprocessor, we don't need to duplicate sending it
- RawScan rawScan = preparedHBaseScan(req.getPkStart(), req.getPkEnd(), req.getFuzzyKeys(), selectedColBlocks);
+ int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+ while (true) {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
+ GTScanRequest.serializer.serialize(scanRequest, buffer);
+ buffer.flip();
+ scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
+ break;
+ } catch (BufferOverflowException boe) {
+ logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+ scanRequestBufferSize *= 4;
+ }
+ }
- ByteBuffer rawScanBuffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
- RawScan.serializer.serialize(rawScan, rawScanBuffer);
- rawScanBuffer.flip();
- rawScanByteStrings.add(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit()));
+ logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
- logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", buffer.limit() - buffer.position(), rawScanBuffer.limit() - rawScanBuffer.position());
- logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(req)), cubeSeg);
- logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+ logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg);
+ for (RawScan rs : rawScans) {
+ logScan(rs, cubeSeg.getStorageLocationIdentifier());
}
- logger.debug("Submitting rpc to {} shards starting from shard {}, scan requests count {}", new Object[] { shardNum, cuboidBaseShard, scanRequests.size() });
+ logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", new Object[] { shardNum, cuboidBaseShard, rawScans.size() });
final AtomicInteger totalScannedCount = new AtomicInteger(0);
- final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(scanRequests.size() * shardNum);
+ final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);
+ final String currentThreadName = Thread.currentThread().getName();
for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
+ final ByteString finalScanRequestByteString = scanRequestByteString;
+ final ByteString finalRawScanByteString = rawScanByteString;
executorService.submit(new Runnable() {
@Override
public void run() {
- for (int i = 0; i < scanRequests.size(); ++i) {
- CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
- builder.setGtScanRequest(scanRequestByteStrings.get(i)).setHbaseRawScan(rawScanByteStrings.get(i));
- for (IntList intList : hbaseColumnsToGTIntList) {
- builder.addHbaseColumnsToGT(intList);
- }
- builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
- builder.setBehavior(toggle);
+ CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
+ builder.setGtScanRequest(finalScanRequestByteString).setHbaseRawScan(finalRawScanByteString);
+ for (IntList intList : hbaseColumnsToGTIntList) {
+ builder.addHbaseColumnsToGT(intList);
+ }
+ builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
+ builder.setBehavior(toggle);
+
+ Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+ try {
+ results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
+ } catch (Throwable throwable) {
+ throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + "Error when visiting cubes by endpoint", throwable);
+ }
- Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
+ for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
+ totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
+ logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result));
try {
- results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond());
- } catch (Throwable throwable) {
- throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + "Error when visiting cubes by endpoint", throwable);
- }
-
- for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
- totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
- logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + getStatsString(result));
- try {
- epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
- } catch (IOException | DataFormatException e) {
- throw new RuntimeException("Error when decompressing", e);
- }
+ epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
+ } catch (IOException | DataFormatException e) {
+ throw new RuntimeException("Error when decompressing", e);
}
}
}
});
}
- return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequests.get(0).getColumns(), totalScannedCount.get());
+ return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
}
private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 8563a5e..49e8593 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -18,7 +18,6 @@
package org.apache.kylin.storage.hbase.cube.v2;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -43,16 +42,15 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.cube.model.HBaseMappingDesc;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.IGTStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public abstract class CubeHBaseRPC {
+public abstract class CubeHBaseRPC implements IGTStorage {
public static final Logger logger = LoggerFactory.getLogger(CubeHBaseRPC.class);
@@ -71,8 +69,6 @@ public abstract class CubeHBaseRPC {
this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
}
- abstract IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException;
-
public static Scan buildScan(RawScan rawScan) {
Scan scan = new Scan();
scan.setCaching(rawScan.hbaseCaching);
@@ -96,7 +92,7 @@ public abstract class CubeHBaseRPC {
return scan;
}
- protected RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+ private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
@@ -123,46 +119,12 @@ public abstract class CubeHBaseRPC {
return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize);
}
- protected List<RawScan> preparedHBaseScans(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
- final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
- List<RawScan> ret = Lists.newArrayList();
-
- LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
- byte[] start = encoder.createBuf();
- byte[] end = encoder.createBuf();
- List<byte[]> startKeys;
- List<byte[]> endKeys;
-
- encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
- encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start);
- startKeys = encoder.getRowKeysDifferentShards(start);
-
- encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
- encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end);
- endKeys = encoder.getRowKeysDifferentShards(end);
- endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() {
- @Override
- public byte[] apply(byte[] input) {
- byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning
- System.arraycopy(input, 0, shardEnd, 0, input.length);
- return shardEnd;
- }
- });
-
- Preconditions.checkState(startKeys.size() == endKeys.size());
- List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
-
- KylinConfig config = cubeSeg.getCubeDesc().getConfig();
- int hbaseCaching = config.getHBaseScanCacheRows();
- int hbaseMaxResultSize = config.getHBaseScanMaxResultSize();
- if (isMemoryHungry(selectedColBlocks))
- hbaseCaching /= 10;
-
- for (short i = 0; i < startKeys.size(); ++i) {
- ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize));
+ protected List<RawScan> preparedHBaseScans(List<GTScanRange> ranges, ImmutableBitSet selectedColBlocks) {
+ List<RawScan> allRawScans = Lists.newArrayList();
+ for (GTScanRange range : ranges) {
+ allRawScans.add(preparedHBaseScan(range.pkStart, range.pkEnd, range.fuzzyKeys, selectedColBlocks));
}
- return ret;
-
+ return allRawScans;
}
private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) {