You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/04/17 15:32:54 UTC

[2/2] kylin git commit: KYLIN-1077 Support Hive View as Lookup Table

KYLIN-1077 Support Hive View as Lookup Table

Signed-off-by: wangxianbin1987 <wa...@gmail.com>
Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/66ad1386
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/66ad1386
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/66ad1386

Branch: refs/heads/KYLIN-1077
Commit: 66ad1386041abc65a56d393aefdf381c9280630c
Parents: 209068b
Author: wangxianbin1987 <wa...@gmail.com>
Authored: Sun Apr 17 10:05:44 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Apr 17 21:02:19 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java |  12 +-
 .../apache/kylin/dict/DictionaryManager.java    |  13 +-
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../apache/kylin/metadata/model/TableDesc.java  |  33 +++++
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |   2 +-
 .../apache/kylin/source/hive/HiveMRInput.java   | 125 +++++++++++++++----
 .../source/hive/HiveSourceTableLoader.java      |   6 +
 7 files changed, 164 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 01b05da..93b113a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -47,9 +47,7 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -164,7 +162,7 @@ public class CubeManager implements IRealizationProvider {
             return null;

         DictionaryManager dictMgr = getDictionaryManager();
-        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(),true, col, factTableValueProvider);
+        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, factTableValueProvider);

         if (dictInfo != null) {
             cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
@@ -204,7 +202,13 @@ public class CubeManager implements IRealizationProvider {
         MetadataManager metaMgr = getMetadataManager();
         SnapshotManager snapshotMgr = getSnapshotManager();

-        TableDesc tableDesc = metaMgr.getTableDesc(lookupTable);
+        TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable));
+        // a Hive view is snapshotted from its materialized copy; resolve that name before switching databases
+        if (tableDesc.isSourceTableHiveView()) {
+            String tableName = tableDesc.getHiveViewIntermediateTableName();
+            tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
+            tableDesc.setName(tableName);
+        }
         ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
         SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);


http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 015c79f..12e347a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -273,8 +273,15 @@ public class DictionaryManager {
         if (model.isFactTable(srcTable)) {
             inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
         } else {
-            TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(srcTable);
-            inpTable = SourceFactory.createReadableTable(tableDesc);
+            MetadataManager metadataManager = MetadataManager.getInstance(config);
+            TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable));
+            if (tableDesc.isSourceTableHiveView()) {
+                // read the view's materialized copy; its name must be resolved before switching databases
+                String tableName = tableDesc.getHiveViewIntermediateTableName();
+                tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
+                tableDesc.setName(tableName);
+            }
+            inpTable = SourceFactory.createReadableTable(tableDesc);
         }

         TableSignature inputSig = inpTable.getSignature();

http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index f619a68..d47d550 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -36,6 +36,7 @@ public final class ExecutableConstants {

     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+    public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; // name of the step copying lookup-table Hive views into physical tables
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";

http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 65d85dd..33f5d93 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -42,11 +42,28 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
     private ColumnDesc[] columns;
     @JsonProperty("source_type")
     private int sourceType = ISourceAware.ID_HIVE;
+    @JsonProperty("source_table_type")
+    private boolean sourceTableHiveViewFlag = false;
+    @JsonProperty("hive_view__table_name_prefix")
+    private String hiveViewIntermediateTableNamePrefix = "kylin_intermediate_";

     private DatabaseDesc database = new DatabaseDesc();

     private String identity = null;

+    public TableDesc() {
+    }
+
+    /** Copy constructor; columns are taken from {@code other.getColumns()} without being cloned. */
+    public TableDesc(TableDesc other) {
+        this.name = other.getName();
+        this.columns = other.getColumns();
+        this.database.setName(other.getDatabase());
+        this.sourceType = other.sourceType;
+        this.sourceTableHiveViewFlag = other.sourceTableHiveViewFlag;
+        this.hiveViewIntermediateTableNamePrefix = other.hiveViewIntermediateTableNamePrefix;
+    }
+
     public ColumnDesc findColumnByName(String name) {
         //ignore the db name and table name if exists
         int lastIndexOfDot = name.lastIndexOf(".");
@@ -164,6 +181,26 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         }
     }

