You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/07 13:20:42 UTC
[07/28] incubator-kylin git commit: KYLIN-972 Make MR_V2 the default
engine for new cubes. Old cubes (0.7) continue to build with MR_V1 engine.
KYLIN-972 Make MR_V2 the default engine for new cubes. Old cubes (0.7)
continue to build with MR_V1 engine.
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bf83339c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bf83339c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bf83339c
Branch: refs/heads/2.x-staging
Commit: bf83339c1a69770365dded79cf5c3062ec9839de
Parents: ab2abee
Author: Li, Yang <ya...@ebay.com>
Authored: Sat Aug 29 07:57:09 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Sep 1 16:22:14 2015 +0800
----------------------------------------------------------------------
.../persistence/RootPersistentEntity.java | 1 +
.../org/apache/kylin/common/util/BytesUtil.java | 2 +-
.../org/apache/kylin/common/util/ClassUtil.java | 10 +-
.../kylin/common/util/CompressionUtils.java | 17 ++
.../kylin/common/util/DaemonThreadFactory.java | 17 ++
.../apache/kylin/common/util/DateFormat.java | 17 ++
.../apache/kylin/common/util/FIFOIterable.java | 17 ++
.../apache/kylin/common/util/FIFOIterator.java | 17 ++
.../apache/kylin/common/util/IdentityUtils.java | 17 ++
.../kylin/common/util/ImmutableBitSet.java | 17 ++
.../kylin/common/util/ImplementationSwitch.java | 70 +++++++
.../common/util/ImplementationSwitchTest.java | 58 ++++++
.../org/apache/kylin/cube/CubeInstance.java | 47 ++++-
.../java/org/apache/kylin/cube/CubeManager.java | 4 +-
.../java/org/apache/kylin/cube/CubeSegment.java | 9 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 41 +---
.../apache/kylin/dict/DictionaryManager.java | 4 +-
.../apache/kylin/dict/lookup/SnapshotCLI.java | 4 +-
.../apache/kylin/engine/BuildEngineFactory.java | 52 -----
.../org/apache/kylin/engine/EngineFactory.java | 66 ++++++
.../kylin/metadata/model/DataModelDesc.java | 21 +-
.../apache/kylin/metadata/model/IBuildable.java | 7 +-
.../kylin/metadata/model/IEngineAware.java | 28 +++
.../kylin/metadata/model/ISourceAware.java | 27 +++
.../kylin/metadata/model/IStorageAware.java | 27 +++
.../apache/kylin/metadata/model/TableDesc.java | 11 +-
.../metadata/realization/IRealization.java | 3 +-
.../java/org/apache/kylin/source/ISource.java | 28 +++
.../org/apache/kylin/source/ITableSource.java | 28 ---
.../org/apache/kylin/source/SourceFactory.java | 50 +++++
.../apache/kylin/source/TableSourceFactory.java | 40 ----
.../java/org/apache/kylin/storage/IStorage.java | 2 +-
.../apache/kylin/storage/StorageFactory.java | 32 ++-
.../kylin/storage/StorageQueryFactory.java | 97 ---------
.../cache/AbstractCacheFledgedQuery.java | 84 ++++++++
.../AbstractCacheFledgedStorageEngine.java | 84 --------
.../storage/cache/CacheFledgedDynamicQuery.java | 149 ++++++++++++++
.../cache/CacheFledgedDynamicStorageEngine.java | 149 --------------
.../storage/cache/CacheFledgedStaticQuery.java | 88 ++++++++
.../cache/CacheFledgedStaticStorageEngine.java | 88 --------
.../exception/ScanOutOfLimitException.java | 2 +-
.../kylin/storage/hybrid/HybridInstance.java | 24 ++-
.../kylin/storage/hybrid/HybridManager.java | 17 ++
.../kylin/storage/hybrid/HybridStorage.java | 36 ++++
.../storage/hybrid/HybridStorageEngine.java | 44 ----
.../storage/hybrid/HybridStorageQuery.java | 61 ++++++
.../kylin/storage/cache/DynamicCacheTest.java | 2 +-
.../kylin/storage/cache/StaticCacheTest.java | 2 +-
.../kylin/engine/mr/BatchCubingJobBuilder.java | 10 +-
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 5 +
.../kylin/engine/mr/BatchMergeJobBuilder.java | 5 +
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 5 +
.../kylin/engine/mr/JobBuilderSupport.java | 3 -
.../java/org/apache/kylin/engine/mr/MRUtil.java | 6 +-
.../apache/kylin/engine/mr/steps/CuboidJob.java | 3 +-
.../engine/spark/SparkCubingJobBuilder.java | 3 +
.../apache/kylin/invertedindex/IIInstance.java | 7 +
.../job/hadoop/cube/NewBaseCuboidMapper.java | 6 +-
.../kylin/job/BuildCubeWithEngineTest.java | 4 +-
.../java/org/apache/kylin/job/DeployUtil.java | 5 +-
.../source/hive/ITSnapshotManagerTest.java | 4 +-
.../kylin/query/enumerator/OLAPEnumerator.java | 4 +-
.../AdjustForWeaklyMatchedRealization.java | 6 +-
.../kylin/rest/controller/QueryController.java | 4 +-
.../apache/kylin/rest/service/CubeService.java | 4 +-
.../apache/kylin/rest/service/JobService.java | 8 +-
.../kylin/source/hive/HiveTableSource.java | 4 +-
.../kylin/storage/hbase/HBaseStorage.java | 70 ++++++-
.../storage/hbase/steps/DeprecatedGCStep.java | 201 +++++++++++++++++++
.../storage/hbase/common/ITStorageTest.java | 8 +-
70 files changed, 1395 insertions(+), 698 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
index 0cbf9c2..bc72c1e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
*
* @author yangli9
*/
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
abstract public class RootPersistentEntity implements AclEntity, Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0503ad6..0880da1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.common.util;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
index f5474f5..93790e6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
@@ -47,11 +47,13 @@ public class ClassUtil {
private static final Map<String, String> classRenameMap;
static {
classRenameMap = new HashMap<>();
+ classRenameMap.put("org.apache.kylin.job.common.HadoopShellExecutable", "org.apache.kylin.engine.mr.common.HadoopShellExecutable");
+ classRenameMap.put("org.apache.kylin.job.common.MapReduceExecutable", "org.apache.kylin.engine.mr.common.MapReduceExecutable");
classRenameMap.put("org.apache.kylin.job.cube.CubingJob", "org.apache.kylin.engine.mr.CubingJob");
- classRenameMap.put("org.apache.kylin.job.cube.GarbageCollectionStep", "org.apache.kylin.engine.mr.GarbageCollectionStep");
- classRenameMap.put("org.apache.kylin.job.cube.MergeDictionaryStep", "org.apache.kylin.engine.mr.MergeDictionaryStep");
- classRenameMap.put("org.apache.kylin.job.cube.UpdateCubeInfoAfterBuildStep", "org.apache.kylin.engine.mr.UpdateCubeInfoAfterBuildStep");
- classRenameMap.put("org.apache.kylin.job.cube.UpdateCubeInfoAfterMergeStep", "org.apache.kylin.engine.mr.UpdateCubeInfoAfterMergeStep");
+ classRenameMap.put("org.apache.kylin.job.cube.GarbageCollectionStep", "org.apache.kylin.storage.hbase.steps.DeprecatedGCStep");
+ classRenameMap.put("org.apache.kylin.job.cube.MergeDictionaryStep", "org.apache.kylin.engine.mr.steps.MergeDictionaryStep");
+ classRenameMap.put("org.apache.kylin.job.cube.UpdateCubeInfoAfterBuildStep", "org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep");
+ classRenameMap.put("org.apache.kylin.job.cube.UpdateCubeInfoAfterMergeStep", "org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep");
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
index 3ed279a..13abab5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kylin.common.util;
import java.io.ByteArrayOutputStream;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java b/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
index bc4502c..56f4a36 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kylin.common.util;
import java.util.concurrent.Executors;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index f74debd..f46edae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kylin.common.util;
import java.text.ParseException;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
index 4c4bc6b..7204e33 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kylin.common.util;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
index f734143..ccea37c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kylin.common.util;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
index d873959..35ade60 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kylin.common.util;
import java.util.Collection;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index 2ee7d4f..f5a22d2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.kylin.common.util;
import java.util.BitSet;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
new file mode 100644
index 0000000..3101c81
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.common.util;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide switch between different implementations of a same interface.
+ * Each implementation is identified by an integer ID.
+ */
+public class ImplementationSwitch {
+
+ private static final Logger logger = LoggerFactory.getLogger(ImplementationSwitch.class);
+
+ final private Object[] instances;
+
+ public ImplementationSwitch(Map<Integer, String> impls) {
+ instances = initInstances(impls);
+ }
+
+ private Object[] initInstances(Map<Integer, String> impls) {
+ int maxId = 0;
+ for (Integer id : impls.keySet()) {
+ maxId = Math.max(maxId, id);
+ }
+ if (maxId > 100)
+ throw new IllegalArgumentException("you have more than 100 implentations?");
+
+ Object[] result = new Object[maxId + 1];
+
+ for (Integer id : impls.keySet()) {
+ String clzName = impls.get(id);
+ try {
+ result[id] = ClassUtil.newInstance(clzName);
+ } catch (Exception ex) {
+ logger.warn("Implementation missing " + clzName + " - " + ex);
+ }
+ }
+
+ return result;
+ }
+
+ public <I> I get(int id, Class<I> interfaceClz) {
+ @SuppressWarnings("unchecked")
+ I result = (I) instances[id];
+
+ if (result == null)
+ throw new IllegalArgumentException("Implementations missing, ID " + id + ", interafce " + interfaceClz.getName());
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java b/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
new file mode 100644
index 0000000..4c69eeb
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.common.util;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class ImplementationSwitchTest {
+
+ ImplementationSwitch sw;
+
+ public ImplementationSwitchTest() {
+ Map<Integer, String> impls = new HashMap<>();
+ impls.put(0, "non.exist.class");
+ impls.put(1, Impl1.class.getName());
+ impls.put(2, Impl2.class.getName());
+ sw = new ImplementationSwitch(impls);
+ }
+
+ public static interface I {
+ }
+
+ public static class Impl1 implements I {
+ }
+
+ public static class Impl2 implements I {
+ }
+
+ @Test
+ public void test() {
+ assertTrue(sw.get(1, I.class) instanceof Impl1);
+ assertTrue(sw.get(2, I.class) instanceof Impl2);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testException() {
+ sw.get(0, I.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index eb3b3e2..fcd338c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -26,9 +26,13 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.LookupDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
@@ -43,8 +47,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeInstance extends RootPersistentEntity implements IRealization {
+public class CubeInstance extends RootPersistentEntity implements IRealization, IBuildable {
public static CubeInstance create(String cubeName, String projectName, CubeDesc cubeDesc) {
CubeInstance cubeInstance = new CubeInstance();
@@ -58,6 +63,10 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
cubeInstance.updateRandomUuid();
cubeInstance.setProjectName(projectName);
cubeInstance.setRetentionRange(cubeDesc.getRetentionRange());
+
+ // MR_V2 is the default engine since 0.8
+ cubeInstance.setEngineType(IEngineAware.ID_MR_V2);
+ cubeInstance.setStorageType(IStorageAware.ID_HBASE);
return cubeInstance;
}
@@ -84,12 +93,14 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
@JsonProperty("create_time_utc")
private long createTimeUTC;
-
@JsonProperty("auto_merge_time_ranges")
private long[] autoMergeTimeRanges;
-
@JsonProperty("retention_range")
private long retentionRange = 0;
+ @JsonProperty("engine_type")
+ private int engineType = IEngineAware.ID_MR_V1;
+ @JsonProperty("storage_type")
+ private int storageType = IStorageAware.ID_HBASE;
private String projectName;
@@ -205,7 +216,11 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
@Override
public String getFactTable() {
- return this.getDescriptor().getFactTable();
+ return getDescriptor().getFactTable();
+ }
+
+ public TableDesc getFactTableDesc() {
+ return getDescriptor().getFactTableDesc();
}
@Override
@@ -441,4 +456,28 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
public void setRetentionRange(long retentionRange) {
this.retentionRange = retentionRange;
}
+
+ @Override
+ public int getSourceType() {
+ return getFactTableDesc().getSourceType();
+ }
+
+ @Override
+ public int getStorageType() {
+ return storageType;
+ }
+
+ private void setStorageType(int storageType) {
+ this.storageType = storageType;
+ }
+
+ @Override
+ public int getEngineType() {
+ return engineType;
+ }
+
+ private void setEngineType(int engineType) {
+ this.engineType = engineType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 038c7cb..5cfecf1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -59,7 +59,7 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -207,7 +207,7 @@ public class CubeManager implements IRealizationProvider {
SnapshotManager snapshotMgr = getSnapshotManager();
TableDesc tableDesc = metaMgr.getTableDesc(lookupTable);
- ReadableTable hiveTable = TableSourceFactory.createReadableTable(tableDesc);
+ ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index a80bbd2..7d89470 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -326,7 +326,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
@Override
public String toString() {
- return Objects.toStringHelper(this).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("name", name).add("last_build_job_id", lastBuildJobID).add("status", status).toString();
+ return Objects.toStringHelper(this).add("cube", cubeInstance.getName()).add("name", name).add("status", status).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("last_build_job_id", lastBuildJobID).toString();
}
public void setDictionaries(ConcurrentHashMap<String, String> dictionaries) {
@@ -347,16 +347,17 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
@Override
public int getSourceType() {
- return 0;
+ return cubeInstance.getSourceType();
}
@Override
public int getEngineType() {
- return 0;
+ return cubeInstance.getEngineType();
}
@Override
public int getStorageType() {
- return 0;
+ return cubeInstance.getStorageType();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index f8d71b2..ae49eb0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -26,7 +26,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -63,6 +62,7 @@ import com.google.common.collect.Maps;
/**
*/
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class CubeDesc extends RootPersistentEntity {
@@ -217,29 +217,6 @@ public class CubeDesc extends RootPersistentEntity {
return functions;
}
- /**
- * @return
- * @deprecated use getModel().getAllTables() instead
- */
- public List<TableDesc> listTables() {
- MetadataManager metaMgr = MetadataManager.getInstance(config);
- HashSet<String> tableNames = new HashSet<String>();
- List<TableDesc> result = new ArrayList<TableDesc>();
-
- tableNames.add(this.getFactTable().toUpperCase());
- for (DimensionDesc dim : dimensions) {
- String table = dim.getTable();
- if (table != null)
- tableNames.add(table.toUpperCase());
- }
-
- for (String tableName : tableNames) {
- result.add(metaMgr.getTableDesc(tableName));
- }
-
- return result;
- }
-
public boolean isDerived(TblColRef col) {
return derivedToHostMap.containsKey(col);
}
@@ -330,7 +307,11 @@ public class CubeDesc extends RootPersistentEntity {
}
public String getFactTable() {
- return model.getFactTable().toUpperCase();
+ return model.getFactTable();
+ }
+
+ public TableDesc getFactTableDesc() {
+ return model.getFactTableDesc();
}
public String[] getNullStrings() {
@@ -456,8 +437,8 @@ public class CubeDesc extends RootPersistentEntity {
}
sortDimAndMeasure();
- initDimensionColumns(tables);
- initMeasureColumns(tables);
+ initDimensionColumns();
+ initMeasureColumns();
rowkey.init(this);
if (hbaseMapping != null) {
@@ -473,7 +454,7 @@ public class CubeDesc extends RootPersistentEntity {
}
}
- private void initDimensionColumns(Map<String, TableDesc> tables) {
+ private void initDimensionColumns() {
for (DimensionDesc dim : dimensions) {
JoinDesc join = dim.getJoin();
@@ -622,12 +603,12 @@ public class CubeDesc extends RootPersistentEntity {
return ref;
}
- private void initMeasureColumns(Map<String, TableDesc> tables) {
+ private void initMeasureColumns() {
if (measures == null || measures.isEmpty()) {
return;
}
- TableDesc factTable = tables.get(getFactTable());
+ TableDesc factTable = getFactTableDesc();
for (MeasureDesc m : measures) {
m.setName(m.getName().toUpperCase());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index f6d76dc..c4b6ef0 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -35,7 +35,7 @@ import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.ReadableTable;
import org.apache.kylin.source.ReadableTable.TableSignature;
-import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -220,7 +220,7 @@ public class DictionaryManager {
inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
} else {
TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(srcTable);
- inpTable = TableSourceFactory.createReadableTable(tableDesc);
+ inpTable = SourceFactory.createReadableTable(tableDesc);
}
TableSignature inputSig = inpTable.getSignature();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
index d3e2c7d..149badc 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -5,7 +5,7 @@ import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.source.SourceFactory;
public class SnapshotCLI {
@@ -23,7 +23,7 @@ public class SnapshotCLI {
if (tableDesc == null)
throw new IllegalArgumentException("Not table found by " + table);
- SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(TableSourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
+ SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
System.out.println("resource path updated: " + snapshot.getResourcePath());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
deleted file mode 100644
index 7c21e69..0000000
--- a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class BuildEngineFactory {
-
- private static IBatchCubingEngine defaultBatchEngine;
-
- public static IBatchCubingEngine defaultBatchEngine() {
- if (defaultBatchEngine == null) {
- KylinConfig conf = KylinConfig.getInstanceFromEnv();
- if (conf.isCubingInMem()) {
- defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine2");
- } else {
- defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine");
- }
- }
- return defaultBatchEngine;
- }
-
- /** Build a new cube segment, typically its time range appends to the end of current cube. */
- public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
- }
-
- /** Merge multiple small segments into a big one. */
- public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
- return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
new file mode 100644
index 0000000..8b8fb87
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ImplementationSwitch;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IEngineAware;
+import static org.apache.kylin.metadata.model.IEngineAware.*;
+
+public class EngineFactory {
+
+ private static ImplementationSwitch batchEngines;
+ private static ImplementationSwitch streamingEngines;
+ static {
+ Map<Integer, String> impls = new HashMap<>();
+ impls.put(ID_MR_V1, "org.apache.kylin.engine.mr.MRBatchCubingEngine");
+ impls.put(ID_MR_V2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+ batchEngines = new ImplementationSwitch(impls);
+
+ impls.clear();
+ streamingEngines = new ImplementationSwitch(impls); // TODO
+ }
+
+ public static IBatchCubingEngine batchEngine(IEngineAware aware) {
+ return batchEngines.get(aware.getEngineType(), IBatchCubingEngine.class);
+ }
+
+ public static IStreamingCubingEngine streamingEngine(IEngineAware aware) {
+ return streamingEngines.get(aware.getEngineType(), IStreamingCubingEngine.class);
+ }
+
+ /** Build a new cube segment, typically its time range appends to the end of current cube. */
+ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+ return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
+ }
+
+ /** Merge multiple small segments into a big one. */
+ public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+ return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
+ }
+
+ public static Runnable createStreamingCubingBuilder(CubeSegment seg) {
+ return streamingEngine(seg).createStreamingCubingBuilder(seg);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 31d7d6c..1c6ef62 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -36,6 +36,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class DataModelDesc extends RootPersistentEntity {
@@ -69,6 +70,8 @@ public class DataModelDesc extends RootPersistentEntity {
@JsonProperty("capacity")
private RealizationCapacity capacity = RealizationCapacity.MEDIUM;
+ private TableDesc factTableDesc;
+
/**
* Error messages during resolving json metadata
*/
@@ -101,6 +104,10 @@ public class DataModelDesc extends RootPersistentEntity {
public String getFactTable() {
return factTable;
}
+
+ public TableDesc getFactTableDesc() {
+ return factTableDesc;
+ }
public void setFactTable(String factTable) {
this.factTable = factTable.toUpperCase();
@@ -168,6 +175,11 @@ public class DataModelDesc extends RootPersistentEntity {
}
public void init(Map<String, TableDesc> tables) {
+ this.factTableDesc = tables.get(this.factTable.toUpperCase());
+ if (factTableDesc == null) {
+ throw new IllegalStateException("Fact table does not exist:" + this.factTable);
+ }
+
initJoinColumns(tables);
DimensionDesc.capicalizeStrings(dimensions);
initPartitionDesc(tables);
@@ -195,6 +207,7 @@ public class DataModelDesc extends RootPersistentEntity {
StringUtil.toUpperCaseArray(join.getForeignKey(), join.getForeignKey());
StringUtil.toUpperCaseArray(join.getPrimaryKey(), join.getPrimaryKey());
+
// primary key
String[] pks = join.getPrimaryKey();
TblColRef[] pkCols = new TblColRef[pks.length];
@@ -208,15 +221,12 @@ public class DataModelDesc extends RootPersistentEntity {
pkCols[i] = colRef;
}
join.setPrimaryKeyColumns(pkCols);
+
// foreign key
- TableDesc factTable = tables.get(this.factTable.toUpperCase());
- if (factTable == null) {
- throw new IllegalStateException("Fact table does not exist:" + this.getFactTable());
- }
String[] fks = join.getForeignKey();
TblColRef[] fkCols = new TblColRef[fks.length];
for (int i = 0; i < fks.length; i++) {
- ColumnDesc col = factTable.findColumnByName(fks[i]);
+ ColumnDesc col = factTableDesc.findColumnByName(fks[i]);
if (col == null) {
throw new IllegalStateException("Can't find column " + fks[i] + " in table " + this.getFactTable());
}
@@ -225,6 +235,7 @@ public class DataModelDesc extends RootPersistentEntity {
fkCols[i] = colRef;
}
join.setForeignKeyColumns(fkCols);
+
// Validate join in dimension
if (pkCols.length != fkCols.length) {
throw new IllegalStateException("Primary keys(" + lookup.getTable() + ")" + Arrays.toString(pks) + " are not consistent with Foreign keys(" + this.getFactTable() + ") " + Arrays.toString(fks));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java
index 3090de0..39129f8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java
@@ -18,11 +18,6 @@
package org.apache.kylin.metadata.model;
-public interface IBuildable {
+public interface IBuildable extends ISourceAware, IEngineAware, IStorageAware {
- int getSourceType();
-
- int getEngineType();
-
- int getStorageType();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
new file mode 100644
index 0000000..60bd825
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+public interface IEngineAware {
+
+ public static final int ID_MR_V1 = 0;
+ public static final int ID_MR_V2 = 2;
+ public static final int ID_SPARK = 5;
+
+ int getEngineType();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
new file mode 100644
index 0000000..3d89f40
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+public interface ISourceAware {
+
+ public static final int ID_HIVE = 0;
+ public static final int ID_SPARKSQL = 5;
+
+ int getSourceType();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
new file mode 100644
index 0000000..ea1aae9
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+public interface IStorageAware {
+
+ public static final int ID_HBASE = 0;
+ public static final int ID_HYBRID = 1;
+
+ int getStorageType();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 785e9d4..d5e4dbb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -32,12 +32,16 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Table Metadata from Source. All name should be uppercase.
*/
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class TableDesc extends RootPersistentEntity {
+public class TableDesc extends RootPersistentEntity implements ISourceAware {
+
@JsonProperty("name")
private String name;
@JsonProperty("columns")
private ColumnDesc[] columns;
+ @JsonProperty("source_type")
+ private int sourceType = ISourceAware.ID_HIVE;
private DatabaseDesc database = new DatabaseDesc();
@@ -171,4 +175,9 @@ public class TableDesc extends RootPersistentEntity {
mockup.setName(tableName);
return mockup;
}
+
+ @Override
+ public int getSourceType() {
+ return sourceType;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index 6f90e14..8c9258a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -21,10 +21,11 @@ package org.apache.kylin.metadata.realization;
import java.util.List;
import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
-public interface IRealization {
+public interface IRealization extends IStorageAware {
public boolean isCapable(SQLDigest digest);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
new file mode 100644
index 0000000..3cd8a02
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface ISource {
+
+ public <I> I adaptToBuildEngine(Class<I> engineInterface);
+
+ public ReadableTable createReadableTable(TableDesc tableDesc);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ITableSource.java b/core-metadata/src/main/java/org/apache/kylin/source/ITableSource.java
deleted file mode 100644
index 83ae8b3..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/source/ITableSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-import org.apache.kylin.metadata.model.TableDesc;
-
-public interface ITableSource {
-
- public <I> I adaptToBuildEngine(Class<I> engineInterface);
-
- public ReadableTable createReadableTable(TableDesc tableDesc);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
new file mode 100644
index 0000000..2fbf847
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ImplementationSwitch;
+import org.apache.kylin.metadata.model.ISourceAware;
+import static org.apache.kylin.metadata.model.ISourceAware.*;
+import org.apache.kylin.metadata.model.TableDesc;
+
+public class SourceFactory {
+
+ private static ImplementationSwitch sources;
+ static {
+ Map<Integer, String> impls = new HashMap<>();
+ impls.put(ID_HIVE, "org.apache.kylin.source.hive.HiveTableSource");
+ sources = new ImplementationSwitch(impls);
+ }
+
+ public static ISource tableSource(ISourceAware aware) {
+ return sources.get(aware.getSourceType(), ISource.class);
+ }
+
+ public static ReadableTable createReadableTable(TableDesc table) {
+ return tableSource(table).createReadableTable(table);
+ }
+
+ public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
+ return tableSource(table).adaptToBuildEngine(engineInterface);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
deleted file mode 100644
index 67191e3..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TableDesc;
-
-public class TableSourceFactory {
-
- private static ITableSource dft = (ITableSource) ClassUtil.newInstance("org.apache.kylin.source.hive.HiveTableSource");
-
- public static ReadableTable createReadableTable(TableDesc table) {
- return dft.createReadableTable(table);
- }
-
- public static <T> T createEngineAdapter(IBuildable buildable, Class<T> engineInterface) {
- return dft.adaptToBuildEngine(engineInterface);
- }
-
- public static <T> T createEngineAdapter(TableDesc tableDesc, Class<T> engineInterface) {
- return dft.adaptToBuildEngine(engineInterface);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java b/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java
index 6506a4f..e229e14 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java
@@ -22,7 +22,7 @@ import org.apache.kylin.metadata.realization.IRealization;
public interface IStorage {
- public IStorageQuery createStorageQuery(IRealization realization);
+ public IStorageQuery createQuery(IRealization realization);
public <I> I adaptToBuildEngine(Class<I> engineInterface);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index b8e3e91..b26dfdb 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -18,17 +18,37 @@
package org.apache.kylin.storage;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.metadata.model.IBuildable;
+import static org.apache.kylin.metadata.model.IStorageAware.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ImplementationSwitch;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.realization.IRealization;
/**
*/
public class StorageFactory {
- private static final IStorage dft = (IStorage) ClassUtil.newInstance("org.apache.kylin.storage.hbase.HBaseStorage");
-
- public static <T> T createEngineAdapter(IBuildable buildable, Class<T> engineInterface) {
- return dft.adaptToBuildEngine(engineInterface);
+ private static ImplementationSwitch storages;
+ static {
+ Map<Integer, String> impls = new HashMap<>();
+ impls.put(ID_HBASE, "org.apache.kylin.storage.hbase.HBaseStorage");
+ impls.put(ID_HYBRID, "org.apache.kylin.storage.hybrid.HybridStorage");
+ storages = new ImplementationSwitch(impls);
+ }
+
+ public static IStorage storage(IStorageAware aware) {
+ return storages.get(aware.getStorageType(), IStorage.class);
+ }
+
+ public static IStorageQuery createQuery(IRealization realization) {
+ return storage(realization).createQuery(realization);
+ }
+
+ public static <T> T createEngineAdapter(IStorageAware aware, Class<T> engineInterface) {
+ return storage(aware).adaptToBuildEngine(engineInterface);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
deleted file mode 100644
index eb6e6b1..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
-import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridStorageEngine;
-
-import com.google.common.base.Preconditions;
-
-/**
- * @author xjiang
- */
-public class StorageQueryFactory {
-
- private final static boolean allowStorageLayerCache = true;
- private final static String defaultCubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
- private final static String defaultIIStorageQuery = "org.apache.kylin.storage.hbase.ii.InvertedIndexStorageQuery";
-
- public static IStorageQuery createQuery(IRealization realization) {
-
- if (realization.getType() == RealizationType.INVERTED_INDEX) {
- ICachableStorageQuery ret;
- try {
- ret = (ICachableStorageQuery) Class.forName(defaultIIStorageQuery).getConstructor(IIInstance.class).newInstance((IIInstance) realization);
- } catch (Exception e) {
- throw new RuntimeException("Failed to initialize storage query for " + defaultIIStorageQuery, e);
- }
-
- if (allowStorageLayerCache) {
- return wrapWithCache(ret, realization);
- } else {
- return ret;
- }
- } else if (realization.getType() == RealizationType.CUBE) {
- ICachableStorageQuery ret;
- try {
- ret = (ICachableStorageQuery) Class.forName(defaultCubeStorageQuery).getConstructor(CubeInstance.class).newInstance((CubeInstance) realization);
- } catch (Exception e) {
- throw new RuntimeException("Failed to initialize storage query for " + defaultCubeStorageQuery, e);
- }
-
- if (allowStorageLayerCache) {
- return wrapWithCache(ret, realization);
- } else {
- return ret;
- }
- } else {
- return new HybridStorageEngine((HybridInstance) realization);
- }
- }
-
- private static IStorageQuery wrapWithCache(ICachableStorageQuery underlyingStorageEngine, IRealization realization) {
- if (underlyingStorageEngine.isDynamic()) {
- return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
- } else {
- return new CacheFledgedStaticStorageEngine(underlyingStorageEngine);
- }
- }
-
- private static TblColRef getPartitionCol(IRealization realization) {
- String modelName = realization.getModelName();
- DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
- PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
- Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
- TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
- Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
- return partitionColRef;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
new file mode 100644
index 0000000..5ffdf91
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
@@ -0,0 +1,84 @@
+package org.apache.kylin.storage.cache;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.config.Configuration;
+import net.sf.ehcache.config.MemoryUnit;
+import net.sf.ehcache.config.PersistenceConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
+import org.apache.kylin.metadata.realization.StreamSQLDigest;
+import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.IStorageQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTupleItrListener {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedQuery.class);
+ private static final String storageCacheTemplate = "StorageCache";
+
+ protected static CacheManager CACHE_MANAGER;
+
+ protected boolean queryCacheExists;
+ protected ICachableStorageQuery underlyingStorage;
+ protected StreamSQLDigest streamSQLDigest;
+
+ public AbstractCacheFledgedQuery(ICachableStorageQuery underlyingStorage) {
+ this.underlyingStorage = underlyingStorage;
+ this.makeCacheIfNecessary(underlyingStorage.getStorageUUID());
+ }
+
+ public static void setCacheManager(CacheManager cacheManager) {
+ CACHE_MANAGER = cacheManager;
+ }
+
+ private static void initCacheManger() {
+ Configuration conf = new Configuration();
+ conf.setMaxBytesLocalHeap("128M");
+ CACHE_MANAGER = CacheManager.create(conf);
+
+ //a fake template for test cases
+ Cache storageCache = new Cache(new CacheConfiguration(storageCacheTemplate, 0).//
+ memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
+ eternal(false).//
+ timeToIdleSeconds(86400).//
+ diskExpiryThreadIntervalSeconds(0).//
+ maxBytesLocalHeap(10, MemoryUnit.MEGABYTES).//
+ persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
+
+ CACHE_MANAGER.addCache(storageCache);
+ }
+
+ private void makeCacheIfNecessary(String storageUUID) {
+ if (CACHE_MANAGER == null) {
+ logger.warn("CACHE_MANAGER is not provided");
+ initCacheManger();
+ }
+
+ if (CACHE_MANAGER.getCache(storageUUID) == null) {
+ logger.info("Cache for {} initting...", storageUUID);
+
+ //Create a Cache specifying its configuration.
+ CacheConfiguration templateConf = CACHE_MANAGER.getCache(storageCacheTemplate).getCacheConfiguration();
+ PersistenceConfiguration pconf = templateConf.getPersistenceConfiguration();
+ if (pconf != null) {
+ logger.info("PersistenceConfiguration strategy: " + pconf.getStrategy());
+ } else {
+ logger.warn("PersistenceConfiguration is null");
+ }
+
+ Cache storageCache = new Cache(new CacheConfiguration(storageUUID, (int) templateConf.getMaxEntriesLocalHeap()).//
+ memoryStoreEvictionPolicy(templateConf.getMemoryStoreEvictionPolicy()).//
+ eternal(templateConf.isEternal()).//
+ timeToIdleSeconds(templateConf.getTimeToIdleSeconds()).//
+ maxBytesLocalHeap(templateConf.getMaxBytesLocalHeap(), MemoryUnit.BYTES).persistence(pconf));
+ //TODO: deal with failed queries, and only cache too long query
+
+ CACHE_MANAGER.addCache(storageCache);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
deleted file mode 100644
index 61e008f..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.kylin.storage.cache;
-
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.config.Configuration;
-import net.sf.ehcache.config.MemoryUnit;
-import net.sf.ehcache.config.PersistenceConfiguration;
-import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
-
-import org.apache.kylin.metadata.realization.StreamSQLDigest;
-import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
-import org.apache.kylin.storage.ICachableStorageQuery;
-import org.apache.kylin.storage.IStorageQuery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public abstract class AbstractCacheFledgedStorageEngine implements IStorageQuery, TeeTupleItrListener {
- private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedStorageEngine.class);
- private static final String storageCacheTemplate = "StorageCache";
-
- protected static CacheManager CACHE_MANAGER;
-
- protected boolean queryCacheExists;
- protected ICachableStorageQuery underlyingStorage;
- protected StreamSQLDigest streamSQLDigest;
-
- public AbstractCacheFledgedStorageEngine(ICachableStorageQuery underlyingStorage) {
- this.underlyingStorage = underlyingStorage;
- this.makeCacheIfNecessary(underlyingStorage.getStorageUUID());
- }
-
- public static void setCacheManager(CacheManager cacheManager) {
- CACHE_MANAGER = cacheManager;
- }
-
- private static void initCacheManger() {
- Configuration conf = new Configuration();
- conf.setMaxBytesLocalHeap("128M");
- CACHE_MANAGER = CacheManager.create(conf);
-
- //a fake template for test cases
- Cache storageCache = new Cache(new CacheConfiguration(storageCacheTemplate, 0).//
- memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
- eternal(false).//
- timeToIdleSeconds(86400).//
- diskExpiryThreadIntervalSeconds(0).//
- maxBytesLocalHeap(10, MemoryUnit.MEGABYTES).//
- persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
-
- CACHE_MANAGER.addCache(storageCache);
- }
-
- private void makeCacheIfNecessary(String storageUUID) {
- if (CACHE_MANAGER == null) {
- logger.warn("CACHE_MANAGER is not provided");
- initCacheManger();
- }
-
- if (CACHE_MANAGER.getCache(storageUUID) == null) {
- logger.info("Cache for {} initting...", storageUUID);
-
- //Create a Cache specifying its configuration.
- CacheConfiguration templateConf = CACHE_MANAGER.getCache(storageCacheTemplate).getCacheConfiguration();
- PersistenceConfiguration pconf = templateConf.getPersistenceConfiguration();
- if (pconf != null) {
- logger.info("PersistenceConfiguration strategy: " + pconf.getStrategy());
- } else {
- logger.warn("PersistenceConfiguration is null");
- }
-
- Cache storageCache = new Cache(new CacheConfiguration(storageUUID, (int) templateConf.getMaxEntriesLocalHeap()).//
- memoryStoreEvictionPolicy(templateConf.getMemoryStoreEvictionPolicy()).//
- eternal(templateConf.isEternal()).//
- timeToIdleSeconds(templateConf.getTimeToIdleSeconds()).//
- maxBytesLocalHeap(templateConf.getMaxBytesLocalHeap(), MemoryUnit.BYTES).persistence(pconf));
- //TODO: deal with failed queries, and only cache too long query
-
- CACHE_MANAGER.addCache(storageCache);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bf83339c/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
new file mode 100644
index 0000000..febe1a9
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
@@ -0,0 +1,149 @@
+package org.apache.kylin.storage.cache;
+
+import java.util.List;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.Element;
+
+import org.apache.kylin.common.util.RangeUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigestUtil;
+import org.apache.kylin.metadata.realization.StreamSQLDigest;
+import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
+import org.apache.kylin.metadata.tuple.TeeTupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+
+/**
+ */
+public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
+ private static final Logger logger = LoggerFactory.getLogger(CacheFledgedDynamicQuery.class);
+
+ private final TblColRef partitionColRef;
+
+ private Range<Long> ts;
+
+ public CacheFledgedDynamicQuery(ICachableStorageQuery underlyingStorage, TblColRef partitionColRef) {
+ super(underlyingStorage);
+ this.partitionColRef = partitionColRef;
+
+ Preconditions.checkArgument(this.partitionColRef != null, "For dynamic columns like " + //
+ this.underlyingStorage.getStorageUUID() + ", partition column must be provided");
+ }
+
+ @Override
+ public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
+ //check if ts condition in sqlDigest valid
+ ts = TsConditionExtractor.extractTsCondition(partitionColRef, sqlDigest.filter);
+ if (ts == null || ts.isEmpty()) {
+ logger.info("ts range in the query conflicts,return empty directly");
+ return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+ }
+
+ //enable dynamic cache iff group by columns contains partition col
+ //because cache extraction requires partition col value as selection key
+ boolean needUpdateCache = sqlDigest.groupbyColumns.contains(partitionColRef);
+
+ streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
+ StreamSQLResult cachedResult = null;
+ Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
+ Element element = cache.get(streamSQLDigest.hashCode());
+ if (element != null) {
+ this.queryCacheExists = true;
+ cachedResult = (StreamSQLResult) element.getObjectValue();
+ }
+
+ ITupleIterator ret = null;
+ if (cachedResult != null) {
+ Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
+
+ logger.info("existing cache : " + cachedResult);
+ logger.info("ts Range in query: " + RangeUtil.formatTsRange(ts));
+ logger.info("potential reusable range : " + RangeUtil.formatTsRange(reusePeriod));
+
+ if (reusePeriod != null) {
+ List<Range<Long>> remainings = RangeUtil.remove(ts, reusePeriod);
+ if (remainings.size() == 1) {//if using cache causes two underlyingStorage searches, we'd rather not use the cache
+
+ SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+ List<ITupleIterator> iTupleIteratorList = Lists.newArrayList();
+ iTupleIteratorList.add(reusedTuples);
+
+ for (Range<Long> remaining : remainings) {//actually there will be only one loop
+ logger.info("Appending ts " + RangeUtil.formatTsRange(remaining) + " as additional filter");
+
+ ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
+ @Override
+ public ITupleIterator apply(Void input) {
+ return underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+ }
+ });
+ iTupleIteratorList.add(freshTuples);
+ }
+
+ ret = new CompoundTupleIterator(iTupleIteratorList);
+ } else if (remainings.size() == 0) {
+ logger.info("The ts range in new query was fully cached");
+ needUpdateCache = false;
+ ret = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+ } else {
+ //if using cache causes more than one underlyingStorage searches
+ //the incurred overhead might be more expensive than the cache benefit
+ logger.info("Give up using cache to avoid complexity");
+ }
+ }
+ } else {
+ logger.info("no cache entry for this query");
+ }
+
+ if (ret == null) {
+ ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+ logger.info("No Cache being used");
+ } else {
+ logger.info("Cache being used");
+ }
+
+ if (needUpdateCache) {
+ //use another nested ITupleIterator to deal with cache
+ final TeeTupleIterator tee = new TeeTupleIterator(ret);
+ tee.addCloseListener(this);
+ return tee;
+ } else {
+ return ret;
+ }
+ }
+
+ @Override
+ public void notify(List<ITuple> duplicated, long createTime) {
+
+ Range<Long> cacheExclude = this.underlyingStorage.getVolatilePeriod();
+ if (cacheExclude != null) {
+ List<Range<Long>> cachablePeriods = RangeUtil.remove(ts, cacheExclude);
+ if (cachablePeriods.size() == 1) {
+ if (!ts.equals(cachablePeriods.get(0))) {
+ logger.info("With respect to growing storage, the cacheable tsRange shrinks from " + RangeUtil.formatTsRange(ts) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
+ }
+ ts = cachablePeriods.get(0);
+ } else {
+ //give up updating the cache, in avoid to make cache complicated
+ logger.info("Skip updating cache to avoid complexity");
+ }
+ }
+
+ StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, ts, partitionColRef);
+ CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest.hashCode(), newCacheEntry));
+ logger.info("cache after the query: " + newCacheEntry);
+ }
+}