You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by ra...@apache.org on 2015/10/09 06:17:22 UTC
[17/50] [abbrv] lens git commit: LENS-710 : Allow column name mapping
for few/all columns in underlying storage tables
LENS-710 : Allow column name mapping for few/all columns in underlying storage tables
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/3563aacf
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/3563aacf
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/3563aacf
Branch: refs/heads/current-release-line
Commit: 3563aacf7c41a257d4c306153555099333ed5a47
Parents: 3576207
Author: Amareshwari Sriramadasu <am...@gmail.com>
Authored: Tue Sep 8 21:49:39 2015 +0530
Committer: Rajat Khandelwal <ra...@gmail.com>
Committed: Tue Sep 8 21:49:39 2015 +0530
----------------------------------------------------------------------
lens-api/src/main/resources/cube-0.1.xsd | 9 +
.../lens/driver/jdbc/ColumnarSQLRewriter.java | 179 ++++++++++++++-----
.../driver/jdbc/TestColumnarSQLRewriter.java | 91 +++++++++-
.../lens/server/api/LensConfConstants.java | 5 +
4 files changed, 232 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-api/src/main/resources/cube-0.1.xsd
----------------------------------------------------------------------
diff --git a/lens-api/src/main/resources/cube-0.1.xsd b/lens-api/src/main/resources/cube-0.1.xsd
index 0a981dd..58f68f5 100644
--- a/lens-api/src/main/resources/cube-0.1.xsd
+++ b/lens-api/src/main/resources/cube-0.1.xsd
@@ -811,6 +811,15 @@
<xs:annotation>
<xs:documentation>
Table properties.
+ The following properties can be specified for DBStorage table :
+ 1. lens.metastore.native.db.name : The underlying database name in DB storage.
+ 2. lens.metastore.native.table.name : The underlying table name in DB storage.
+ 3. lens.metastore.native.table.column.mapping : The column mapping for columns of the table if their names are
+ different in the underlying DB storage. The value is a comma-separated list of map entries, each written as
+ key=value. Example value: id=id1,name=name1
+ The following properties can be specified for Elasticsearch tables :
+ 1. lens.metastore.es.index.name : The underlying ES index name.
+ 2. lens.metastore.es.type.name : The underlying ES type name.
</xs:documentation>
</xs:annotation>
</xs:element>
http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java
index 9ceb9f3..295b476 100644
--- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java
+++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.antlr.runtime.CommonToken;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@@ -147,24 +149,31 @@ public class ColumnarSQLRewriter implements QueryRewriter {
private String fromTree;
/** The join ast. */
+ @Getter
private ASTNode joinAST;
/** The having ast. */
+ @Getter
private ASTNode havingAST;
/** The select ast. */
+ @Getter
private ASTNode selectAST;
/** The where ast. */
+ @Getter
private ASTNode whereAST;
/** The order by ast. */
+ @Getter
private ASTNode orderByAST;
/** The group by ast. */
+ @Getter
private ASTNode groupByAST;
/** The from ast. */
+ @Getter
protected ASTNode fromAST;
/**
@@ -944,7 +953,7 @@ public class ColumnarSQLRewriter implements QueryRewriter {
*/
public void buildQuery(Configuration conf, HiveConf hconf) throws SemanticException {
analyzeInternal(conf, hconf);
- replaceWithUnderlyingStorage(hconf, fromAST);
+ replaceWithUnderlyingStorage(hconf);
replaceAliasInAST();
getFilterInJoinCond(fromAST);
getAggregateColumns(selectAST, new MutableInt(0));
@@ -1187,67 +1196,143 @@ public class ColumnarSQLRewriter implements QueryRewriter {
return queryReplacedUdf;
}
- // Replace Lens database names with storage's proper DB and table name based
- // on table properties.
+
+ @NoArgsConstructor
+ private static class NativeTableInfo {
+ private Map<String, String> columnMapping = new HashMap<>();
+ NativeTableInfo(Table tbl) {
+ String columnMappingProp = tbl.getProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING);
+ if (StringUtils.isNotBlank(columnMappingProp)) {
+ String[] columnMapArray = StringUtils.split(columnMappingProp, ",");
+ for (String columnMapEntry : columnMapArray) {
+ String[] mapEntry = StringUtils.split(columnMapEntry, "=");
+ columnMapping.put(mapEntry[0].trim(), mapEntry[1].trim());
+ }
+ }
+ }
+ String getNativeColumn(String col) {
+ String retCol = columnMapping.get(col);
+ return retCol != null ? retCol : col;
+ }
+ }
+
+ private Map<String, NativeTableInfo> aliasToNativeTableInfo = new HashMap<>();
/**
* Replace with underlying storage.
*
- * @param tree the AST tree
+ * @param metastoreConf the metastore configuration
*/
- protected void replaceWithUnderlyingStorage(HiveConf metastoreConf, ASTNode tree) {
+ protected void replaceWithUnderlyingStorage(HiveConf metastoreConf) {
+ replaceDBAndTableNames(metastoreConf, fromAST);
+ if (aliasToNativeTableInfo.isEmpty()) {
+ return;
+ }
+ replaceColumnNames(selectAST);
+ replaceColumnNames(fromAST);
+ replaceColumnNames(whereAST);
+ replaceColumnNames(groupByAST);
+ replaceColumnNames(orderByAST);
+ replaceColumnNames(havingAST);
+ }
+ // Replace Lens database names with storage's proper DB and table name based
+ // on table properties.
+ protected void replaceDBAndTableNames(HiveConf metastoreConf, ASTNode tree) {
if (tree == null) {
return;
}
- if (TOK_TABNAME == tree.getToken().getType()) {
- // If it has two children, the first one is the DB name and second one is
- // table identifier
- // Else, we have to add the DB name as the first child
- try {
- if (tree.getChildCount() == 2) {
- ASTNode dbIdentifier = (ASTNode) tree.getChild(0);
- ASTNode tableIdentifier = (ASTNode) tree.getChild(1);
- String lensTable = dbIdentifier.getText() + "." + tableIdentifier.getText();
- Table tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable);
- String table = getUnderlyingTableName(tbl);
- String db = getUnderlyingDBName(tbl);
-
- // Replace both table and db names
- if ("default".equalsIgnoreCase(db)) {
- // Remove the db name for this case
- tree.deleteChild(0);
- } else if (StringUtils.isNotBlank(db)) {
- dbIdentifier.getToken().setText(db);
- } // If db is empty, then leave the tree untouched
-
- if (StringUtils.isNotBlank(table)) {
- tableIdentifier.getToken().setText(table);
- }
- } else {
- ASTNode tableIdentifier = (ASTNode) tree.getChild(0);
- String lensTable = tableIdentifier.getText();
- Table tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable);
- String table = getUnderlyingTableName(tbl);
- // Replace table name
- if (StringUtils.isNotBlank(table)) {
- tableIdentifier.getToken().setText(table);
- }
+ if (TOK_TABREF == tree.getToken().getType()) {
+ // TOK_TABREF will have TOK_TABNAME as first child and alias as second child.
+ String alias;
+ String tblName = null;
+ Table tbl = null;
+ ASTNode tabNameChild = (ASTNode) tree.getChild(0);
+ if (TOK_TABNAME == tabNameChild.getToken().getType()) {
+ // If it has two children, the first one is the DB name and second one is
+ // table identifier
+ // Else, we have to add the DB name as the first child
+ try {
+ if (tabNameChild.getChildCount() == 2) {
+ ASTNode dbIdentifier = (ASTNode) tabNameChild.getChild(0);
+ ASTNode tableIdentifier = (ASTNode) tabNameChild.getChild(1);
+ tblName = tableIdentifier.getText();
+ String lensTable = dbIdentifier.getText() + "." + tblName;
+ tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable);
+ String table = getUnderlyingTableName(tbl);
+ String db = getUnderlyingDBName(tbl);
+
+ // Replace both table and db names
+ if ("default".equalsIgnoreCase(db)) {
+ // Remove the db name for this case
+ tabNameChild.deleteChild(0);
+ } else if (StringUtils.isNotBlank(db)) {
+ dbIdentifier.getToken().setText(db);
+ } // If db is empty, then leave the tree untouched
+
+ if (StringUtils.isNotBlank(table)) {
+ tableIdentifier.getToken().setText(table);
+ }
+ } else {
+ ASTNode tableIdentifier = (ASTNode) tabNameChild.getChild(0);
+ tblName = tableIdentifier.getText();
+ tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(tblName);
+ String table = getUnderlyingTableName(tbl);
+ // Replace table name
+ if (StringUtils.isNotBlank(table)) {
+ tableIdentifier.getToken().setText(table);
+ }
- // Add db name as a new child
- String dbName = getUnderlyingDBName(tbl);
- if (StringUtils.isNotBlank(dbName) && !"default".equalsIgnoreCase(dbName)) {
- ASTNode dbIdentifier = new ASTNode(new CommonToken(HiveParser.Identifier, dbName));
- dbIdentifier.setParent(tree);
- tree.insertChild(0, dbIdentifier);
+ // Add db name as a new child
+ String dbName = getUnderlyingDBName(tbl);
+ if (StringUtils.isNotBlank(dbName) && !"default".equalsIgnoreCase(dbName)) {
+ ASTNode dbIdentifier = new ASTNode(new CommonToken(HiveParser.Identifier, dbName));
+ dbIdentifier.setParent(tabNameChild);
+ tabNameChild.insertChild(0, dbIdentifier);
+ }
+ }
+ } catch (HiveException e) {
+ log.warn("No corresponding table in metastore:", e);
+ }
+ }
+ if (tree.getChildCount() == 2) {
+ alias = tree.getChild(1).getText();
+ } else {
+ alias = tblName;
+ }
+ if (StringUtils.isNotBlank(alias)) {
+ alias = alias.toLowerCase();
+ if (!aliasToNativeTableInfo.containsKey(alias)) {
+ if (tbl != null) {
+ aliasToNativeTableInfo.put(alias, new NativeTableInfo(tbl));
}
}
- } catch (HiveException e) {
- log.warn("No corresponding table in metastore:", e);
}
} else {
for (int i = 0; i < tree.getChildCount(); i++) {
- replaceWithUnderlyingStorage(metastoreConf, (ASTNode) tree.getChild(i));
+ replaceDBAndTableNames(metastoreConf, (ASTNode) tree.getChild(i));
+ }
+ }
+ }
+
+ void replaceColumnNames(ASTNode node) {
+ if (node == null) {
+ return;
+ }
+ int nodeType = node.getToken().getType();
+ if (nodeType == HiveParser.DOT) {
+ ASTNode tabident = HQLParser.findNodeByPath(node, TOK_TABLE_OR_COL, Identifier);
+ ASTNode colIdent = (ASTNode) node.getChild(1);
+ String column = colIdent.getText().toLowerCase();
+ String alias = tabident.getText().toLowerCase();
+ if (aliasToNativeTableInfo.get(alias) != null) {
+ colIdent.getToken().setText(aliasToNativeTableInfo.get(alias).getNativeColumn(column));
+ }
+ } else {
+ // recurse down
+ for (int i = 0; i < node.getChildCount(); i++) {
+ ASTNode child = (ASTNode) node.getChild(i);
+ replaceColumnNames(child);
}
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java
index 3415a1e..db09a4b 100644
--- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java
+++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java
@@ -886,7 +886,7 @@ public class TestColumnarSQLRewriter {
System.out.println(joinTreeBeforeRewrite);
// Rewrite
- rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST);
+ rewriter.replaceWithUnderlyingStorage(hconf);
String joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST);
System.out.println("joinTreeAfterRewrite:" + joinTreeAfterRewrite);
@@ -914,7 +914,7 @@ public class TestColumnarSQLRewriter {
System.out.println(joinTreeBeforeRewrite);
// Rewrite
- rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST);
+ rewriter.replaceWithUnderlyingStorage(hconf);
joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST);
System.out.println(joinTreeAfterRewrite);
@@ -933,7 +933,7 @@ public class TestColumnarSQLRewriter {
rewriter.query = defaultQuery;
rewriter.analyzeInternal(conf, hconf);
joinTreeBeforeRewrite = HQLParser.getString(rewriter.fromAST);
- rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST);
+ rewriter.replaceWithUnderlyingStorage(hconf);
joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST);
assertTrue(joinTreeBeforeRewrite.contains("examples"), joinTreeBeforeRewrite);
assertFalse(joinTreeAfterRewrite.contains("examples"), joinTreeAfterRewrite);
@@ -949,23 +949,104 @@ public class TestColumnarSQLRewriter {
}
/**
+ * Test replace column mapping.
+ *
+ * @throws Exception the exception
+ */
+ @Test
+ public void testReplaceColumnMapping() throws Exception {
+ SessionState.start(hconf);
+ String testDB = "testrcm";
+
+ // Create test table
+ Database database = new Database();
+ database.setName(testDB);
+
+ Hive.get(hconf).createDatabase(database);
+ try {
+ SessionState.get().setCurrentDatabase(testDB);
+ Map<String, String> columnMap = new HashMap<>();
+ columnMap.put("id", "id1");
+ columnMap.put("name", "name1");
+ createTable(hconf, testDB, "mytable", "testDB", "testTable_1", false, columnMap);
+ columnMap.put("id", "id2");
+ columnMap.put("name", "name2");
+ createTable(hconf, testDB, "mytable_2", "testDB", "testTable_2", false, columnMap);
+ columnMap.put("id", "id3");
+ columnMap.put("name", "name3");
+ createTable(hconf, "default", "mytable_3", "testDB", "testTable_3", false, columnMap);
+
+ String query = "SELECT t1.id, t2.id, t3.id, t1.name, t2.name, t3.name, count(1) FROM " + testDB
+ + ".mytable t1 JOIN mytable_2 t2 ON t1.t2id = t2.id left outer join default.mytable_3 t3 on t2.t3id = t3.id"
+ + " WHERE t1.id = 100 GROUP BY t2.id HAVING count(t1.id) > 2 ORDER BY t3.id";
+
+ ColumnarSQLRewriter rewriter = new ColumnarSQLRewriter();
+ rewriter.init(conf);
+ rewriter.ast = HQLParser.parseHQL(query, hconf);
+ rewriter.query = query;
+ rewriter.analyzeInternal(conf, hconf);
+
+ // Rewrite
+ rewriter.replaceWithUnderlyingStorage(hconf);
+ String fromStringAfterRewrite = HQLParser.getString(rewriter.fromAST);
+ log.info("fromStringAfterRewrite:{}", fromStringAfterRewrite);
+
+ assertEquals(HQLParser.getString(rewriter.getSelectAST()).trim(), "( t1 . id1 ), ( t2 . id2 ), ( t3 . id3 ),"
+ + " ( t1 . name1 ), ( t2 . name2 ), ( t3 . name3 ), count( 1 )",
+ "Found :" + HQLParser.getString(rewriter.getSelectAST()));
+ assertEquals(HQLParser.getString(rewriter.getWhereAST()).trim(), "(( t1 . id1 ) = 100 )",
+ "Found: " + HQLParser.getString(rewriter.getWhereAST()));
+ assertEquals(HQLParser.getString(rewriter.getGroupByAST()).trim(), "( t2 . id2 )",
+ "Found: " + HQLParser.getString(rewriter.getGroupByAST()));
+ assertEquals(HQLParser.getString(rewriter.getOrderByAST()).trim(), "t3 . id3 asc",
+ "Found: " + HQLParser.getString(rewriter.getOrderByAST()));
+ assertEquals(HQLParser.getString(rewriter.getHavingAST()).trim(), "(count(( t1 . id1 )) > 2 )",
+ "Found: " + HQLParser.getString(rewriter.getHavingAST()));
+ assertTrue(fromStringAfterRewrite.contains("( t1 . t2id ) = ( t2 . id2 )")
+ && fromStringAfterRewrite.contains("( t2 . t3id ) = ( t3 . id3 )"), fromStringAfterRewrite);
+ assertFalse(fromStringAfterRewrite.contains(testDB), fromStringAfterRewrite);
+ assertTrue(fromStringAfterRewrite.contains("testdb"), fromStringAfterRewrite);
+ assertTrue(fromStringAfterRewrite.contains("testtable_1") && fromStringAfterRewrite.contains("testtable_2")
+ && fromStringAfterRewrite.contains("testtable_3"), fromStringAfterRewrite);
+ } finally {
+ Hive.get().dropTable("default", "mytable_3", true, true);
+ Hive.get().dropDatabase(testDB, true, true, true);
+ SessionState.get().setCurrentDatabase("default");
+ }
+ }
+
+ void createTable(HiveConf conf, String db, String table, String udb, String utable) throws Exception {
+ createTable(conf, db, table, udb, utable, true, null);
+ }
+
+ /**
* Creates the table.
*
* @param db the db
* @param table the table
* @param udb the udb
* @param utable the utable
+ * @param setCustomSerde whether to set custom serde or not
+ * @param columnMapping column mapping for the table; if null or empty, no column mapping property is set
+ *
* @throws Exception the exception
*/
- void createTable(HiveConf conf, String db, String table, String udb, String utable) throws Exception {
+ void createTable(HiveConf conf, String db, String table, String udb, String utable, boolean setCustomSerde,
+ Map<String, String> columnMapping) throws Exception {
Table tbl1 = new Table(db, table);
- tbl1.setSerializationLib("DatabaseJarSerde");
+ if (setCustomSerde) {
+ tbl1.setSerializationLib("DatabaseJarSerde");
+ }
if (StringUtils.isNotBlank(udb)) {
tbl1.setProperty(LensConfConstants.NATIVE_DB_NAME, udb);
}
if (StringUtils.isNotBlank(utable)) {
tbl1.setProperty(LensConfConstants.NATIVE_TABLE_NAME, utable);
}
+ if (columnMapping != null && !columnMapping.isEmpty()) {
+ tbl1.setProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING, StringUtils.join(columnMapping.entrySet(), ","));
+ log.info("columnMapping property:{}", tbl1.getProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING));
+ }
List<FieldSchema> columns = new ArrayList<FieldSchema>();
columns.add(new FieldSchema("id", "int", "col1"));
http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
index 720825a..fb11f93 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
@@ -403,6 +403,11 @@ public final class LensConfConstants {
public static final String NATIVE_TABLE_NAME = METASTORE_PFX + "native.table.name";
/**
+ * The property name for setting the column mapping, used when column names in the native table differ from the table's own column names.
+ */
+ public static final String NATIVE_TABLE_COLUMN_MAPPING = METASTORE_PFX + "native.table.column.mapping";
+
+ /**
* The Constant ES_INDEX_NAME.
*/
public static final String ES_INDEX_NAME = METASTORE_PFX + "es.index.name";