You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/07/02 13:04:54 UTC
incubator-kylin git commit: KYLIN-867 initial commit,
backport Hybrid model to 0.7
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-867 [created] c87aa4a3b
KYLIN-867 initial commit, backport Hybrid model to 0.7
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c87aa4a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c87aa4a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c87aa4a3
Branch: refs/heads/KYLIN-867
Commit: c87aa4a3b80e342dee39ad0827e2547a1f475891
Parents: e7fcc20
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jul 2 19:04:25 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Jul 2 19:04:25 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/lock/JobLock.java | 9 +
.../apache/kylin/common/lock/MockJobLock.java | 15 ++
.../kylin/common/lock/ZookeeperJobLock.java | 83 ++++++++
.../kylin/common/persistence/ResourceStore.java | 1 +
.../kylin/common/restclient/Broadcaster.java | 2 +-
.../org/apache/kylin/cube/CubeInstance.java | 39 +++-
.../apache/kylin/cube/cuboid/CuboidTest.java | 1 -
.../hybrid/test_kylin_hybrid_inner_join.json | 14 ++
.../hybrid/test_kylin_hybrid_left_join.json | 14 ++
.../invertedindex_desc/test_kylin_ii_desc.json | 7 +-
.../localmeta/project/default.json | 6 -
.../localmeta/project/onlyinner.json | 4 -
.../localmeta/project/onlyleft.json | 4 -
.../apache/kylin/invertedindex/IIInstance.java | 36 ++++
.../kylin/invertedindex/index/SliceBuilder.java | 3 -
.../kylin/invertedindex/model/IIDesc.java | 27 ++-
.../kylin/invertedindex/model/IIDimension.java | 9 +-
.../invertedindex/IIDescManagerTest.java | 2 +-
.../invertedindex/IIInstanceTest.java | 4 +-
.../invertedindex/InvertedIndexLocalTest.java | 6 +-
.../java/org/apache/kylin/job/Scheduler.java | 3 +-
.../job/impl/threadpool/DefaultScheduler.java | 49 +----
.../kylin/job/BuildCubeWithEngineTest.java | 5 +-
.../apache/kylin/job/BuildIIWithEngineTest.java | 5 +-
.../job/impl/threadpool/BaseSchedulerTest.java | 3 +-
.../apache/kylin/metadata/model/TblColRef.java | 4 +
.../metadata/realization/IRealization.java | 8 +
.../realization/RealizationRegistry.java | 1 +
.../metadata/realization/RealizationType.java | 2 +-
.../metadata/realization/SQLDigestUtil.java | 114 +++++++++++
.../metadata/tuple/CompoundTupleIterator.java | 45 +++++
.../kylin/metadata/tuple/ITupleIterator.java | 8 +-
.../apache/kylin/query/routing/RoutingRule.java | 4 +-
.../AdjustForWeeklyMatchedRealization.java | 25 ++-
.../RoutingRules/RealizationPriorityRule.java | 5 +-
.../RoutingRules/RealizationSortRule.java | 66 +++++++
.../apache/kylin/query/test/IIQueryTest.java | 3 +
.../kylin/rest/controller/JobController.java | 3 +-
.../kylin/storage/StorageEngineFactory.java | 6 +-
.../storage/hbase/CubeSegmentTupleIterator.java | 5 +
.../hbase/SerializedHBaseTupleIterator.java | 5 +
.../endpoint/EndpointTupleIterator.java | 10 +
.../kylin/storage/hybrid/HybridInstance.java | 188 +++++++++++++++++++
.../kylin/storage/hybrid/HybridManager.java | 127 +++++++++++++
.../storage/hybrid/HybridStorageEngine.java | 69 +++++++
.../endpoint/EndpointAggregationTest.java | 1 +
.../endpoint/TableRecordInfoTest.java | 1 +
.../kylin/storage/hybrid/HybridManagerTest.java | 45 +++++
48 files changed, 979 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/common/src/main/java/org/apache/kylin/common/lock/JobLock.java
new file mode 100644
index 0000000..7fdb64c
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/lock/JobLock.java
@@ -0,0 +1,9 @@
+package org.apache.kylin.common.lock;
+
+/**
+ */
+public interface JobLock {
+ boolean lock();
+
+ void unlock();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
new file mode 100644
index 0000000..230d4d8
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.common.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java b/common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
new file mode 100644
index 0000000..603894c
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/lock/ZookeeperJobLock.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.common.lock;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class ZookeeperJobLock implements JobLock {
+ private Logger logger = LoggerFactory.getLogger(ZookeeperJobLock.class);
+
+ private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+
+ private String scheduleID;
+ private InterProcessMutex sharedLock;
+ private CuratorFramework zkClient;
+
+ @Override
+ public boolean lock() {
+ this.scheduleID = schedulerId();
+ String ZKConnectString = getZKConnectString();
+ if (StringUtils.isEmpty(ZKConnectString)) {
+ throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+ }
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ this.zkClient = CuratorFrameworkFactory.newClient(ZKConnectString, retryPolicy);
+ this.zkClient.start();
+ this.sharedLock = new InterProcessMutex(zkClient, this.scheduleID);
+ boolean hasLock = false;
+ try {
+ hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.warn("error acquire lock", e);
+ }
+ if (!hasLock) {
+ logger.warn("fail to acquire lock, scheduler has not been started");
+ zkClient.close();
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ releaseLock();
+ }
+
+ private String getZKConnectString() {
+ Configuration conf = HadoopUtil.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl());
+ return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+ }
+
+ private void releaseLock() {
+ try {
+ if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+ // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
+ if (zkClient.checkExists().forPath(scheduleID) != null) {
+ zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(scheduleID);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("error release lock:" + scheduleID);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String schedulerId() {
+ return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 7021915..a49706d 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -51,6 +51,7 @@ abstract public class ResourceStore {
public static final String SNAPSHOT_RESOURCE_ROOT = "/table_snapshot";
public static final String TABLE_EXD_RESOURCE_ROOT = "/table_exd";
public static final String TABLE_RESOURCE_ROOT = "/table";
+ public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
private static ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index b636e89..68e68c3 100644
--- a/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -139,7 +139,7 @@ public class Broadcaster {
}
public static enum TYPE {
- ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model");
+ ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
private String text;
private TYPE(String text) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index d978887..898925a 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -327,7 +327,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
@Override
public int getCost(SQLDigest digest) {
- return 0;
+ return cost;
}
@Override
@@ -348,4 +348,41 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
this.projectName = projectName;
}
+
+ @Override
+ public long getDateRangeStart() {
+ List<CubeSegment> readySegs = getSegments(SegmentStatusEnum.READY);
+
+ long startTime = Long.MAX_VALUE;
+ for (CubeSegment seg : readySegs) {
+ if (seg.getDateRangeStart() < startTime)
+ startTime = seg.getDateRangeStart();
+ }
+
+ return startTime;
+ }
+
+ @Override
+ public long getDateRangeEnd() {
+
+ List<CubeSegment> readySegs = getSegments(SegmentStatusEnum.READY);
+
+ long endTime = 0;
+ for (CubeSegment seg : readySegs) {
+ if (seg.getDateRangeEnd() > endTime)
+ endTime = seg.getDateRangeEnd();
+ }
+
+ return endTime;
+ }
+
+ @Override
+ public String getModelName() {
+ return this.getDescriptor().getModelName();
+ }
+
+ @Override
+ public List<TblColRef> getAllDimensions() {
+ return Lists.newArrayList(getDescriptor().listDimensionColumnsIncludingDerived());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java b/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
index 9f7cbc6..4914448 100644
--- a/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/cuboid/CuboidTest.java
@@ -173,7 +173,6 @@ public class CuboidTest extends LocalFileMetadataTestCase {
assertEquals(toLong("111111111"), cuboid.getId());
}
- //@Test
public void testII() {
CubeDesc cube = getTestKylinCubeII();
assertEquals(toLong("111111111"), Cuboid.getBaseCuboidId(cube));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_inner_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_inner_join.json b/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_inner_join.json
new file mode 100644
index 0000000..5582841
--- /dev/null
+++ b/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_inner_join.json
@@ -0,0 +1,14 @@
+{
+ "uuid": "9iiu8590-64b6-4367-8fb5-7500eb95fd9c",
+ "name": "test_kylin_hybrid_inner_join",
+ "historyRealization": {
+ "type": "CUBE",
+ "realization": "test_kylin_cube_without_slr_empty"
+ },
+ "realTimeRealization": {
+ "type": "INVERTED_INDEX",
+ "realization": "test_kylin_ii"
+ },
+ "last_modified": 1420016227424,
+ "create_time": null
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_left_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_left_join.json b/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_left_join.json
new file mode 100644
index 0000000..37f59d6
--- /dev/null
+++ b/examples/test_case_data/localmeta/hybrid/test_kylin_hybrid_left_join.json
@@ -0,0 +1,14 @@
+{
+ "uuid": "5ca78590-64b6-4367-8fb5-7500eb95fd9c",
+ "name": "test_kylin_hybrid_left_join",
+ "historyRealization": {
+ "type": "CUBE",
+ "realization": "test_kylin_cube_with_slr_left_join_empty"
+ },
+ "realTimeRealization": {
+ "type": "INVERTED_INDEX",
+ "realization": "test_kylin_ii"
+ },
+ "last_modified": 1420016227424,
+ "create_time": null
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json
index 962d887..a5ee5e5 100644
--- a/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json
+++ b/examples/test_case_data/localmeta/invertedindex_desc/test_kylin_ii_desc.json
@@ -1,12 +1,11 @@
{
"uuid": "74bf87b5-c7b5-4420-a12a-07f6b37b3187",
-
"last_modified": 0,
"name": "test_kylin_ii_desc",
"fact_table": "default.test_kylin_fact",
"model_name": "test_kylin_ii_model_desc",
"timestamp_dimension": "cal_dt",
- "bitmap_dimensions": [
+ "value_dimensions": [
{
"table": "default.test_kylin_fact",
"columns": [
@@ -43,9 +42,7 @@
"seller_type_cd",
"seller_type_desc"
]
- }
- ],
- "value_dimensions": [
+ },
{
"table": "edw.test_cal_dt",
"columns": [
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/project/default.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/project/default.json b/examples/test_case_data/localmeta/project/default.json
index c6b8b74..853a10a 100644
--- a/examples/test_case_data/localmeta/project/default.json
+++ b/examples/test_case_data/localmeta/project/default.json
@@ -27,11 +27,5 @@
"type": "INVERTED_INDEX",
"realization": "test_kylin_ii"
}
- ],
- "cubes": [
- "test_kylin_cube_with_slr_empty",
- "test_kylin_cube_without_slr_empty",
- "test_kylin_cube_with_slr_left_join_empty",
- "test_kylin_cube_without_slr_left_join_empty"
]
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/project/onlyinner.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/project/onlyinner.json b/examples/test_case_data/localmeta/project/onlyinner.json
index b98d762..a3ab387 100644
--- a/examples/test_case_data/localmeta/project/onlyinner.json
+++ b/examples/test_case_data/localmeta/project/onlyinner.json
@@ -12,9 +12,5 @@
"type": "CUBE",
"realization": "test_kylin_cube_without_slr_empty"
}
- ],
- "cubes": [
- "test_kylin_cube_with_slr_empty",
- "test_kylin_cube_without_slr_empty"
]
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/examples/test_case_data/localmeta/project/onlyleft.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/project/onlyleft.json b/examples/test_case_data/localmeta/project/onlyleft.json
index 9569eea..014ff4b 100644
--- a/examples/test_case_data/localmeta/project/onlyleft.json
+++ b/examples/test_case_data/localmeta/project/onlyleft.json
@@ -12,9 +12,5 @@
"type": "CUBE",
"realization": "test_kylin_cube_without_slr_left_join_empty"
}
- ],
- "cubes": [
- "test_kylin_cube_with_slr_left_join_empty",
- "test_kylin_cube_without_slr_left_join_empty"
]
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index eeb06ed..9c46fbb 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -310,4 +310,40 @@ public class IIInstance extends RootPersistentEntity implements IRealization {
this.projectName = projectName;
}
+ @Override
+ public long getDateRangeStart() {
+ List<IISegment> readySegs = getSegments(SegmentStatusEnum.READY);
+
+ long startTime = Long.MAX_VALUE;
+ for (IISegment seg : readySegs) {
+ if (seg.getDateRangeStart() < startTime)
+ startTime = seg.getDateRangeStart();
+ }
+
+ return startTime;
+ }
+
+ @Override
+ public long getDateRangeEnd() {
+
+ List<IISegment> readySegs = getSegments(SegmentStatusEnum.READY);
+
+ long endTime = 0;
+ for (IISegment seg : readySegs) {
+ if (seg.getDateRangeEnd() > endTime)
+ endTime = seg.getDateRangeEnd();
+ }
+
+ return endTime;
+ }
+
+ @Override
+ public String getModelName() {
+ return this.getDescriptor().getModelName();
+ }
+
+ @Override
+ public List<TblColRef> getAllDimensions() {
+ return getDescriptor().listAllDimensions();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
index e217df0..72050a1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -61,9 +61,6 @@ public class SliceBuilder {
// reset for next slice
nRecords = 0;
containers = new ColumnValueContainer[nColumns];
- for (int i : info.getDescriptor().getBitmapColumns()) {
- containers[i] = new BitMapContainer(info.getDigest(), i);
- }
for (int i : info.getDescriptor().getValueColumns()) {
containers[i] = new CompressedValueContainer(info.getDigest(), i,
nRecordsCap);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index 75cea57..fca2b93 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -66,8 +66,6 @@ public class IIDesc extends RootPersistentEntity {
private String modelName;
@JsonProperty("timestamp_dimension")
private String timestampDimension;
- @JsonProperty("bitmap_dimensions")
- private List<IIDimension> bitmapDimensions;
@JsonProperty("value_dimensions")
private List<IIDimension> valueDimensions;
@JsonProperty("metrics")
@@ -82,8 +80,8 @@ public class IIDesc extends RootPersistentEntity {
// computed
private List<TableDesc> allTables = Lists.newArrayList();
private List<TblColRef> allColumns = Lists.newArrayList();
+ private List<TblColRef> allDimensions = Lists.newArrayList();
private int tsCol;
- private int[] bitmapCols;
private int[] valueCols;
private int[] metricsCols;
private BitSet metricsColSet;
@@ -106,7 +104,6 @@ public class IIDesc extends RootPersistentEntity {
timestampDimension = timestampDimension.toUpperCase();
// capitalize
- IIDimension.capicalizeStrings(bitmapDimensions);
IIDimension.capicalizeStrings(valueDimensions);
StringUtil.toUpperCaseArray(metricNames, metricNames);
@@ -114,11 +111,13 @@ public class IIDesc extends RootPersistentEntity {
HashSet<String> allTableNames = Sets.newHashSet();
measureDescs = Lists.newArrayList();
measureDescs.add(makeCountMeasure());
- for (IIDimension iiDimension : Iterables.concat(bitmapDimensions, valueDimensions)) {
+ for (IIDimension iiDimension : valueDimensions) {
TableDesc tableDesc = this.getTableDesc(iiDimension.getTable());
for (String column : iiDimension.getColumns()) {
ColumnDesc columnDesc = tableDesc.findColumnByName(column);
- allColumns.add(new TblColRef(columnDesc));
+ TblColRef tcr = new TblColRef(columnDesc);
+ allColumns.add(tcr);
+ allDimensions.add(tcr);
measureDescs.add(makeHLLMeasure(columnDesc, null));
}
@@ -141,15 +140,11 @@ public class IIDesc extends RootPersistentEntity {
}
// indexing for each type of columns
- bitmapCols = new int[IIDimension.getColumnCount(bitmapDimensions)];
valueCols = new int[IIDimension.getColumnCount(valueDimensions)];
metricsCols = new int[metricNames.length];
metricsColSet = new BitSet(this.getTableDesc(this.getFactTableName()).getColumnCount());
int totalIndex = 0;
- for (int i = 0; i < bitmapCols.length; ++i, ++totalIndex) {
- bitmapCols[i] = totalIndex;
- }
for (int i = 0; i < valueCols.length; ++i, ++totalIndex) {
valueCols[i] = totalIndex;
}
@@ -169,7 +164,7 @@ public class IIDesc extends RootPersistentEntity {
}
}
if (tsCol < 0)
- throw new RuntimeException("timestamp_dimension is not in bitmapDimensions or valueDimensions");
+ throw new RuntimeException("timestamp_dimension is not in valueDimensions");
}
private TableDesc getTableDesc(String tableName) {
@@ -257,6 +252,10 @@ public class IIDesc extends RootPersistentEntity {
return allColumns;
}
+ public List<TblColRef> listAllDimensions() {
+ return allDimensions;
+ }
+
public TblColRef findColumnRef(String table, String column) {
ColumnDesc columnDesc = this.getTableDesc(table).findColumnByName(column);
return new TblColRef(columnDesc);
@@ -294,10 +293,6 @@ public class IIDesc extends RootPersistentEntity {
return tsCol;
}
- public int[] getBitmapColumns() {
- return bitmapCols;
- }
-
public int[] getValueColumns() {
return valueCols;
}
@@ -354,7 +349,7 @@ public class IIDesc extends RootPersistentEntity {
try {
md = MessageDigest.getInstance("MD5");
StringBuilder sigString = new StringBuilder();
- sigString.append(this.name).append("|").append(this.getFactTableName()).append("|").append(timestampDimension).append("|").append(JsonUtil.writeValueAsString(this.bitmapDimensions)).append("|").append(JsonUtil.writeValueAsString(valueDimensions)).append("|").append(JsonUtil.writeValueAsString(this.metricNames)).append("|").append(sharding).append("|").append(sliceSize);
+ sigString.append(this.name).append("|").append(this.getFactTableName()).append("|").append(timestampDimension).append("|").append("|").append(JsonUtil.writeValueAsString(valueDimensions)).append("|").append(JsonUtil.writeValueAsString(this.metricNames)).append("|").append(sharding).append("|").append(sliceSize);
byte[] signature = md.digest(sigString.toString().getBytes());
return new String(Base64.encodeBase64(signature));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java
index ee8b6b9..d2d377d 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDimension.java
@@ -18,6 +18,7 @@
package org.apache.kylin.invertedindex.model;
+import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -52,9 +53,11 @@ public class IIDimension {
public static void capicalizeStrings(List<IIDimension> dimensions) {
- for (IIDimension iiDimension : dimensions) {
- iiDimension.setTable(iiDimension.getTable().toUpperCase());
- StringUtil.toUpperCaseArray(iiDimension.getColumns(), iiDimension.getColumns());
+ if (dimensions != null) {
+ for (IIDimension iiDimension : dimensions) {
+ iiDimension.setTable(iiDimension.getTable().toUpperCase());
+ StringUtil.toUpperCaseArray(iiDimension.getColumns(), iiDimension.getColumns());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java
index 5782a82..bd01b2c 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java
@@ -54,7 +54,7 @@ public class IIDescManagerTest extends LocalFileMetadataTestCase {
public void testCRUD() throws IOException {
IIDescManager mgr = IIDescManager.getInstance(getTestConfig());
- String newDescName = "Copy of " + TEST_II_DESC_NAME;
+ String newDescName = "Copy_of_" + TEST_II_DESC_NAME;
try {
IIDesc testRecord = mgr.getIIDesc(newDescName);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
index 792d674..4eb11be 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
@@ -70,7 +70,7 @@ public class IIInstanceTest extends LocalFileMetadataTestCase {
IIDesc iiDesc = IIDescManager.getInstance(getTestConfig()).getIIDesc("test_kylin_ii_desc");
- IIInstance ii = IIInstance.create("new ii", "default", iiDesc);
+ IIInstance ii = IIInstance.create("new_ii", "default", iiDesc);
IIManager iiMgr = IIManager.getInstance(getTestConfig());
@@ -78,7 +78,7 @@ public class IIInstanceTest extends LocalFileMetadataTestCase {
iiMgr.createII(ii);
- Assert.assertNotNull(iiMgr.getII("new ii"));
+ Assert.assertNotNull(iiMgr.getII("new_ii"));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index 42de647..45eb9b5 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.util.Pair;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -62,6 +63,7 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
}
@Test
+ @Ignore
public void testBitMapContainer() {
// create container
BitMapContainer container = new BitMapContainer(info.getDigest(), 0);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/Scheduler.java b/job/src/main/java/org/apache/kylin/job/Scheduler.java
index 120b86f..87ce072 100644
--- a/job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job;
+import org.apache.kylin.common.lock.JobLock;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.Executable;
@@ -27,7 +28,7 @@ import org.apache.kylin.job.execution.Executable;
*/
public interface Scheduler<T extends Executable> {
- void init(JobEngineConfig jobEngineConfig) throws SchedulerException;
+ void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
void shutdown() throws SchedulerException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 6483035..d69e74d 100644
--- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -26,6 +26,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.kylin.common.lock.JobLock;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
@@ -143,29 +144,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
}
}
- private void releaseLock() {
- try {
- if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
- // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
- if (zkClient.checkExists().forPath(schedulerId()) != null) {
- zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(schedulerId());
- }
- }
- } catch (Exception e) {
- logger.error("error release lock:" + schedulerId());
- throw new RuntimeException(e);
- }
- }
-
- private String schedulerId() {
- return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix();
- }
-
- private String getZKConnectString(JobEngineConfig context) {
- Configuration conf = HadoopUtil.newHBaseConfiguration(context.getConfig().getStorageUrl());
- return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- }
-
public static DefaultScheduler getInstance() {
return INSTANCE;
}
@@ -182,33 +160,16 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
}
@Override
- public synchronized void init(JobEngineConfig jobEngineConfig) throws SchedulerException {
+ public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
if (!initialized) {
initialized = true;
} else {
return;
}
- String ZKConnectString = getZKConnectString(jobEngineConfig);
- if (StringUtils.isEmpty(ZKConnectString)) {
- throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
- }
this.jobEngineConfig = jobEngineConfig;
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- this.zkClient = CuratorFrameworkFactory.newClient(ZKConnectString, retryPolicy);
- this.zkClient.start();
- this.sharedLock = new InterProcessMutex(zkClient, schedulerId());
- boolean hasLock = false;
- try {
- hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
- } catch (Exception e) {
- logger.warn("error acquire lock", e);
- }
- if (!hasLock) {
- logger.warn("fail to acquire lock, scheduler has not been started");
- zkClient.close();
- return;
- }
+ jobLock.lock();
+
executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
//load all executable, set them to a consistent status
fetcherPool = Executors.newScheduledThreadPool(1);
@@ -228,6 +189,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
logger.debug("Closing zk connection");
try {
shutdown();
+ jobLock.unlock();
} catch (SchedulerException e) {
logger.error("error shutdown scheduler", e);
}
@@ -243,7 +205,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
public void shutdown() throws SchedulerException {
fetcherPool.shutdown();
jobPool.shutdown();
- releaseLock();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index a5a1156..7fd4246 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -96,7 +97,7 @@ public class BuildCubeWithEngineTest {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
jobService = ExecutableManager.getInstance(kylinConfig);
scheduler = DefaultScheduler.getInstance();
- scheduler.init(new JobEngineConfig(kylinConfig));
+ scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
}
@@ -212,7 +213,7 @@ public class BuildCubeWithEngineTest {
// this cube's start date is 0, end date is 20501112000000
long date1 = 0;
- long date2 = f.parse("2013-01-01").getTime();
+ long date2 = f.parse("2022-01-01").getTime();
// this cube doesn't support incremental build, always do full build
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index 194a8d7..6dacc08 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
import org.junit.After;
import org.junit.Before;
@@ -108,7 +109,7 @@ public class BuildIIWithEngineTest {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
jobService = ExecutableManager.getInstance(kylinConfig);
scheduler = DefaultScheduler.getInstance();
- scheduler.init(new JobEngineConfig(kylinConfig));
+ scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
}
@@ -134,7 +135,7 @@ public class BuildIIWithEngineTest {
ii.setStatus(RealizationStatusEnum.READY);
iiManager.updateII(ii);
}
- backup();
+ // backup();
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 9da0f73..29ef7ad 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job.impl.threadpool;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -87,7 +88,7 @@ public abstract class BaseSchedulerTest extends HBaseMetadataTestCase {
setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
scheduler = DefaultScheduler.getInstance();
- scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+ scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new ZookeeperJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index 76c3911..ca932e9 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -67,6 +67,10 @@ public class TblColRef {
this.column = column;
}
+ public ColumnDesc getColumnDesc() {
+ return column;
+ }
+
public ColumnDesc getColumn() {
return column;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index d1571ce..43f5e9a 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -51,6 +51,8 @@ public interface IRealization {
public List<TblColRef> getAllColumns();
+ public List<TblColRef> getAllDimensions();
+
public List<MeasureDesc> getMeasures();
public boolean isReady();
@@ -63,4 +65,10 @@ public interface IRealization {
public void setProjectName(String prjName);
+ public long getDateRangeStart();
+
+ public long getDateRangeEnd();
+
+ public String getModelName();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
index c05497e..2c7ba42 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java
@@ -111,6 +111,7 @@ public class RealizationRegistry {
IRealizationProvider p = providers.get(type);
if (p == null) {
logger.warn("No provider for realization type " + type);
+ return null;
}
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java
index 64e3ab7..1b5dbc5 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationType.java
@@ -24,5 +24,5 @@ package org.apache.kylin.metadata.realization;
//TODO: change to String for plugin
public enum RealizationType {
- CUBE, INVERTED_INDEX
+ CUBE, INVERTED_INDEX, HYBRID
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
new file mode 100644
index 0000000..72e3b10
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
@@ -0,0 +1,114 @@
+package org.apache.kylin.metadata.realization;
+
+import com.google.common.base.Function;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.filter.*;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ */
+public class SQLDigestUtil {
+
+ public static <F, T> T appendTsFilterToExecute(SQLDigest sqlDigest, TblColRef partitionColRef, Range<Long> tsRange, Function<F, T> action) {
+
+ // add the boundary condition to query real-time
+ TupleFilter originalFilter = sqlDigest.filter;
+ sqlDigest.filter = createFilterForRealtime(originalFilter, partitionColRef, tsRange);
+
+ boolean addFilterColumn = false, addAllColumn = false;
+
+ if (!sqlDigest.filterColumns.contains(partitionColRef)) {
+ sqlDigest.filterColumns.add(partitionColRef);
+ addFilterColumn = true;
+ }
+
+ if (!sqlDigest.allColumns.contains(partitionColRef)) {
+ sqlDigest.allColumns.add(partitionColRef);
+ addAllColumn = true;
+ }
+
+ T ret = action.apply(null);
+
+ // restore the sqlDigest
+ sqlDigest.filter = originalFilter;
+
+ if (addFilterColumn)
+ sqlDigest.filterColumns.remove(partitionColRef);
+
+ if (addAllColumn)
+ sqlDigest.allColumns.remove(partitionColRef);
+
+ return ret;
+ }
+
+ //ts column type differentiate
+ private static String formatTimeStr(DataType type, long ts) {
+ String ret;
+ if (type == DataType.getInstance("date")) {
+ ret = DateFormat.formatToDateStr(ts);
+ } else if (type == DataType.getInstance("long")) {
+ ret = String.valueOf(ts);
+ } else {
+ throw new IllegalArgumentException("Illegal type for partition column " + type);
+ }
+ return ret;
+ }
+
+ private static TupleFilter createFilterForRealtime(TupleFilter originFilter, TblColRef partitionColRef, Range<Long> tsRange) {
+ DataType type = partitionColRef.getColumnDesc().getType();
+
+ String startTimeStr, endTimeStr;
+ CompareTupleFilter startFilter = null, endFilter = null;
+ if (tsRange.hasLowerBound()) {
+ startTimeStr = formatTimeStr(type, tsRange.lowerEndpoint());
+ if (tsRange.lowerBoundType() == BoundType.CLOSED) {
+ startFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
+ } else {
+ startFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+ }
+ ColumnTupleFilter columnTupleFilter = new ColumnTupleFilter(partitionColRef);
+ ConstantTupleFilter constantTupleFilter = new ConstantTupleFilter(startTimeStr);
+ startFilter.addChild(columnTupleFilter);
+ startFilter.addChild(constantTupleFilter);
+ }
+
+ if (tsRange.hasUpperBound()) {
+ endTimeStr = formatTimeStr(type, tsRange.upperEndpoint());
+ if (tsRange.upperBoundType() == BoundType.CLOSED) {
+ endFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE);
+ } else {
+ endFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LT);
+ }
+ ColumnTupleFilter columnTupleFilter = new ColumnTupleFilter(partitionColRef);
+ ConstantTupleFilter constantTupleFilter = new ConstantTupleFilter(endTimeStr);
+ endFilter.addChild(columnTupleFilter);
+ endFilter.addChild(constantTupleFilter);
+ }
+
+ if (originFilter == null) {
+ if (endFilter == null) {
+ return startFilter;
+ }
+ if (startFilter == null) {
+ return endFilter;
+ }
+ }
+
+ LogicalTupleFilter logicalTupleFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+
+ if (originFilter != null) {
+ logicalTupleFilter.addChild(originFilter);
+ }
+ if (startFilter != null) {
+ logicalTupleFilter.addChild(startFilter);
+ }
+ if (endFilter != null) {
+ logicalTupleFilter.addChild(endFilter);
+ }
+
+ return logicalTupleFilter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
new file mode 100644
index 0000000..f5d8dd6
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/CompoundTupleIterator.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.metadata.tuple;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ */
+public class CompoundTupleIterator implements ITupleIterator {
+ private static final Logger logger = LoggerFactory.getLogger(CompoundTupleIterator.class);
+ private List<ITupleIterator> backends;
+ private Iterator<ITuple> compoundIterator;
+
+ public CompoundTupleIterator(List<ITupleIterator> backends) {
+ Preconditions.checkArgument(backends != null && backends.size() != 0, "backends not exists");
+ this.backends = backends;
+ this.compoundIterator = Iterators.concat(backends.iterator());
+ }
+
+ @Override
+ public void close() {
+ for (ITupleIterator i : backends) {
+ i.close();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.compoundIterator.hasNext();
+ }
+
+ @Override
+ public ITuple next() {
+ return this.compoundIterator.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
index 6966751..5235387 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
@@ -18,10 +18,12 @@
package org.apache.kylin.metadata.tuple;
+import java.util.Iterator;
+
/**
* @author xjiang
*/
-public interface ITupleIterator {
+public interface ITupleIterator extends Iterator<ITuple> {
public static final ITupleIterator EMPTY_TUPLE_ITERATOR = new ITupleIterator() {
@Override
public boolean hasNext() {
@@ -34,6 +36,10 @@ public interface ITupleIterator {
}
@Override
+ public void remove() {
+ }
+
+ @Override
public void close() {
}
};
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index aaad516..04972fa 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -41,10 +41,8 @@ public abstract class RoutingRule {
//1. simple query use II prior to cube
//2. exact match prior to week match
static {
- rules.add(new RealizationPriorityRule());
rules.add(new RemoveUncapableRealizationsRule());
- rules.add(new SimpleQueryMoreColumnsCubeFirstRule());
- rules.add(new CubesSortRule());
+ rules.add(new RealizationSortRule());
rules.add(new AdjustForWeeklyMatchedRealization());//this rule might modify olapcontext content, better put it at last
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
index 465d162..3402556 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,12 +18,6 @@
package org.apache.kylin.query.routing.RoutingRules;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.kylin.cube.CubeCapabilityChecker;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.model.CubeDesc;
@@ -34,9 +28,15 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.routing.RoutingRule;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
*/
public class AdjustForWeeklyMatchedRealization extends RoutingRule {
private static final Logger logger = LoggerFactory.getLogger(AdjustForWeeklyMatchedRealization.class);
@@ -46,6 +46,13 @@ public class AdjustForWeeklyMatchedRealization extends RoutingRule {
if (realizations.size() > 0) {
IRealization first = realizations.get(0);
+ if (first instanceof HybridInstance) {
+ HybridInstance hybrid = (HybridInstance) first;
+
+ if (hybrid.getHistoryRealizationInstance() instanceof CubeInstance)
+ first = hybrid.getHistoryRealizationInstance();
+ }
+
if (first instanceof CubeInstance) {
CubeInstance cube = (CubeInstance) first;
adjustOLAPContextIfNecessary(cube, olapContext);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
index 6a67140..7425d62 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
@@ -33,8 +33,9 @@ public class RealizationPriorityRule extends RoutingRule {
static Map<RealizationType, Integer> priorities = Maps.newHashMap();
static {
- priorities.put(RealizationType.CUBE, 0);
- priorities.put(RealizationType.INVERTED_INDEX, 1);
+ priorities.put(RealizationType.HYBRID, 2);
+ priorities.put(RealizationType.CUBE, 1);
+ priorities.put(RealizationType.INVERTED_INDEX, 0);
}
public static void setPriorities(Map<RealizationType, Integer> priorities) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
new file mode 100644
index 0000000..f9a250e
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.RoutingRules;
+
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.routing.RoutingRule;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ */
+public class RealizationSortRule extends RoutingRule {
+ @Override
+ public void apply(List<IRealization> realizations, final OLAPContext olapContext) {
+
+ // sort cube candidates, 0) the priority 1) the cost indicator, 2) the lesser header
+ // columns the better, 3) the lesser body columns the better 4) the larger date range the better
+
+ Collections.sort(realizations, new Comparator<IRealization>() {
+ @Override
+ public int compare(IRealization o1, IRealization o2) {
+ int i1 = RealizationPriorityRule.priorities.get(o1.getType());
+ int i2 = RealizationPriorityRule.priorities.get(o2.getType());
+ int comp = i1 - i2;
+ if (comp != 0) {
+ return comp;
+ }
+
+ comp = o1.getCost(olapContext.getSQLDigest()) - o2.getCost(olapContext.getSQLDigest());
+ if (comp != 0) {
+ return comp;
+ }
+
+ if (o1 instanceof HybridInstance)
+ return -1;
+ else if (o2 instanceof HybridInstance)
+ return 1;
+
+ return 0;
+ }
+ });
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
index f5b42d5..b58e439 100644
--- a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
@@ -24,6 +24,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.Maps;
@@ -40,6 +41,7 @@ public class IIQueryTest extends KylinQueryTest {
Map<RealizationType, Integer> priorities = Maps.newHashMap();
priorities.put(RealizationType.INVERTED_INDEX, 0);
priorities.put(RealizationType.CUBE, 1);
+ priorities.put(RealizationType.HYBRID, 1);
RealizationPriorityRule.setPriorities(priorities);
}
@@ -51,6 +53,7 @@ public class IIQueryTest extends KylinQueryTest {
Map<RealizationType, Integer> priorities = Maps.newHashMap();
priorities.put(RealizationType.INVERTED_INDEX, 1);
priorities.put(RealizationType.CUBE, 0);
+ priorities.put(RealizationType.HYBRID, 0);
RealizationPriorityRule.setPriorities(priorities);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 2f2d9cf..0e0a0b1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +83,7 @@ public class JobController extends BasicController implements InitializingBean {
public void run() {
try {
DefaultScheduler scheduler = DefaultScheduler.getInstance();
- scheduler.init(new JobEngineConfig(kylinConfig));
+ scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
if (!scheduler.hasStarted()) {
logger.error("scheduler has not been started");
System.exit(1);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 5eb4ea3..b81bad2 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -24,6 +24,8 @@ import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.storage.hbase.CubeStorageEngine;
import org.apache.kylin.storage.hbase.InvertedIndexStorageEngine;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridStorageEngine;
/**
* @author xjiang
@@ -33,8 +35,10 @@ public class StorageEngineFactory {
public static IStorageEngine getStorageEngine(IRealization realization) {
if (realization.getType() == RealizationType.INVERTED_INDEX) {
return new InvertedIndexStorageEngine((IIInstance) realization);
- } else {
+ } else if (realization.getType() == RealizationType.CUBE) {
return new CubeStorageEngine((CubeInstance) realization);
+ } else {
+ return new HybridStorageEngine((HybridInstance) realization);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index 4589e4d..1a8decd 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -180,6 +180,11 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
return this.tuple;
}
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
private void scanNextRange() {
if (this.rangeIterator.hasNext()) {
closeScanner();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index 8a22579..97ba981 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -122,6 +122,11 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
}
@Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void close() {
context.setTotalScanCount(scanCount);
segmentIterator.close();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 8b4d8cf..b2f07c0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -194,6 +194,11 @@ public class EndpointTupleIterator implements ITupleIterator {
}
@Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void close() {
IOUtils.closeQuietly(table);
logger.info("Closed after " + rowsInAllMetric + " rows are fetched");
@@ -312,6 +317,11 @@ public class EndpointTupleIterator implements ITupleIterator {
}
@Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void close() {
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
new file mode 100644
index 0000000..1694719
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -0,0 +1,188 @@
+package org.apache.kylin.storage.hybrid;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.metadata.realization.SQLDigest;
+
+import java.util.List;
+
+/**
+ */
+
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class HybridInstance extends RootPersistentEntity implements IRealization {
+
+ @JsonIgnore
+ private KylinConfig config;
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("historyRealization")
+ private RealizationEntry historyRealization;
+
+ @JsonProperty("realTimeRealization")
+ private RealizationEntry realTimeRealization;
+
+ private IRealization historyRealizationInstance;
+ private IRealization realTimeRealizationInstance;
+ private String projectName;
+
+ public void init() {
+ RealizationRegistry registry = RealizationRegistry.getInstance(config);
+ historyRealizationInstance = registry.getRealization(historyRealization.getType(), historyRealization.getRealization());
+ realTimeRealizationInstance = registry.getRealization(realTimeRealization.getType(), realTimeRealization.getRealization());
+
+ if (historyRealizationInstance == null) {
+ throw new IllegalArgumentException("Didn't find realization '" + historyRealization.getType() + "'" + " with name '" + historyRealization.getRealization() + "' in '" + name + "'");
+ }
+
+ if (realTimeRealizationInstance == null) {
+ throw new IllegalArgumentException("Didn't find realization '" + realTimeRealization.getType() + "'" + " with name '" + realTimeRealization.getRealization() + "' in '" + name + "'");
+ }
+
+ }
+
+ @Override
+ public boolean isCapable(SQLDigest digest) {
+ return getHistoryRealizationInstance().isCapable(digest) && getRealTimeRealizationInstance().isCapable(digest);
+ }
+
+ @Override
+ public int getCost(SQLDigest digest) {
+ return historyRealizationInstance.getCost(digest);
+ //return Math.min(historyRealizationInstance.getCost(digest), realTimeRealizationInstance.getCost(digest));
+ }
+
+ @Override
+ public RealizationType getType() {
+ return RealizationType.HYBRID;
+ }
+
+ @Override
+ public String getFactTable() {
+ return getHistoryRealizationInstance().getFactTable();
+ }
+
+ @Override
+ public List<TblColRef> getAllColumns() {
+ return getHistoryRealizationInstance().getAllColumns();
+ }
+
+ @Override
+ public List<MeasureDesc> getMeasures() {
+ return getHistoryRealizationInstance().getMeasures();
+ }
+
+ @Override
+ public boolean isReady() {
+ return historyRealizationInstance.isReady() || realTimeRealizationInstance.isReady();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return getCanonicalName();
+ }
+
+ @Override
+ public String getCanonicalName() {
+ return getType() + "[name=" + name + "]";
+ }
+
+ @Override
+ public String getProjectName() {
+ return projectName;
+ }
+
+ @Override
+ public void setProjectName(String prjName) {
+ projectName = prjName;
+ }
+
+ public KylinConfig getConfig() {
+ return config;
+ }
+
+ public void setConfig(KylinConfig config) {
+ this.config = config;
+ }
+
+ public RealizationEntry getHistoryRealization() {
+ return historyRealization;
+ }
+
+ public RealizationEntry getRealTimeRealization() {
+ return realTimeRealization;
+ }
+
+ public IRealization getHistoryRealizationInstance() {
+ if (historyRealizationInstance == null) {
+ this.init();
+ }
+ return historyRealizationInstance;
+ }
+
+ public IRealization getRealTimeRealizationInstance() {
+ if (realTimeRealizationInstance == null) {
+ this.init();
+ }
+ return realTimeRealizationInstance;
+ }
+
+ @Override
+ public long getDateRangeStart() {
+ return Math.min(getHistoryRealizationInstance().getDateRangeStart(), getRealTimeRealizationInstance().getDateRangeStart());
+ }
+
+ @Override
+ public long getDateRangeEnd() {
+ return Math.max(getHistoryRealizationInstance().getDateRangeEnd(), getRealTimeRealizationInstance().getDateRangeEnd()) +1;
+ }
+
+
+
+ public DataModelDesc getDataModelDesc(){
+ if (getHistoryRealizationInstance() instanceof CubeInstance) {
+ return ((CubeInstance) historyRealizationInstance).getDescriptor().getModel();
+ }
+
+ return ((IIInstance) historyRealizationInstance).getDescriptor().getModel();
+ }
+
+ @Override
+ public String getModelName() {
+ if (getHistoryRealizationInstance() instanceof CubeInstance) {
+ return ((CubeInstance) historyRealizationInstance).getDescriptor().getModelName();
+ }
+
+ return ((IIInstance) historyRealizationInstance).getDescriptor().getModelName();
+ }
+
+ @Override
+ public List<TblColRef> getAllDimensions(){
+
+ return this.getHistoryRealizationInstance().getAllDimensions();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
new file mode 100644
index 0000000..df3fae3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -0,0 +1,127 @@
+package org.apache.kylin.storage.hybrid;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationProvider;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ */
+public class HybridManager implements IRealizationProvider {
+ public static final Serializer<HybridInstance> HYBRID_SERIALIZER = new JsonSerializer<HybridInstance>(HybridInstance.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(HybridManager.class);
+
+ // static cached instances
+ private static final ConcurrentHashMap<KylinConfig, HybridManager> CACHE = new ConcurrentHashMap<KylinConfig, HybridManager>();
+
+ public static HybridManager getInstance(KylinConfig config) {
+ HybridManager r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+
+ synchronized (HybridManager.class) {
+ r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+ try {
+ r = new HybridManager(config);
+ CACHE.put(config, r);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one Hybrid Manager singleton exist");
+ }
+ return r;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to init Hybrid Manager from " + config, e);
+ }
+ }
+ }
+
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
+ // ============================================================================
+
+ private KylinConfig config;
+
+ private CaseInsensitiveStringCache<HybridInstance> hybridMap = new CaseInsensitiveStringCache<HybridInstance>(Broadcaster.TYPE.HYBRID);
+
+ private HybridManager(KylinConfig config) throws IOException {
+ logger.info("Initializing HybridManager with config " + config);
+ this.config = config;
+
+ loadAllHybridInstance();
+ }
+
+ private void loadAllHybridInstance() throws IOException {
+ ResourceStore store = getStore();
+ List<String> paths = store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json");
+
+ logger.debug("Loading Hybrid from folder " + store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT));
+
+ for (String path : paths) {
+ loadHybridInstance(path);
+ }
+
+ logger.debug("Loaded " + paths.size() + " Hybrid(s)");
+ }
+
+ private synchronized HybridInstance loadHybridInstance(String path) throws IOException {
+ ResourceStore store = getStore();
+
+ HybridInstance hybridInstance = null;
+ try {
+ hybridInstance = store.getResource(path, HybridInstance.class, HYBRID_SERIALIZER);
+ hybridInstance.setConfig(config);
+
+ if (StringUtils.isBlank(hybridInstance.getName()))
+ throw new IllegalStateException("HybridInstance name must not be blank, at " + path);
+
+ if (hybridInstance.getHistoryRealization() == null || hybridInstance.getRealTimeRealization() == null) {
+
+ throw new IllegalStateException("HybridInstance must have both historyRealization and realTimeRealization set, at " + path);
+ }
+
+ final String name = hybridInstance.getName();
+ hybridMap.putLocal(name, hybridInstance);
+
+ return hybridInstance;
+ } catch (Exception e) {
+ logger.error("Error during load hybrid instance " + path, e);
+ return null;
+ }
+ }
+
+ @Override
+ public RealizationType getRealizationType() {
+ return RealizationType.HYBRID;
+ }
+
+ @Override
+ public IRealization getRealization(String name) {
+ return getHybridInstance(name);
+ }
+
+ public HybridInstance getHybridInstance(String name) {
+ return hybridMap.get(name);
+ }
+
+ private ResourceStore getStore() {
+ return ResourceStore.getStore(this.config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
new file mode 100644
index 0000000..7ad5ed0
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -0,0 +1,69 @@
+package org.apache.kylin.storage.hybrid;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ranges;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigestUtil;
+import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import javax.annotation.Nullable;
+
+/**
+ */
+public class HybridStorageEngine implements IStorageEngine {
+
+ private HybridInstance hybridInstance;
+ private IStorageEngine historicalStorageEngine;
+ private IStorageEngine realtimeStorageEngine;
+
+ public HybridStorageEngine(HybridInstance hybridInstance) {
+ this.hybridInstance = hybridInstance;
+ this.historicalStorageEngine = StorageEngineFactory.getStorageEngine(this.hybridInstance.getHistoryRealizationInstance());
+ this.realtimeStorageEngine = StorageEngineFactory.getStorageEngine(this.hybridInstance.getRealTimeRealizationInstance());
+ }
+
+ @Override
+ public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest) {
+
+ // search the historic realization
+ ITupleIterator historicalDataIterator = this.historicalStorageEngine.search(context, sqlDigest);
+
+ String modelName = hybridInstance.getModelName();
+ MetadataManager metaMgr = getMetadataManager();
+ DataModelDesc modelDesc = metaMgr.getDataModelDesc(modelName);
+
+ // if the model isn't partitioned, only query the history
+ if (modelDesc.getPartitionDesc() == null || modelDesc.getPartitionDesc().getPartitionDateColumnRef() == null)
+ return historicalDataIterator;
+
+ TblColRef partitionColRef = modelDesc.getPartitionDesc().getPartitionDateColumnRef();
+
+ ITupleIterator realtimeDataIterator = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, //
+ Ranges.atLeast(hybridInstance.getHistoryRealizationInstance().getDateRangeEnd()),//
+ new Function<Void, ITupleIterator>() {
+ @Nullable
+ @Override
+ public ITupleIterator apply(@Nullable Void input) {
+ return realtimeStorageEngine.search(context, sqlDigest);
+ }
+ });
+
+ // combine history and real-time tuple iterator
+ return new CompoundTupleIterator(Lists.newArrayList(historicalDataIterator, realtimeDataIterator));
+ }
+
+
+ private MetadataManager getMetadataManager() {
+ return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
index 87c6445..b98b09b 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
@@ -36,6 +36,7 @@ import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java
index c4f785a..6ec9385 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TableRecordInfoTest.java
@@ -25,6 +25,7 @@ import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c87aa4a3/storage/src/test/java/org/apache/kylin/storage/hybrid/HybridManagerTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hybrid/HybridManagerTest.java b/storage/src/test/java/org/apache/kylin/storage/hybrid/HybridManagerTest.java
new file mode 100644
index 0000000..3ad7ca6
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/hybrid/HybridManagerTest.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.storage.hybrid;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class HybridManagerTest extends LocalFileMetadataTestCase {
+
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Test
+ public void testBasics() throws Exception {
+ HybridInstance cube = getHybridManager().getHybridInstance("test_kylin_hybrid_left_join");
+ cube.init();
+ System.out.println(JsonUtil.writeValueAsIndentString(cube));
+
+ IRealization history = cube.getHistoryRealizationInstance();
+ IRealization realTime = cube.getRealTimeRealizationInstance();
+
+ Assert.assertTrue(history instanceof CubeInstance);
+ Assert.assertTrue(realTime instanceof IIInstance);
+
+ }
+
+
+ public HybridManager getHybridManager() {
+ return HybridManager.getInstance(getTestConfig());
+ }
+}