You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/02 08:44:17 UTC

[04/33] kylin git commit: port KYLIN-2012 to new interface introduced in KYLIN-2125

Port KYLIN-2012 to the new interface introduced in KYLIN-2125.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd2a06a5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd2a06a5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd2a06a5

Branch: refs/heads/v1.6.0-rc1-cdh5.7
Commit: cd2a06a5d373bdd5cfa90e78649d42e891711c43
Parents: 553d7c5
Author: Hongbin Ma <ma...@apache.org>
Authored: Wed Oct 26 14:04:56 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Oct 27 08:34:11 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |  5 ++-
 .../kylin/source/hive/BeelineHiveClient.java    |  2 +-
 .../source/hive/HiveSourceTableLoader.java      | 32 +++++++--------
 .../apache/kylin/source/hive/SchemaChecker.java | 41 ++++++++------------
 4 files changed, 36 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cd2a06a5/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 4a24ad2..54feb24 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -45,8 +45,9 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.HiveClient;
+import org.apache.kylin.source.hive.HiveClientFactory;
 import org.apache.kylin.source.hive.HiveCmdBuilder;
+import org.apache.kylin.source.hive.IHiveClient;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
@@ -234,7 +235,7 @@ public class DeployUtil {
         String tableFileDir = temp.getParent();
         temp.delete();
 
-        HiveClient hiveClient = new HiveClient();
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
         // create hive tables
         hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW");
         hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_CAL_DT.toUpperCase())));

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd2a06a5/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
index c8d56a5..a84aeb1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
@@ -207,7 +207,7 @@ public class BeelineHiveClient implements IHiveClient {
 
         BeelineHiveClient loader = new BeelineHiveClient("-n root --hiveconf hive.security.authorization.sqlstd.confwhitelist.append='mapreduce.job.*|dfs.*' -u 'jdbc:hive2://sandbox:10000'");
         //BeelineHiveClient loader = new BeelineHiveClient(StringUtils.join(args, " "));
-        HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test001");
+        HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test_kylin_fact_part");
         System.out.println(hiveTableMeta);
         loader.close();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd2a06a5/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 388e72b..401e720 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
@@ -34,8 +35,10 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 
 /**
@@ -49,27 +52,25 @@ public class HiveSourceTableLoader {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
 
-    public static final String OUTPUT_SURFIX = "json";
-    public static final String TABLE_FOLDER_NAME = "table";
-    public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
-
     public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
 
-        Map<String, Set<String>> db2tables = Maps.newHashMap();
-        for (String table : hiveTables) {
-            String[] parts = HadoopUtil.parseHiveTableName(table);
-            Set<String> set = db2tables.get(parts[0]);
-            if (set == null) {
-                set = Sets.newHashSet();
-                db2tables.put(parts[0], set);
-            }
-            set.add(parts[1]);
+        SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
+        for (String fullTableName : hiveTables) {
+            String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
+            db2tables.put(parts[0], parts[1]);
+        }
+
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        SchemaChecker checker = new SchemaChecker(hiveClient, MetadataManager.getInstance(config), CubeManager.getInstance(config));
+        for (Map.Entry<String, String> entry : db2tables.entries()) {
+            SchemaChecker.CheckResult result = checker.allowReload(entry.getKey(), entry.getValue());
+            result.raiseExceptionWhenInvalid();
         }
 
         // extract from hive
         Set<String> loadedTables = Sets.newHashSet();
         for (String database : db2tables.keySet()) {
-            List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
+            List<String> loaded = extractHiveTables(database, db2tables.get(database), hiveClient);
             loadedTables.addAll(loaded);
         }
 
@@ -82,12 +83,11 @@ public class HiveSourceTableLoader {
         metaMgr.removeTableExd(hiveTable);
     }
 
-    private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
+    private static List<String> extractHiveTables(String database, Set<String> tables, IHiveClient hiveClient) throws IOException {
 
         List<String> loadedTables = Lists.newArrayList();
         MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
         for (String tableName : tables) {
-            IHiveClient hiveClient = HiveClientFactory.getHiveClient();
             HiveTableMeta hiveTableMeta;
             try {
                 hiveTableMeta = hiveClient.getHiveTableMeta(database, tableName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd2a06a5/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
index 319ebee..87a8870 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
@@ -27,8 +27,6 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -46,7 +44,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class SchemaChecker {
-    private final HiveClient hiveClient;
+    private final IHiveClient hiveClient;
     private final MetadataManager metadataManager;
     private final CubeManager cubeManager;
 
@@ -87,23 +85,16 @@ public class SchemaChecker {
         }
     }
 
-    SchemaChecker(HiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) {
+    SchemaChecker(IHiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) {
         this.hiveClient = checkNotNull(hiveClient, "hiveClient is null");
         this.metadataManager = checkNotNull(metadataManager, "metadataManager is null");
         this.cubeManager = checkNotNull(cubeManager, "cubeManager is null");
     }
 
-    private List<FieldSchema> fetchSchema(String dbName, String tblName) throws Exception {
-        List<FieldSchema> fields = Lists.newArrayList();
-        fields.addAll(hiveClient.getHiveTableFields(dbName, tblName));
-
-        Table table = hiveClient.getHiveTable(dbName, tblName);
-        List<FieldSchema> partitionFields = table.getPartitionKeys();
-        if (partitionFields != null) {
-            fields.addAll(partitionFields);
-        }
-
-        return fields;
+    private List<HiveTableMeta.HiveTableColumnMeta> fetchSchema(String dbName, String tblName) throws Exception {
+        List<HiveTableMeta.HiveTableColumnMeta> columnMetas = Lists.newArrayList();
+        columnMetas.addAll(hiveClient.getHiveTableMeta(dbName, tblName).allColumns);
+        return columnMetas;
     }
 
     private List<CubeInstance> findCubeByTable(final String fullTableName) {
@@ -128,12 +119,12 @@ public class SchemaChecker {
         return ImmutableList.copyOf(relatedCubes);
     }
 
-    private boolean isColumnCompatible(ColumnDesc column, FieldSchema field) {
-        if (!column.getName().equalsIgnoreCase(field.getName())) {
+    private boolean isColumnCompatible(ColumnDesc column, HiveTableMeta.HiveTableColumnMeta field) {
+        if (!column.getName().equalsIgnoreCase(field.name)) {
             return false;
         }
 
-        String typeStr = field.getType();
+        String typeStr = field.dataType;
         // kylin uses double internally for float, see HiveSourceTableLoader.java
         // TODO should this normalization to be in DataType class ?
         if ("float".equalsIgnoreCase(typeStr)) {
@@ -159,7 +150,7 @@ public class SchemaChecker {
      * @param fieldsMap current hive schema of `table`
      * @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise
      */
-    private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, FieldSchema> fieldsMap) {
+    private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, HiveTableMeta.HiveTableColumnMeta> fieldsMap) {
         Set<ColumnDesc> usedColumns = Sets.newHashSet();
         for (TblColRef col : cube.getAllColumns()) {
             usedColumns.add(col.getColumnDesc());
@@ -168,7 +159,7 @@ public class SchemaChecker {
         List<String> violateColumns = Lists.newArrayList();
         for (ColumnDesc column : table.getColumns()) {
             if (usedColumns.contains(column)) {
-                FieldSchema field = fieldsMap.get(column.getName());
+                HiveTableMeta.HiveTableColumnMeta field = fieldsMap.get(column.getName());
                 if (field == null || !isColumnCompatible(column, field)) {
                     violateColumns.add(column.getName());
                 }
@@ -184,7 +175,7 @@ public class SchemaChecker {
      * @param fields current table metadata in hive
      * @return true if only new columns are appended in hive, false otherwise
      */
-    private boolean checkAllColumnsInTableDesc(TableDesc table, List<FieldSchema> fields) {
+    private boolean checkAllColumnsInTableDesc(TableDesc table, List<HiveTableMeta.HiveTableColumnMeta> fields) {
         if (table.getColumnCount() > fields.size()) {
             return false;
         }
@@ -206,15 +197,15 @@ public class SchemaChecker {
             return CheckResult.validOnFirstLoad(fullTableName);
         }
 
-        List<FieldSchema> currentFields;
-        Map<String, FieldSchema> currentFieldsMap = Maps.newHashMap();
+        List<HiveTableMeta.HiveTableColumnMeta> currentFields;
+        Map<String, HiveTableMeta.HiveTableColumnMeta> currentFieldsMap = Maps.newHashMap();
         try {
             currentFields = fetchSchema(dbName, tblName);
         } catch (Exception e) {
             return CheckResult.invalidOnFetchSchema(fullTableName, e);
         }
-        for (FieldSchema field : currentFields) {
-            currentFieldsMap.put(field.getName().toUpperCase(), field);
+        for (HiveTableMeta.HiveTableColumnMeta field : currentFields) {
+            currentFieldsMap.put(field.name.toUpperCase(), field);
         }
 
         List<String> issues = Lists.newArrayList();