You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/10/27 00:30:32 UTC
[5/7] kylin git commit: port KYLIN-2012 to new interface introduced in KYLIN-2125
port KYLIN-2012 to new interface introduced in KYLIN-2125
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/615e21d7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/615e21d7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/615e21d7
Branch: refs/heads/master
Commit: 615e21d7fb0dae651ab643949ef5078bef7b12ea
Parents: ca6837d
Author: Hongbin Ma <ma...@apache.org>
Authored: Wed Oct 26 14:04:56 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Oct 27 08:30:13 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 5 ++-
.../kylin/source/hive/BeelineHiveClient.java | 2 +-
.../source/hive/HiveSourceTableLoader.java | 32 +++++++--------
.../apache/kylin/source/hive/SchemaChecker.java | 41 ++++++++------------
4 files changed, 36 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/615e21d7/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 4a24ad2..54feb24 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -45,8 +45,9 @@ import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.HiveClient;
+import org.apache.kylin.source.hive.HiveClientFactory;
import org.apache.kylin.source.hive.HiveCmdBuilder;
+import org.apache.kylin.source.hive.IHiveClient;
import org.apache.kylin.source.kafka.TimedJsonStreamParser;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
@@ -234,7 +235,7 @@ public class DeployUtil {
String tableFileDir = temp.getParent();
temp.delete();
- HiveClient hiveClient = new HiveClient();
+ IHiveClient hiveClient = HiveClientFactory.getHiveClient();
// create hive tables
hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW");
hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_CAL_DT.toUpperCase())));
http://git-wip-us.apache.org/repos/asf/kylin/blob/615e21d7/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
index c8d56a5..a84aeb1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
@@ -207,7 +207,7 @@ public class BeelineHiveClient implements IHiveClient {
BeelineHiveClient loader = new BeelineHiveClient("-n root --hiveconf hive.security.authorization.sqlstd.confwhitelist.append='mapreduce.job.*|dfs.*' -u 'jdbc:hive2://sandbox:10000'");
//BeelineHiveClient loader = new BeelineHiveClient(StringUtils.join(args, " "));
- HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test001");
+ HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test_kylin_fact_part");
System.out.println(hiveTableMeta);
loader.close();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/615e21d7/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 388e72b..401e720 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.UUID;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
@@ -34,8 +35,10 @@ import org.apache.kylin.metadata.model.TableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
/**
@@ -49,27 +52,25 @@ public class HiveSourceTableLoader {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
- public static final String OUTPUT_SURFIX = "json";
- public static final String TABLE_FOLDER_NAME = "table";
- public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
-
public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
- Map<String, Set<String>> db2tables = Maps.newHashMap();
- for (String table : hiveTables) {
- String[] parts = HadoopUtil.parseHiveTableName(table);
- Set<String> set = db2tables.get(parts[0]);
- if (set == null) {
- set = Sets.newHashSet();
- db2tables.put(parts[0], set);
- }
- set.add(parts[1]);
+ SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
+ for (String fullTableName : hiveTables) {
+ String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
+ db2tables.put(parts[0], parts[1]);
+ }
+
+ IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+ SchemaChecker checker = new SchemaChecker(hiveClient, MetadataManager.getInstance(config), CubeManager.getInstance(config));
+ for (Map.Entry<String, String> entry : db2tables.entries()) {
+ SchemaChecker.CheckResult result = checker.allowReload(entry.getKey(), entry.getValue());
+ result.raiseExceptionWhenInvalid();
}
// extract from hive
Set<String> loadedTables = Sets.newHashSet();
for (String database : db2tables.keySet()) {
- List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
+ List<String> loaded = extractHiveTables(database, db2tables.get(database), hiveClient);
loadedTables.addAll(loaded);
}
@@ -82,12 +83,11 @@ public class HiveSourceTableLoader {
metaMgr.removeTableExd(hiveTable);
}
- private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
+ private static List<String> extractHiveTables(String database, Set<String> tables, IHiveClient hiveClient) throws IOException {
List<String> loadedTables = Lists.newArrayList();
MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
for (String tableName : tables) {
- IHiveClient hiveClient = HiveClientFactory.getHiveClient();
HiveTableMeta hiveTableMeta;
try {
hiveTableMeta = hiveClient.getHiveTableMeta(database, tableName);
http://git-wip-us.apache.org/repos/asf/kylin/blob/615e21d7/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
index 319ebee..87a8870 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
@@ -27,8 +27,6 @@ import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
@@ -46,7 +44,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class SchemaChecker {
- private final HiveClient hiveClient;
+ private final IHiveClient hiveClient;
private final MetadataManager metadataManager;
private final CubeManager cubeManager;
@@ -87,23 +85,16 @@ public class SchemaChecker {
}
}
- SchemaChecker(HiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) {
+ SchemaChecker(IHiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) {
this.hiveClient = checkNotNull(hiveClient, "hiveClient is null");
this.metadataManager = checkNotNull(metadataManager, "metadataManager is null");
this.cubeManager = checkNotNull(cubeManager, "cubeManager is null");
}
- private List<FieldSchema> fetchSchema(String dbName, String tblName) throws Exception {
- List<FieldSchema> fields = Lists.newArrayList();
- fields.addAll(hiveClient.getHiveTableFields(dbName, tblName));
-
- Table table = hiveClient.getHiveTable(dbName, tblName);
- List<FieldSchema> partitionFields = table.getPartitionKeys();
- if (partitionFields != null) {
- fields.addAll(partitionFields);
- }
-
- return fields;
+ private List<HiveTableMeta.HiveTableColumnMeta> fetchSchema(String dbName, String tblName) throws Exception {
+ List<HiveTableMeta.HiveTableColumnMeta> columnMetas = Lists.newArrayList();
+ columnMetas.addAll(hiveClient.getHiveTableMeta(dbName, tblName).allColumns);
+ return columnMetas;
}
private List<CubeInstance> findCubeByTable(final String fullTableName) {
@@ -128,12 +119,12 @@ public class SchemaChecker {
return ImmutableList.copyOf(relatedCubes);
}
- private boolean isColumnCompatible(ColumnDesc column, FieldSchema field) {
- if (!column.getName().equalsIgnoreCase(field.getName())) {
+ private boolean isColumnCompatible(ColumnDesc column, HiveTableMeta.HiveTableColumnMeta field) {
+ if (!column.getName().equalsIgnoreCase(field.name)) {
return false;
}
- String typeStr = field.getType();
+ String typeStr = field.dataType;
// kylin uses double internally for float, see HiveSourceTableLoader.java
// TODO should this normalization to be in DataType class ?
if ("float".equalsIgnoreCase(typeStr)) {
@@ -159,7 +150,7 @@ public class SchemaChecker {
* @param fieldsMap current hive schema of `table`
* @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise
*/
- private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, FieldSchema> fieldsMap) {
+ private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, HiveTableMeta.HiveTableColumnMeta> fieldsMap) {
Set<ColumnDesc> usedColumns = Sets.newHashSet();
for (TblColRef col : cube.getAllColumns()) {
usedColumns.add(col.getColumnDesc());
@@ -168,7 +159,7 @@ public class SchemaChecker {
List<String> violateColumns = Lists.newArrayList();
for (ColumnDesc column : table.getColumns()) {
if (usedColumns.contains(column)) {
- FieldSchema field = fieldsMap.get(column.getName());
+ HiveTableMeta.HiveTableColumnMeta field = fieldsMap.get(column.getName());
if (field == null || !isColumnCompatible(column, field)) {
violateColumns.add(column.getName());
}
@@ -184,7 +175,7 @@ public class SchemaChecker {
* @param fields current table metadata in hive
* @return true if only new columns are appended in hive, false otherwise
*/
- private boolean checkAllColumnsInTableDesc(TableDesc table, List<FieldSchema> fields) {
+ private boolean checkAllColumnsInTableDesc(TableDesc table, List<HiveTableMeta.HiveTableColumnMeta> fields) {
if (table.getColumnCount() > fields.size()) {
return false;
}
@@ -206,15 +197,15 @@ public class SchemaChecker {
return CheckResult.validOnFirstLoad(fullTableName);
}
- List<FieldSchema> currentFields;
- Map<String, FieldSchema> currentFieldsMap = Maps.newHashMap();
+ List<HiveTableMeta.HiveTableColumnMeta> currentFields;
+ Map<String, HiveTableMeta.HiveTableColumnMeta> currentFieldsMap = Maps.newHashMap();
try {
currentFields = fetchSchema(dbName, tblName);
} catch (Exception e) {
return CheckResult.invalidOnFetchSchema(fullTableName, e);
}
- for (FieldSchema field : currentFields) {
- currentFieldsMap.put(field.getName().toUpperCase(), field);
+ for (HiveTableMeta.HiveTableColumnMeta field : currentFields) {
+ currentFieldsMap.put(field.name.toUpperCase(), field);
}
List<String> issues = Lists.newArrayList();