+    public void setSourceTableHiveViewFlag(boolean sourceTableHiveViewFlag) {
+        this.sourceTableHiveViewFlag = sourceTableHiveViewFlag;
+    }
+
+    /** Whether this table is flagged as a Hive view (see {@link #setSourceTableHiveViewFlag(boolean)}). */
+    public boolean isSourceTableHiveView() {
+        return sourceTableHiveViewFlag;
+    }
+
+    /**
+     * Name of the physical table a Hive view is materialized into: prefix + database + "_" + table name.
+     * A "_" follows the prefix only when the prefix does not already end with one. The database currently
+     * set on this descriptor is used, so resolve the name before repointing the descriptor elsewhere.
+     */
+    public String getHiveViewIntermediateTableName() {
+        String prefix = hiveViewIntermediateTableNamePrefix == null ? "" : hiveViewIntermediateTableNamePrefix;
+        String separator = prefix.isEmpty() || prefix.endsWith("_") ? "" : "_";
+        return prefix + separator + database.getName() + "_" + name;
+    }
+
     @Override
     public String toString() {
         return "TableDesc [database=" + getDatabase() + " name=" + name + "]";
@@ -184,4 +221,12 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
     public void setSourceType(int sourceType) {
         this.sourceType = sourceType;
     }
+
+    public String getHiveViewIntermediateTableNamePrefix() {
+        return hiveViewIntermediateTableNamePrefix;
+    }
+
+    public void setHiveViewIntermediateTableNamePrefix(String hiveViewIntermediateTableNamePrefix) {
+        this.hiveViewIntermediateTableNamePrefix = hiveViewIntermediateTableNamePrefix;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index a1c9cd9..853eca0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -53,7 +53,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);

-        // Phase 1: Create Flat Table
+        // Phase 1: Create Flat Table (a Hive input side may also add a step materializing lookup-table Hive views)
         inputSide.addStepPhase1_CreateFlatTable(result);

         // Phase 2: Build Dictionary

http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 5242d76..d90ed60 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,19 +23,15 @@ import java.io.IOException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -99,6 +95,7 @@ public class HiveMRInput implements IMRInput {
         final JobEngineConfig conf;
         final IRealizationSegment seg;
         final IJoinedFlatTableDesc flatHiveTableDesc;
+        String hiveViewIntermediateTables = ""; // ';'-separated materialized view tables, set in phase 1, dropped in phase 4

         public BatchCubingInputSide(IRealizationSegment seg) {
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
@@ -109,6 +106,10 @@ public class HiveMRInput implements IMRInput {
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
             jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+            AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
+            if (task != null) {
+                jobFlow.addTask(task);
+            }
         }

         public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
@@ -137,12 +138,61 @@ public class HiveMRInput implements IMRInput {
             return step;
         }

+
+        /**
+         * Creates a shell step that copies every lookup table backed by a Hive view into a physical
+         * table in the intermediate database, or returns null when no lookup table is a view.
+         * The created table names are remembered so that the cleanup step can drop them.
+         */
+        public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+            String cubeName = seg.getRealization().getName();
+            CubeDesc cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
+
+            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.addStatement("USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";");
+
+            StringBuilder materializedTables = new StringBuilder();
+            for (TableDesc lookUpTableDesc : cubeDesc.getLookupTableDescs()) {
+                if (!lookUpTableDesc.isSourceTableHiveView()) {
+                    continue;
+                }
+                // Keep the descriptor's own name prefix (no job id): snapshot and dictionary building derive
+                // the table name via the same TableDesc method, and the descriptor may be shared metadata.
+                String intermediateTable = lookUpTableDesc.getHiveViewIntermediateTableName();
+                StringBuilder createIntermediateTableHql = new StringBuilder();
+                // drop leftovers of an earlier build so that stale view data is never reused
+                createIntermediateTableHql.append("DROP TABLE IF EXISTS " + intermediateTable + ";\n");
+                createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + intermediateTable + "\n");
+                createIntermediateTableHql.append("LOCATION '" + JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/" + intermediateTable + "'\n");
+                createIntermediateTableHql.append("AS SELECT * FROM " + lookUpTableDesc.getIdentity() + ";\n");
+                hiveCmdBuilder.addStatement(createIntermediateTableHql.toString());
+
+                if (materializedTables.length() > 0) {
+                    materializedTables.append(';');
+                }
+                materializedTables.append(intermediateTable);
+            }
+
+            if (materializedTables.length() == 0) {
+                return null;
+            }
+            hiveViewIntermediateTables = materializedTables.toString();
+
+            ShellExecutable step = new ShellExecutable();
+            step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
+            step.setCmd(hiveCmdBuilder.build());
+            return step;
+        }
+
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
             step.setIntermediateTableIdentity(getIntermediateTableIdentity());
             step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            step.setHiveViewIntermediateTableIdentitys(hiveViewIntermediateTables);
             jobFlow.addTask(step);
         }

@@ -161,29 +211,61 @@ public class HiveMRInput implements IMRInput {
         protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
             KylinConfig config = context.getConfig();
             StringBuffer output = new StringBuffer();
+            try {
+                output.append(cleanUpIntermediateFlatTable(config));
+                output.append(cleanUpHiveViewIntermediateTable(config));
+            } catch (IOException e) {
+                logger.error("job:" + getId() + " execute finished with exception", e);
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+            }
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+        }

+        /** Drops the flat Hive table and deletes its external data, unless flat tables are configured to be kept. */
+        private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
+            StringBuffer output = new StringBuffer();
             final String hiveTable = this.getIntermediateTableIdentity();
             if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
                 final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
                 hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
                 hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
-                try {
-                    config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
-                    output.append("Hive table " + hiveTable + " is dropped. \n");
-
-                    Path externalDataPath = new Path(getExternalDataPath());
-                    FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
-                    if (fs.exists(externalDataPath)) {
-                        fs.delete(externalDataPath, true);
-                        output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n");
-                    }
-                } catch (IOException e) {
-                    logger.error("job:" + getId() + " execute finished with exception", e);
-                    return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
-                }
+
+                config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+                output.append("Hive table " + hiveTable + " is dropped. \n");
+
+                // report the external data path only when it actually existed and was removed
+                if (rmdirOnHDFS(getExternalDataPath())) {
+                    output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+                }
             }
+            return output.toString();
+        }

