You are viewing a plain-text version of this content. (The link to its canonical version, shown as "here" in the original, was not preserved in this copy.)
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/04/17 15:32:54 UTC
[2/2] kylin git commit: KYLIN-1077 Support Hive View as Lookup Table
KYLIN-1077 Support Hive View as Lookup Table
Signed-off-by: wangxianbin1987 <wa...@gmail.com>
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/66ad1386
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/66ad1386
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/66ad1386
Branch: refs/heads/KYLIN-1077
Commit: 66ad1386041abc65a56d393aefdf381c9280630c
Parents: 209068b
Author: wangxianbin1987 <wa...@gmail.com>
Authored: Sun Apr 17 10:05:44 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Apr 17 21:02:19 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 12 +-
.../apache/kylin/dict/DictionaryManager.java | 13 +-
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../apache/kylin/metadata/model/TableDesc.java | 33 +++++
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 2 +-
.../apache/kylin/source/hive/HiveMRInput.java | 125 +++++++++++++++----
.../source/hive/HiveSourceTableLoader.java | 6 +
7 files changed, 164 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 01b05da..93b113a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -47,9 +47,7 @@ import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -164,7 +162,7 @@ public class CubeManager implements IRealizationProvider {
return null;
DictionaryManager dictMgr = getDictionaryManager();
- DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(),true, col, factTableValueProvider);
+ DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, factTableValueProvider);
if (dictInfo != null) {
cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
@@ -204,7 +202,13 @@ public class CubeManager implements IRealizationProvider {
MetadataManager metaMgr = getMetadataManager();
SnapshotManager snapshotMgr = getSnapshotManager();
- TableDesc tableDesc = metaMgr.getTableDesc(lookupTable);
+ TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable));
+ if (tableDesc.isSourceTableHiveView()) {
+ String tableName = tableDesc.getHiveViewIntermediateTableName(); // derive from the source database, before switching it
+ tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
+ tableDesc.setName(tableName);
+ }
+
ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 015c79f..12e347a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -28,6 +28,7 @@ import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
import org.apache.kylin.source.ReadableTable;
import org.apache.kylin.source.ReadableTable.TableSignature;
import org.apache.kylin.source.SourceFactory;
@@ -273,8 +274,16 @@ public class DictionaryManager {
if (model.isFactTable(srcTable)) {
inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
} else {
- TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(srcTable);
- inpTable = SourceFactory.createReadableTable(tableDesc);
+ MetadataManager metadataManager = MetadataManager.getInstance(config);
+ TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable));
+ if (tableDesc.isSourceTableHiveView()) {
+ // Read a Hive view from its materialized copy. The copy's name is derived from the source
+ // database, so compute it before repointing the descriptor at the intermediate database.
+ String tableName = tableDesc.getHiveViewIntermediateTableName();
+ tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
+ tableDesc.setName(tableName);
+ }
+ inpTable = SourceFactory.createReadableTable(tableDesc);
}
TableSignature inputSig = inpTable.getSignature();
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index f619a68..d47d550 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -36,6 +36,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+ public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 65d85dd..33f5d93 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -42,11 +42,31 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
private ColumnDesc[] columns;
@JsonProperty("source_type")
private int sourceType = ISourceAware.ID_HIVE;
+ @JsonProperty("source_table_type")
+ private boolean sourceTableHiveViewFlag = false;
+ @JsonProperty("hive_view__table_name_prefix")
+ private String hiveViewIntermediateTableNamePrefix = "kylin_intermediate_";
private DatabaseDesc database = new DatabaseDesc();
private String identity = null;
+ public TableDesc() {
+ }
+
+ /**
+ * Shallow copy (the columns array is shared). The Hive-view flag, the view-table name prefix and the
+ * source type are copied too, so callers that copy a table before renaming it can still detect a view.
+ */
+ public TableDesc(TableDesc other) {
+ this.name = other.getName();
+ this.columns = other.getColumns();
+ this.database.setName(other.getDatabase());
+ this.sourceType = other.sourceType;
+ this.sourceTableHiveViewFlag = other.sourceTableHiveViewFlag;
+ this.hiveViewIntermediateTableNamePrefix = other.hiveViewIntermediateTableNamePrefix;
+ }
+
public ColumnDesc findColumnByName(String name) {
//ignore the db name and table name if exists
int lastIndexOfDot = name.lastIndexOf(".");
@@ -164,6 +177,18 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
}
}
+ public void setSourceTableHiveViewFlag(boolean sourceTableHiveViewFlag) {
+ this.sourceTableHiveViewFlag = sourceTableHiveViewFlag;
+ }
+
+ public boolean isSourceTableHiveView(){
+ return sourceTableHiveViewFlag;
+ }
+
+ public String getHiveViewIntermediateTableName() {
+ return hiveViewIntermediateTableNamePrefix + "_" + database.getName() + "_" + name;
+ }
+
@Override
public String toString() {
return "TableDesc [database=" + getDatabase() + " name=" + name + "]";
@@ -184,4 +209,12 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
public void setSourceType(int sourceType) {
this.sourceType = sourceType;
}
+
+ public String getHiveViewIntermediateTableNamePrefix() {
+ return hiveViewIntermediateTableNamePrefix;
+ }
+
+ public void setHiveViewIntermediateTableNamePrefix(String hiveViewIntermediateTableNamePrefix) {
+ this.hiveViewIntermediateTableNamePrefix = hiveViewIntermediateTableNamePrefix;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index a1c9cd9..853eca0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -53,7 +53,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
- // Phase 1: Create Flat Table
+ // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 5242d76..d90ed60 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,19 +23,15 @@ import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -99,6 +95,7 @@ public class HiveMRInput implements IMRInput {
final JobEngineConfig conf;
final IRealizationSegment seg;
final IJoinedFlatTableDesc flatHiveTableDesc;
+ String hiveViewIntermediateTables = "";
public BatchCubingInputSide(IRealizationSegment seg) {
this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
@@ -109,6 +106,10 @@ public class HiveMRInput implements IMRInput {
@Override
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+ AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
+ if(task != null) {
+ jobFlow.addTask(task);
+ }
}
public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
@@ -137,12 +138,62 @@ public class HiveMRInput implements IMRInput {
return step;
}
+
+ /**
+ * Builds a Hive step that materializes each lookup table that is a Hive view into a table in the
+ * intermediate database, and records the created table names (';'-separated) in
+ * hiveViewIntermediateTables so the cleanup step can drop them.
+ *
+ * @param jobId id of the cubing job; used only for the HDFS location of the materialized data
+ * @return the step, or null when no lookup table is a Hive view
+ */
+ public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+ String cubeName = seg.getRealization().getName();
+ CubeDesc cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
+
+ HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ hiveCmdBuilder.addStatement("USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";");
+ StringBuilder materializedTables = new StringBuilder();
+ for (TableDesc lookUpTableDesc : cubeDesc.getLookupTableDescs()) {
+ if (!lookUpTableDesc.isSourceTableHiveView()) {
+ continue;
+ }
+ // Do not set a job-specific prefix here: lookUpTableDesc is shared cube metadata, and readers that
+ // only see the TableDesc could not reproduce a jobId-based name (job ids may also contain '-').
+ // NOTE(review): the name is therefore not job-specific; concurrent builds sharing a view use the same table.
+ String intermediateTableName = lookUpTableDesc.getHiveViewIntermediateTableName();
+ // drop first so a table left behind by an earlier, uncleaned job is not silently reused
+ StringBuilder createIntermediateTableHql = new StringBuilder();
+ createIntermediateTableHql.append("DROP TABLE IF EXISTS " + intermediateTableName + ";\n");
+ createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + intermediateTableName + "\n");
+ createIntermediateTableHql.append("LOCATION '" + JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/" + intermediateTableName + "'\n");
+ createIntermediateTableHql.append("AS SELECT * FROM " + lookUpTableDesc.getIdentity() + ";\n");
+ hiveCmdBuilder.addStatement(createIntermediateTableHql.toString());
+ if (materializedTables.length() > 0) {
+ materializedTables.append(';');
+ }
+ materializedTables.append(intermediateTableName);
+ }
+
+ hiveViewIntermediateTables = materializedTables.toString();
+ if (hiveViewIntermediateTables.isEmpty()) {
+ return null;
+ }
+ ShellExecutable step = new ShellExecutable();
+ step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
+ step.setCmd(hiveCmdBuilder.build());
+ return step;
+ }
+
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
GarbageCollectionStep step = new GarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
step.setIntermediateTableIdentity(getIntermediateTableIdentity());
step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+ step.setHiveViewIntermediateTableIdentitys(hiveViewIntermediateTables);
jobFlow.addTask(step);
}
@@ -161,29 +203,62 @@ public class HiveMRInput implements IMRInput {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
KylinConfig config = context.getConfig();
StringBuffer output = new StringBuffer();
+ try {
+ output.append(cleanUpIntermediateFlatTable(config));
+ output.append(cleanUpHiveViewIntermediateTable(config));
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+ private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
+ StringBuffer output = new StringBuffer();
final String hiveTable = this.getIntermediateTableIdentity();
if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
- try {
- config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
- output.append("Hive table " + hiveTable + " is dropped. \n");
-
- Path externalDataPath = new Path(getExternalDataPath());
- FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
- if (fs.exists(externalDataPath)) {
- fs.delete(externalDataPath, true);
- output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n");
- }
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
- }
+
+ config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+ output.append("Hive table " + hiveTable + " is dropped. \n");
+
+ rmdirOnHDFS(getExternalDataPath());
+ output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+ }
+ return output.toString();
+ }
+
+ private void mkdirOnHDFS(String path) throws IOException {
+ Path externalDataPath = new Path(path);
+ FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+ if (!fs.exists(externalDataPath)) {
+ fs.mkdirs(externalDataPath);
}
+ }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ private void rmdirOnHDFS(String path) throws IOException {
+ Path externalDataPath = new Path(path);
+ FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+ if (fs.exists(externalDataPath)) {
+ fs.delete(externalDataPath, true);
+ }
+ }
+
+ private String cleanUpHiveViewIntermediateTable(KylinConfig config) throws IOException {
+ final String hiveViewTables = getHiveViewIntermediateTableIdentitys();
+ if (StringUtils.isEmpty(hiveViewTables)) { // nothing to drop (param may be null if never set): skip the Hive CLI
+ return "";
+ }
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+ for (String hiveTableName : hiveViewTables.split(";")) {
+ hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTableName + ";");
+ }
+ config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+ return "hive view intermediate tables: " + hiveViewTables + " is dropped. \n";
+ }
public void setIntermediateTableIdentity(String tableIdentity) {
@@ -201,6 +276,14 @@ public class HiveMRInput implements IMRInput {
private String getExternalDataPath() {
return getParam("externalDataPath");
}
+
+ public void setHiveViewIntermediateTableIdentitys(String tableIdentitys) {
+ setParam("oldHiveViewIntermediateTables", tableIdentitys);
+ }
+
+ private String getHiveViewIntermediateTableIdentitys() {
+ return getParam("oldHiveViewIntermediateTables");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 2aef4e6..6860f91 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.kylin.common.KylinConfig;
@@ -116,6 +117,11 @@ public class HiveSourceTableLoader {
tableDesc.setUuid(UUID.randomUUID().toString());
tableDesc.setLastModified(0);
}
+ if(table.getTableType().equals(TableType.VIRTUAL_VIEW.toString())) {
+ tableDesc.setSourceTableHiveViewFlag(true);
+ } else {
+ tableDesc.setSourceTableHiveViewFlag(false);
+ }
int columnNumber = fields.size();
List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);