You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/04/17 15:32:53 UTC
[1/2] kylin git commit: KYLIN-1566 use a separate kylin_job_conf.xml
for in-mem cubing
Repository: kylin
Updated Branches:
refs/heads/KYLIN-1077 [created] 66ad13860
KYLIN-1566 use a separate kylin_job_conf.xml for in-mem cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/209068b9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/209068b9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/209068b9
Branch: refs/heads/KYLIN-1077
Commit: 209068b943bf4a90efe4df618e1aaf5cbfe49cde
Parents: 1b54a40
Author: shaofengshi <sh...@apache.org>
Authored: Fri Apr 15 16:11:44 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Apr 16 09:05:40 2016 +0800
----------------------------------------------------------------------
build/conf/kylin_job_conf_inmem.xml | 98 ++++++++++++++++++++
.../apache/kylin/common/KylinConfigBase.java | 19 ----
.../kylin/job/engine/JobEngineConfig.java | 44 ++++++---
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 3 +-
.../kylin/engine/mr/JobBuilderSupport.java | 13 ++-
.../kylin/engine/mr/steps/InMemCuboidJob.java | 11 ---
.../cardinality/HiveColumnCardinalityJob.java | 2 +-
7 files changed, 140 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/build/conf/kylin_job_conf_inmem.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_job_conf_inmem.xml b/build/conf/kylin_job_conf_inmem.xml
new file mode 100644
index 0000000..55bf9ed
--- /dev/null
+++ b/build/conf/kylin_job_conf_inmem.xml
@@ -0,0 +1,98 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+
+ <property>
+ <name>mapreduce.job.split.metainfo.maxsize</name>
+ <value>-1</value>
+ <description>The maximum permissible size of the split metainfo file.
+ The JobTracker won't attempt to read split metainfo files bigger than
+ the configured value. No limits if set to -1.
+ </description>
+ </property>
+
+ <property>
+ <name>mapred.compress.map.output</name>
+ <value>true</value>
+ <description>Compress map outputs</description>
+ </property>
+
+ <property>
+ <name>mapred.map.output.compression.codec</name>
+ <value>org.apache.hadoop.io.compress.SnappyCodec</value>
+ <description>The compression codec to use for map outputs
+ </description>
+ </property>
+
+ <property>
+ <name>mapred.output.compress</name>
+ <value>true</value>
+ <description>Compress the output of a MapReduce job</description>
+ </property>
+
+ <property>
+ <name>mapred.output.compression.codec</name>
+ <value>org.apache.hadoop.io.compress.SnappyCodec</value>
+ <description>The compression codec to use for job outputs
+ </description>
+ </property>
+
+ <property>
+ <name>mapred.output.compression.type</name>
+ <value>BLOCK</value>
+ <description>The compression type to use for job outputs</description>
+ </property>
+
+
+ <property>
+ <name>mapreduce.job.max.split.locations</name>
+ <value>2000</value>
+ <description>Maximum number of block locations recorded for each input split</description>
+ </property>
+
+ <property>
+ <name>dfs.replication</name>
+ <value>2</value>
+ <description>Replication factor for files written by the job</description>
+ </property>
+
+ <property>
+ <name>mapred.task.timeout</name>
+ <value>3600000</value>
+ <description>Task timeout in milliseconds (1 hour)</description>
+ </property>
+
+ <!--Additional config for in-mem cubing: give each mapper more memory -->
+ <property>
+ <name>mapreduce.map.memory.mb</name>
+ <value>3072</value>
+ <description>Physical memory (MB) requested for each map container</description>
+ </property>
+
+ <property>
+ <name>mapreduce.map.java.opts</name>
+ <value>-Xmx2700m</value>
+ <description>Map task JVM options; heap kept below the 3072 MB container to leave room for non-heap memory</description>
+ </property>
+
+ <property>
+ <name>mapreduce.task.io.sort.mb</name>
+ <value>200</value>
+ <description>Memory (MB) for the map-side sort buffer</description>
+ </property>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 51aa8aa..4d65c1d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -531,25 +531,6 @@ abstract public class KylinConfigBase implements Serializable {
return percent;
}
- public Map<String, String> getCubingInMemMRJobConfOverride() {
- // in-mem cubing requires big memory, however dev env (sandbox) may not have that much
- String defaultOverride = isDevEnv() ? "" : "mapreduce.map.java.opts=-Xmx2700m; mapreduce.map.memory.mb=3072; mapreduce.task.io.sort.mb=200";
- String override = getOptional("kylin.job.cubing.inmem.mrjob_conf_override", defaultOverride);
-
- Map<String, String> result = Maps.newHashMap();
- for (String pair : override.split(";")) {
- int cut = pair.indexOf('=');
- if (cut < 0)
- continue;
- String k = pair.substring(0, cut).trim();
- String v = pair.substring(cut + 1).trim();
- if (k.isEmpty() || v.isEmpty())
- continue;
- result.put(k, v);
- }
- return result;
- }
-
public String getHbaseDefaultCompressionCodec() {
return getOptional("kylin.hbase.default.compression.codec", "");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 546c033..fb4ce68 100644
--- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -33,8 +33,10 @@ import org.slf4j.LoggerFactory;
*/
public class JobEngineConfig {
private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
- public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
- public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
+ public static final String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
+ public static final String HIVE_CONF_FILENAME = "kylin_hive_conf";
+ public static final String DEFAUL_JOB_CONF_SUFFIX = "";
+ public static final String IN_MEM_JOB_CONF_SUFFIX = "inmem";
private static File getJobConfig(String fileName) {
String path = System.getProperty(KylinConfig.KYLIN_CONF);
@@ -49,10 +51,10 @@ public class JobEngineConfig {
return null;
}
- private String getHadoopJobConfFilePath(RealizationCapacity capaticy, boolean appendSuffix) throws IOException {
+ private String getHadoopJobConfFilePath(String suffix, boolean appendSuffix) throws IOException {
String hadoopJobConfFile;
- if (capaticy != null && appendSuffix) {
- hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml");
+ if (suffix != null && appendSuffix) {
+ hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + suffix.toLowerCase() + ".xml");
} else {
hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
}
@@ -69,19 +71,31 @@ public class JobEngineConfig {
return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
}
- public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException {
- String path = getHadoopJobConfFilePath(capaticy, true);
- if (!StringUtils.isEmpty(path)) {
- logger.info("Chosen job conf is : " + path);
- return path;
- } else {
- path = getHadoopJobConfFilePath(capaticy, false);
- if (!StringUtils.isEmpty(path)) {
- logger.info("Chosen job conf is : " + path);
- return path;
- }
- }
- return "";
+ /**
+ * Resolves the Hadoop job configuration file for an MR step, most specific candidate first:
+ * kylin_job_conf_[jobType]_[capacity].xml, then kylin_job_conf_[jobType].xml (or
+ * kylin_job_conf_[capacity].xml when no job type is given), then kylin_job_conf.xml.
+ * Candidates are matched case-insensitively by lower-casing the suffix.
+ *
+ * @param jobType job conf suffix such as {@link #IN_MEM_JOB_CONF_SUFFIX}; null or empty for none
+ * @param capacity realization capacity name (e.g. "MEDIUM"); null or empty for none
+ * @return URL of the chosen job conf file, or "" if none was resolved
+ * @throws IOException
+ */
+ public String getHadoopJobConfFilePath(String jobType, String capacity) throws IOException {
+ boolean hasJobType = !StringUtils.isEmpty(jobType);
+ boolean hasCapacity = !StringUtils.isEmpty(capacity);
+ // null entries are skipped, so no "_null" or trailing "_" file names are ever probed
+ String[] suffixes = { hasJobType && hasCapacity ? jobType + "_" + capacity : null,
+ hasJobType ? jobType : (hasCapacity ? capacity : null) };
+ for (String suffix : suffixes) {
+ if (suffix == null) {
+ continue;
+ }
+ // check existence here so a missing specialized file moves on to the next candidate
+ File candidate = getJobConfig(HADOOP_JOB_CONF_FILENAME + "_" + suffix.toLowerCase() + ".xml");
+ if (candidate != null && candidate.exists()) {
+ String path = OptionsHelper.convertToFileURL(candidate.getAbsolutePath());
+ logger.info("Chosen job conf is : " + path);
+ return path;
+ }
+ }
+ // no specialized file found: resolve the default kylin_job_conf.xml
+ String path = getHadoopJobConfFilePath(null, false);
+ if (path == null) {
+ path = "";
+ }
+ logger.info("Chosen job conf is : " + path);
+ return path;
}
public String getHiveConfFilePath() throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 0b1bd90..a1c9cd9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -30,6 +30,7 @@ import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,7 +109,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
MapReduceExecutable cubeStep = new MapReduceExecutable();
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+ appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX, ((CubeSegment) seg).getCubeDesc().getModel());
cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index c4fc6b9..841c402 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr;
import java.io.IOException;
import java.util.List;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
@@ -162,9 +161,17 @@ public class JobBuilderSupport {
return getRealizationRootPath(jobId) + "/secondary_index/";
}
- public void appendMapReduceParameters(StringBuilder buf, DataModelDesc modelDesc) {
+ /** Appends " -conf <file>" (when one is resolved) using the default job conf type and the model's capacity. */
+ public void appendMapReduceParameters(StringBuilder buf, DataModelDesc dataModelDesc) {
+ appendMapReduceParameters(buf, JobEngineConfig.DEFAUL_JOB_CONF_SUFFIX, dataModelDesc);
+ }
+
+ /** Appends " -conf <file>" (when one is resolved) for the given job conf type and the model's capacity. */
+ public void appendMapReduceParameters(StringBuilder buf, String jobType, DataModelDesc dataModelDesc) {
+ // pass a null capacity through instead of throwing NullPointerException on toString()
+ String capacity = dataModelDesc.getCapacity() == null ? null : dataModelDesc.getCapacity().toString();
+ appendMapReduceParameters(buf, jobType, capacity);
+ }
+
+ public void appendMapReduceParameters(StringBuilder buf, String jobType, String capacity) {
try {
- String jobConf = config.getHadoopJobConfFilePath(modelDesc.getCapacity());
+ String jobConf = config.getHadoopJobConfFilePath(jobType, capacity);
if (jobConf != null && jobConf.length() > 0) {
buf.append(" -conf ").append(jobConf);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index f440b22..e7bbdf1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -101,9 +101,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
logger.info("Starting: " + job.getJobName());
- // some special tuning for in-mem MR job
- overrideJobConf(job.getConfiguration(), config);
-
setJobClasspath(job);
// add metadata to distributed cache
@@ -112,8 +109,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- long timeout = 1000 * 60 * 60L; // 1 hour
- job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
// set input
IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
@@ -149,12 +144,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
}
}
- private void overrideJobConf(Configuration jobConf, KylinConfig kylinConfig) {
- for (Entry<String, String> entry : kylinConfig.getCubingInMemMRJobConfOverride().entrySet()) {
- jobConf.set(entry.getKey(), entry.getValue());
- }
- }
-
private int calculateReducerNum(CubeSegment cubeSeg) throws IOException {
KylinConfig kylinConfig = cubeSeg.getConfig();
http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 9162208..3ce0ab2 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -71,7 +71,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
Configuration conf = getConf();
JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null));
+ conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null, null));
job = Job.getInstance(conf, jobName);
[2/2] kylin git commit: KYLIN-1077 Support Hive View as Lookup Table
Posted by sh...@apache.org.
KYLIN-1077 Support Hive View as Lookup Table
Signed-off-by: wangxianbin1987 <wa...@gmail.com>
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/66ad1386
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/66ad1386
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/66ad1386
Branch: refs/heads/KYLIN-1077
Commit: 66ad1386041abc65a56d393aefdf381c9280630c
Parents: 209068b
Author: wangxianbin1987 <wa...@gmail.com>
Authored: Sun Apr 17 10:05:44 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Apr 17 21:02:19 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 12 +-
.../apache/kylin/dict/DictionaryManager.java | 13 +-
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../apache/kylin/metadata/model/TableDesc.java | 33 +++++
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 2 +-
.../apache/kylin/source/hive/HiveMRInput.java | 125 +++++++++++++++----
.../source/hive/HiveSourceTableLoader.java | 6 +
7 files changed, 164 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 01b05da..93b113a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -47,9 +47,7 @@ import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -164,7 +162,7 @@ public class CubeManager implements IRealizationProvider {
return null;
DictionaryManager dictMgr = getDictionaryManager();
- DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(),true, col, factTableValueProvider);
+ DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, factTableValueProvider);
if (dictInfo != null) {
cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
@@ -204,7 +202,13 @@ public class CubeManager implements IRealizationProvider {
MetadataManager metaMgr = getMetadataManager();
SnapshotManager snapshotMgr = getSnapshotManager();
TableDesc tableDesc = metaMgr.getTableDesc(lookupTable);
+ if (tableDesc.isSourceTableHiveView()) {
+ // Snapshot the table the view was materialized into, not the view itself. Work on a copy so the
+ // TableDesc returned by MetadataManager is not renamed, and take the materialized name from the
+ // original (view database + view name) BEFORE the copy is switched to the intermediate database.
+ TableDesc materialized = new TableDesc(tableDesc);
+ materialized.setDatabase(config.getHiveDatabaseForIntermediateTable());
+ materialized.setName(tableDesc.getHiveViewIntermediateTableName());
+ tableDesc = materialized;
+ }
+
ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 015c79f..12e347a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -28,6 +28,7 @@ import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
import org.apache.kylin.source.ReadableTable;
import org.apache.kylin.source.ReadableTable.TableSignature;
import org.apache.kylin.source.SourceFactory;
@@ -273,8 +274,16 @@ public class DictionaryManager {
if (model.isFactTable(srcTable)) {
inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
} else {
TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(srcTable);
+ if (tableDesc.isSourceTableHiveView()) {
+ // Read the table the view was materialized into, not the view itself. Work on a copy so the
+ // TableDesc returned by MetadataManager is not renamed, and take the materialized name from the
+ // original (view database + view name) BEFORE the copy is switched to the intermediate database.
+ TableDesc materialized = new TableDesc(tableDesc);
+ materialized.setDatabase(config.getHiveDatabaseForIntermediateTable());
+ materialized.setName(tableDesc.getHiveViewIntermediateTableName());
+ tableDesc = materialized;
+ }
inpTable = SourceFactory.createReadableTable(tableDesc);
}
TableSignature inputSig = inpTable.getSignature();
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index f619a68..d47d550 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -36,6 +36,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+ public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 65d85dd..33f5d93 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -42,11 +42,24 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
private ColumnDesc[] columns;
@JsonProperty("source_type")
private int sourceType = ISourceAware.ID_HIVE;
+ @JsonProperty("source_table_type")
+ private boolean sourceTableHiveViewFlag = false;
+ @JsonProperty("hive_view__table_name_prefix")
+ private String hiveViewIntermediateTableNamePrefix = "kylin_intermediate_";
private DatabaseDesc database = new DatabaseDesc();
private String identity = null;
+ public TableDesc() {
+ }
+
+ /**
+ * Copy constructor. Copies name, columns (taken from other.getColumns(), not deep-copied),
+ * database name, source type, the Hive-view flag and the materialized-table name prefix,
+ * so isSourceTableHiveView() and getHiveViewIntermediateTableName() agree with the original.
+ */
+ public TableDesc(TableDesc other) {
+ this.name = other.getName();
+ this.columns = other.getColumns();
+ this.database.setName(other.getDatabase());
+ this.sourceType = other.sourceType;
+ this.sourceTableHiveViewFlag = other.sourceTableHiveViewFlag;
+ this.hiveViewIntermediateTableNamePrefix = other.hiveViewIntermediateTableNamePrefix;
+ }
+
public ColumnDesc findColumnByName(String name) {
//ignore the db name and table name if exists
int lastIndexOfDot = name.lastIndexOf(".");
@@ -164,6 +177,18 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
}
}
+ public void setSourceTableHiveViewFlag(boolean sourceTableHiveViewFlag) {
+ this.sourceTableHiveViewFlag = sourceTableHiveViewFlag;
+ }
+
+ public boolean isSourceTableHiveView(){
+ return sourceTableHiveViewFlag;
+ }
+
+ /**
+ * Name of the Hive table a Hive view is materialized into: prefix, database name and table name
+ * joined by single underscores. A prefix that already ends with '_' (such as the default
+ * "kylin_intermediate_") is not followed by a second underscore.
+ */
+ public String getHiveViewIntermediateTableName() {
+ String prefix = hiveViewIntermediateTableNamePrefix == null ? "" : hiveViewIntermediateTableNamePrefix;
+ if (!prefix.isEmpty() && !prefix.endsWith("_")) {
+ prefix = prefix + "_";
+ }
+ return prefix + database.getName() + "_" + name;
+ }
+
@Override
public String toString() {
return "TableDesc [database=" + getDatabase() + " name=" + name + "]";
@@ -184,4 +209,12 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
public void setSourceType(int sourceType) {
this.sourceType = sourceType;
}
+
+ public String getHiveViewIntermediateTableNamePrefix() {
+ return hiveViewIntermediateTableNamePrefix;
+ }
+
+ public void setHiveViewIntermediateTableNamePrefix(String hiveViewIntermediateTableNamePrefix) {
+ this.hiveViewIntermediateTableNamePrefix = hiveViewIntermediateTableNamePrefix;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index a1c9cd9..853eca0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -53,7 +53,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
- // Phase 1: Create Flat Table
+ // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 5242d76..d90ed60 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,19 +23,15 @@ import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -99,6 +95,7 @@ public class HiveMRInput implements IMRInput {
final JobEngineConfig conf;
final IRealizationSegment seg;
final IJoinedFlatTableDesc flatHiveTableDesc;
+ String hiveViewIntermediateTables = "";
public BatchCubingInputSide(IRealizationSegment seg) {
this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
@@ -109,6 +106,10 @@ public class HiveMRInput implements IMRInput {
@Override
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+ AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
+ if(task != null) {
+ jobFlow.addTask(task);
+ }
}
public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
@@ -137,12 +138,53 @@ public class HiveMRInput implements IMRInput {
return step;
}
+
+ /**
+ * Builds a Hive step that materializes every lookup table of the cube that is a Hive view into a
+ * table in the intermediate database (data written under the job working dir), and records the
+ * created table names, ';'-separated, in hiveViewIntermediateTables for the cleanup step.
+ *
+ * @param jobId id of the owning job
+ * @return the step, or null when there is no cube for this segment or no lookup table is a view
+ */
+ public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+ String cubeName = seg.getRealization().getName();
+ if (cubeMgr.getCube(cubeName) == null) {
+ return null; // no cube with this name: nothing to materialize
+ }
+ CubeDesc cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
+
+ HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ hiveCmdBuilder.addStatement("USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";");
+ StringBuilder materializedTables = new StringBuilder();
+ for (TableDesc lookUpTableDesc : cubeDesc.getLookupTableDescs()) {
+ if (!lookUpTableDesc.isSourceTableHiveView()) {
+ continue;
+ }
+ // Use the name derived from the unmodified metadata TableDesc: dictionary and snapshot building
+ // derive it the same way, and the shared TableDesc is not mutated with a job-specific prefix.
+ String intermediateTable = lookUpTableDesc.getHiveViewIntermediateTableName();
+ StringBuilder hql = new StringBuilder();
+ // drop any leftover from an earlier run so stale view data is never reused
+ hql.append("DROP TABLE IF EXISTS " + intermediateTable + ";\n");
+ hql.append("CREATE TABLE IF NOT EXISTS " + intermediateTable + "\n");
+ hql.append("LOCATION '" + JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/" + intermediateTable + "'\n");
+ hql.append("AS SELECT * FROM " + lookUpTableDesc.getIdentity() + ";\n");
+ hiveCmdBuilder.addStatement(hql.toString());
+ if (materializedTables.length() > 0) {
+ materializedTables.append(';');
+ }
+ materializedTables.append(intermediateTable);
+ }
+
+ if (materializedTables.length() == 0) {
+ return null;
+ }
+ hiveViewIntermediateTables = materializedTables.toString();
+ ShellExecutable step = new ShellExecutable();
+ step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
+ step.setCmd(hiveCmdBuilder.build());
+ return step;
+ }
+
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
GarbageCollectionStep step = new GarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
step.setIntermediateTableIdentity(getIntermediateTableIdentity());
step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+ step.setHiveViewIntermediateTableIdentitys(hiveViewIntermediateTables);
jobFlow.addTask(step);
}
@@ -161,29 +203,62 @@ public class HiveMRInput implements IMRInput {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = context.getConfig();
StringBuffer output = new StringBuffer();
+ try {
+ output.append(cleanUpIntermediateFlatTable(config));
+ output.append(cleanUpHiveViewIntermediateTable(config));
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+ private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
+ StringBuffer output = new StringBuffer();
final String hiveTable = this.getIntermediateTableIdentity();
if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
- try {
- config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
- output.append("Hive table " + hiveTable + " is dropped. \n");
-
- Path externalDataPath = new Path(getExternalDataPath());
- FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
- if (fs.exists(externalDataPath)) {
- fs.delete(externalDataPath, true);
- output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n");
- }
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
- }
+
+ config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+ output.append("Hive table " + hiveTable + " is dropped. \n");
+
+ rmdirOnHDFS(getExternalDataPath());
+ output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+ }
+ return output.toString();
+ }
+
+ private void mkdirOnHDFS(String path) throws IOException {
+ Path externalDataPath = new Path(path);
+ FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+ if (!fs.exists(externalDataPath)) {
+ fs.mkdirs(externalDataPath);
}
+ }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ private void rmdirOnHDFS(String path) throws IOException {
+ Path externalDataPath = new Path(path);
+ FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+ if (fs.exists(externalDataPath)) {
+ fs.delete(externalDataPath, true);
+ }
+ }
+
+ /**
+ * Drops the Hive tables listed (';'-separated) in the oldHiveViewIntermediateTables param.
+ *
+ * @return a log line naming the dropped tables, or "" when there was nothing to drop
+ * @throws IOException if the Hive CLI invocation fails
+ */
+ private String cleanUpHiveViewIntermediateTable(KylinConfig config) throws IOException {
+ final String tables = getHiveViewIntermediateTableIdentitys();
+ // getParam may return null when the param was never set (e.g. jobs persisted before it existed);
+ // also skip launching the Hive CLI when no view was materialized
+ if (StringUtils.isEmpty(tables)) {
+ return "";
+ }
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+ for (String hiveTableName : tables.split(";")) {
+ if (!hiveTableName.trim().isEmpty()) {
+ hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTableName.trim() + ";");
+ }
+ }
+ config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+ return "hive view intermediate tables: " + tables + " is dropped. \n";
}
public void setIntermediateTableIdentity(String tableIdentity) {
@@ -201,6 +276,14 @@ public class HiveMRInput implements IMRInput {
private String getExternalDataPath() {
return getParam("externalDataPath");
}
+
+ public void setHiveViewIntermediateTableIdentitys(String tableIdentitys) {
+ setParam("oldHiveViewIntermediateTables", tableIdentitys);
+ }
+
+ private String getHiveViewIntermediateTableIdentitys() {
+ return getParam("oldHiveViewIntermediateTables");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 2aef4e6..6860f91 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.kylin.common.KylinConfig;
@@ -116,6 +117,11 @@ public class HiveSourceTableLoader {
tableDesc.setUuid(UUID.randomUUID().toString());
tableDesc.setLastModified(0);
}
+ // flag Hive views; a null table type from the metastore is treated as "not a view" instead of throwing NPE
+ tableDesc.setSourceTableHiveViewFlag(TableType.VIRTUAL_VIEW.toString().equals(table.getTableType()));
int columnNumber = fields.size();
List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);