-            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+        /** Recursively deletes {@code path} if it exists; returns whether a deletion took place. */
+        private boolean rmdirOnHDFS(String path) throws IOException {
+            Path externalDataPath = new Path(path);
+            FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+            if (fs.exists(externalDataPath)) {
+                return fs.delete(externalDataPath, true);
+            }
+            return false;
+        }
+
+        /** Drops the materialized lookup-view tables recorded for this job, if there are any. */
+        private String cleanUpHiveViewIntermediateTable(KylinConfig config) throws IOException {
+            final String hiveViewTables = getHiveViewIntermediateTableIdentitys();
+            // nothing was materialized, or the job was created before this parameter existed (null)
+            if (StringUtils.isBlank(hiveViewTables)) {
+                return "";
+            }
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+            for (String hiveTableName : hiveViewTables.split(";")) {
+                hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTableName + ";");
+            }
+            config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+            return "hive view intermediate tables: " + hiveViewTables + " is dropped. \n";
         }

         public void setIntermediateTableIdentity(String tableIdentity) {
@@ -201,6 +283,14 @@ public class HiveMRInput implements IMRInput {
         private String getExternalDataPath() {
             return getParam("externalDataPath");
         }
+
+        public void setHiveViewIntermediateTableIdentitys(String tableIdentitys) {
+            setParam("oldHiveViewIntermediateTables", tableIdentitys);
+        }
+
+        private String getHiveViewIntermediateTableIdentitys() {
+            return getParam("oldHiveViewIntermediateTables");
+        }
     }

 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/66ad1386/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 2aef4e6..6860f91 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;

+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.kylin.common.KylinConfig;
@@ -116,6 +117,8 @@ public class HiveSourceTableLoader {
                 tableDesc.setUuid(UUID.randomUUID().toString());
                 tableDesc.setLastModified(0);
             }
+            // flag Hive views; constant-first equals stays safe if the metastore leaves the table type unset
+            tableDesc.setSourceTableHiveViewFlag(TableType.VIRTUAL_VIEW.toString().equals(table.getTableType()));

             int columnNumber = fields.size();
             List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);