You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by ra...@apache.org on 2015/10/09 06:17:22 UTC

[17/50] [abbrv] lens git commit: LENS-710 : Allow column name mapping for few/all columns in underlying storage tables

LENS-710 : Allow column name mapping for few/all columns in underlying storage tables


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/3563aacf
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/3563aacf
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/3563aacf

Branch: refs/heads/current-release-line
Commit: 3563aacf7c41a257d4c306153555099333ed5a47
Parents: 3576207
Author: Amareshwari Sriramadasu <am...@gmail.com>
Authored: Tue Sep 8 21:49:39 2015 +0530
Committer: Rajat Khandelwal <ra...@gmail.com>
Committed: Tue Sep 8 21:49:39 2015 +0530

----------------------------------------------------------------------
 lens-api/src/main/resources/cube-0.1.xsd        |   9 +
 .../lens/driver/jdbc/ColumnarSQLRewriter.java   | 179 ++++++++++++++-----
 .../driver/jdbc/TestColumnarSQLRewriter.java    |  91 +++++++++-
 .../lens/server/api/LensConfConstants.java      |   5 +
 4 files changed, 232 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-api/src/main/resources/cube-0.1.xsd
----------------------------------------------------------------------
diff --git a/lens-api/src/main/resources/cube-0.1.xsd b/lens-api/src/main/resources/cube-0.1.xsd
index 0a981dd..58f68f5 100644
--- a/lens-api/src/main/resources/cube-0.1.xsd
+++ b/lens-api/src/main/resources/cube-0.1.xsd
@@ -811,6 +811,15 @@
         <xs:annotation>
           <xs:documentation>
             Table properties.
+            The following properties can be specified for DBStorage tables:
+            1. lens.metastore.native.db.name : The underlying database name in DB storage.
+            2. lens.metastore.native.table.name : The underlying table name in DB storage.
+            3. lens.metastore.native.table.column.mapping : The mapping from table column names to column names in the
+            underlying DB storage, for columns whose names differ. The value is a comma-separated list of entries, each
+            of the form column=nativeColumn. Example value: id=id1,name=name1
+            The following properties can be specified for Elasticsearch tables:
+            1. lens.metastore.es.index.name : The underlying ES index name.
+            2. lens.metastore.es.type.name : The underlying ES type name.
           </xs:documentation>
         </xs:annotation>
       </xs:element>

http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java
index 9ceb9f3..295b476 100644
--- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java
+++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/ColumnarSQLRewriter.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 import org.antlr.runtime.CommonToken;
 
+import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -147,24 +149,31 @@ public class ColumnarSQLRewriter implements QueryRewriter {
   private String fromTree;
 
   /** The join ast. */
+  @Getter
   private ASTNode joinAST;
 
   /** The having ast. */
+  @Getter
   private ASTNode havingAST;
 
   /** The select ast. */
+  @Getter
   private ASTNode selectAST;
 
   /** The where ast. */
+  @Getter
   private ASTNode whereAST;
 
   /** The order by ast. */
+  @Getter
   private ASTNode orderByAST;
 
   /** The group by ast. */
+  @Getter
   private ASTNode groupByAST;
 
   /** The from ast. */
+  @Getter
   protected ASTNode fromAST;
 
   /**
@@ -944,7 +953,7 @@ public class ColumnarSQLRewriter implements QueryRewriter {
    */
   public void buildQuery(Configuration conf, HiveConf hconf) throws SemanticException {
     analyzeInternal(conf, hconf);
-    replaceWithUnderlyingStorage(hconf, fromAST);
+    replaceWithUnderlyingStorage(hconf);
     replaceAliasInAST();
     getFilterInJoinCond(fromAST);
     getAggregateColumns(selectAST, new MutableInt(0));
@@ -1187,67 +1196,143 @@ public class ColumnarSQLRewriter implements QueryRewriter {
     return queryReplacedUdf;
   }
 
-  // Replace Lens database names with storage's proper DB and table name based
-  // on table properties.
+
+  @NoArgsConstructor
+  private static class NativeTableInfo {
+    private Map<String, String> columnMapping = new HashMap<>();
+    NativeTableInfo(Table tbl) {
+      String columnMappingProp = tbl.getProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING);
+      if (StringUtils.isNotBlank(columnMappingProp)) {
+        for (String columnMapEntry : StringUtils.split(columnMappingProp, ",")) {
+          String[] mapEntry = StringUtils.split(columnMapEntry, "=");
+          if (mapEntry.length == 2) { // skip malformed entries like "id" or "id=" instead of throwing
+            columnMapping.put(mapEntry[0].trim().toLowerCase(), mapEntry[1].trim()); // lookups use lowercase
+          }
+        }
+      }
+    }
+    String getNativeColumn(String col) {
+      return columnMapping.containsKey(col) ? columnMapping.get(col) : col;
+    }
+  }
+
+  private Map<String, NativeTableInfo> aliasToNativeTableInfo = new HashMap<>();
 
   /**
    * Replace with underlying storage.
    *
-   * @param tree the AST tree
+   * @param metastoreConf the metastore configuration
    */
-  protected void replaceWithUnderlyingStorage(HiveConf metastoreConf, ASTNode tree) {
+  protected void replaceWithUnderlyingStorage(HiveConf metastoreConf) {
+    replaceDBAndTableNames(metastoreConf, fromAST);
+    if (aliasToNativeTableInfo.isEmpty()) {
+      return;
+    }
+    replaceColumnNames(selectAST);
+    replaceColumnNames(fromAST);
+    replaceColumnNames(whereAST);
+    replaceColumnNames(groupByAST);
+    replaceColumnNames(orderByAST);
+    replaceColumnNames(havingAST);
+  }
+  // Replace Lens database names with storage's proper DB and table name based
+  // on table properties.
+  protected void replaceDBAndTableNames(HiveConf metastoreConf, ASTNode tree) {
     if (tree == null) {
       return;
     }
 
-    if (TOK_TABNAME == tree.getToken().getType()) {
-      // If it has two children, the first one is the DB name and second one is
-      // table identifier
-      // Else, we have to add the DB name as the first child
-      try {
-        if (tree.getChildCount() == 2) {
-          ASTNode dbIdentifier = (ASTNode) tree.getChild(0);
-          ASTNode tableIdentifier = (ASTNode) tree.getChild(1);
-          String lensTable = dbIdentifier.getText() + "." + tableIdentifier.getText();
-          Table tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable);
-          String table = getUnderlyingTableName(tbl);
-          String db = getUnderlyingDBName(tbl);
-
-          // Replace both table and db names
-          if ("default".equalsIgnoreCase(db)) {
-            // Remove the db name for this case
-            tree.deleteChild(0);
-          } else if (StringUtils.isNotBlank(db)) {
-            dbIdentifier.getToken().setText(db);
-          } // If db is empty, then leave the tree untouched
-
-          if (StringUtils.isNotBlank(table)) {
-            tableIdentifier.getToken().setText(table);
-          }
-        } else {
-          ASTNode tableIdentifier = (ASTNode) tree.getChild(0);
-          String lensTable = tableIdentifier.getText();
-          Table tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable);
-          String table = getUnderlyingTableName(tbl);
-          // Replace table name
-          if (StringUtils.isNotBlank(table)) {
-            tableIdentifier.getToken().setText(table);
-          }
+    if (TOK_TABREF == tree.getToken().getType()) {
+      // TOK_TABREF will have TOK_TABNAME as first child and alias as second child.
+      String alias;
+      String tblName = null;
+      Table tbl = null;
+      ASTNode tabNameChild = (ASTNode) tree.getChild(0);
+      if (TOK_TABNAME == tabNameChild.getToken().getType()) {
+        // If it has two children, the first one is the DB name and second one is
+        // table identifier
+        // Else, we have to add the DB name as the first child
+        try {
+          if (tabNameChild.getChildCount() == 2) {
+            ASTNode dbIdentifier = (ASTNode) tabNameChild.getChild(0);
+            ASTNode tableIdentifier = (ASTNode) tabNameChild.getChild(1);
+            tblName = tableIdentifier.getText();
+            String lensTable = dbIdentifier.getText() + "." + tblName;
+            tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(lensTable);
+            String table = getUnderlyingTableName(tbl);
+            String db = getUnderlyingDBName(tbl);
+
+            // Replace both table and db names
+            if ("default".equalsIgnoreCase(db)) {
+              // Remove the db name for this case
+              tabNameChild.deleteChild(0);
+            } else if (StringUtils.isNotBlank(db)) {
+              dbIdentifier.getToken().setText(db);
+            } // If db is empty, then leave the tree untouched
+
+            if (StringUtils.isNotBlank(table)) {
+              tableIdentifier.getToken().setText(table);
+            }
+          } else {
+            ASTNode tableIdentifier = (ASTNode) tabNameChild.getChild(0);
+            tblName = tableIdentifier.getText();
+            tbl = CubeMetastoreClient.getInstance(metastoreConf).getHiveTable(tblName);
+            String table = getUnderlyingTableName(tbl);
+            // Replace table name
+            if (StringUtils.isNotBlank(table)) {
+              tableIdentifier.getToken().setText(table);
+            }
 
-          // Add db name as a new child
-          String dbName = getUnderlyingDBName(tbl);
-          if (StringUtils.isNotBlank(dbName) && !"default".equalsIgnoreCase(dbName)) {
-            ASTNode dbIdentifier = new ASTNode(new CommonToken(HiveParser.Identifier, dbName));
-            dbIdentifier.setParent(tree);
-            tree.insertChild(0, dbIdentifier);
+            // Add db name as a new child
+            String dbName = getUnderlyingDBName(tbl);
+            if (StringUtils.isNotBlank(dbName) && !"default".equalsIgnoreCase(dbName)) {
+              ASTNode dbIdentifier = new ASTNode(new CommonToken(HiveParser.Identifier, dbName));
+              dbIdentifier.setParent(tabNameChild);
+              tabNameChild.insertChild(0, dbIdentifier);
+            }
+          }
+        } catch (HiveException e) {
+          log.warn("No corresponding table in metastore:", e);
+        }
+      }
+      if (tree.getChildCount() == 2) {
+        alias = tree.getChild(1).getText();
+      } else {
+        alias = tblName;
+      }
+      if (StringUtils.isNotBlank(alias)) {
+        alias = alias.toLowerCase();
+        if (!aliasToNativeTableInfo.containsKey(alias)) {
+          if (tbl != null) {
+            aliasToNativeTableInfo.put(alias, new NativeTableInfo(tbl));
           }
         }
-      } catch (HiveException e) {
-        log.warn("No corresponding table in metastore:", e);
       }
     } else {
       for (int i = 0; i < tree.getChildCount(); i++) {
-        replaceWithUnderlyingStorage(metastoreConf, (ASTNode) tree.getChild(i));
+        replaceDBAndTableNames(metastoreConf, (ASTNode) tree.getChild(i));
+      }
+    }
+  }
+
+  void replaceColumnNames(ASTNode node) {
+    if (node == null) {
+      return;
+    }
+    int nodeType = node.getToken().getType();
+    if (nodeType == HiveParser.DOT) {
+      ASTNode tabident = HQLParser.findNodeByPath(node, TOK_TABLE_OR_COL, Identifier);
+      ASTNode colIdent = (ASTNode) node.getChild(1);
+      String column = colIdent.getText().toLowerCase();
+      String alias = tabident == null ? null : tabident.getText().toLowerCase(); // lhs may not be an alias
+      if (alias != null && aliasToNativeTableInfo.get(alias) != null) {
+        colIdent.getToken().setText(aliasToNativeTableInfo.get(alias).getNativeColumn(column));
+      }
+    } else {
+      // not an alias.column reference: recurse into children looking for one
+      for (int i = 0; i < node.getChildCount(); i++) {
+        ASTNode child = (ASTNode) node.getChild(i);
+        replaceColumnNames(child);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java
index 3415a1e..db09a4b 100644
--- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java
+++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java
@@ -886,7 +886,7 @@ public class TestColumnarSQLRewriter {
     System.out.println(joinTreeBeforeRewrite);
 
     // Rewrite
-    rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST);
+    rewriter.replaceWithUnderlyingStorage(hconf);
     String joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST);
     System.out.println("joinTreeAfterRewrite:" + joinTreeAfterRewrite);
 
@@ -914,7 +914,7 @@ public class TestColumnarSQLRewriter {
     System.out.println(joinTreeBeforeRewrite);
 
     // Rewrite
-    rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST);
+    rewriter.replaceWithUnderlyingStorage(hconf);
     joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST);
     System.out.println(joinTreeAfterRewrite);
 
@@ -933,7 +933,7 @@ public class TestColumnarSQLRewriter {
     rewriter.query = defaultQuery;
     rewriter.analyzeInternal(conf, hconf);
     joinTreeBeforeRewrite = HQLParser.getString(rewriter.fromAST);
-    rewriter.replaceWithUnderlyingStorage(hconf, rewriter.fromAST);
+    rewriter.replaceWithUnderlyingStorage(hconf);
     joinTreeAfterRewrite = HQLParser.getString(rewriter.fromAST);
     assertTrue(joinTreeBeforeRewrite.contains("examples"), joinTreeBeforeRewrite);
     assertFalse(joinTreeAfterRewrite.contains("examples"), joinTreeAfterRewrite);
@@ -949,23 +949,104 @@ public class TestColumnarSQLRewriter {
   }
 
   /**
+   * Test replace column mapping.
+   *
+   * @throws Exception the exception
+   */
+  @Test
+  public void testReplaceColumnMapping() throws Exception {
+    SessionState.start(hconf);
+    String testDB = "testrcm";
+
+    // Create test table
+    Database database = new Database();
+    database.setName(testDB);
+
+    Hive.get(hconf).createDatabase(database);
+    try {
+      SessionState.get().setCurrentDatabase(testDB);
+      Map<String, String> columnMap = new HashMap<>();
+      columnMap.put("id", "id1");
+      columnMap.put("name", "name1");
+      createTable(hconf, testDB, "mytable", "testDB", "testTable_1", false, columnMap);
+      columnMap.put("id", "id2");
+      columnMap.put("name", "name2");
+      createTable(hconf, testDB, "mytable_2", "testDB", "testTable_2", false, columnMap);
+      columnMap.put("id", "id3");
+      columnMap.put("name", "name3");
+      createTable(hconf, "default", "mytable_3", "testDB", "testTable_3", false, columnMap);
+
+      String query = "SELECT t1.id, t2.id, t3.id, t1.name, t2.name, t3.name, count(1) FROM " + testDB
+        + ".mytable t1 JOIN mytable_2 t2 ON t1.t2id = t2.id   left outer join default.mytable_3 t3 on t2.t3id = t3.id"
+        + " WHERE t1.id = 100 GROUP BY t2.id HAVING count(t1.id) > 2 ORDER BY t3.id";
+
+      ColumnarSQLRewriter rewriter = new ColumnarSQLRewriter();
+      rewriter.init(conf);
+      rewriter.ast = HQLParser.parseHQL(query, hconf);
+      rewriter.query = query;
+      rewriter.analyzeInternal(conf, hconf);
+
+      // Rewrite
+      rewriter.replaceWithUnderlyingStorage(hconf);
+      String fromStringAfterRewrite = HQLParser.getString(rewriter.fromAST);
+      log.info("fromStringAfterRewrite:{}", fromStringAfterRewrite);
+
+      assertEquals(HQLParser.getString(rewriter.getSelectAST()).trim(), "( t1 . id1 ), ( t2 . id2 ), ( t3 . id3 ),"
+        + " ( t1 . name1 ), ( t2 . name2 ), ( t3 . name3 ), count( 1 )",
+        "Found :" + HQLParser.getString(rewriter.getSelectAST()));
+      assertEquals(HQLParser.getString(rewriter.getWhereAST()).trim(), "(( t1 . id1 ) =  100 )",
+        "Found: " + HQLParser.getString(rewriter.getWhereAST()));
+      assertEquals(HQLParser.getString(rewriter.getGroupByAST()).trim(), "( t2 . id2 )",
+        "Found: " + HQLParser.getString(rewriter.getGroupByAST()));
+      assertEquals(HQLParser.getString(rewriter.getOrderByAST()).trim(), "t3 . id3   asc",
+        "Found: " + HQLParser.getString(rewriter.getOrderByAST()));
+      assertEquals(HQLParser.getString(rewriter.getHavingAST()).trim(), "(count(( t1 . id1 )) >  2 )",
+        "Found: " + HQLParser.getString(rewriter.getHavingAST()));
+      assertTrue(fromStringAfterRewrite.contains("( t1 . t2id ) = ( t2 . id2 )")
+        && fromStringAfterRewrite.contains("( t2 . t3id ) = ( t3 . id3 )"), fromStringAfterRewrite);
+      assertFalse(fromStringAfterRewrite.contains(testDB), fromStringAfterRewrite);
+      assertTrue(fromStringAfterRewrite.contains("testdb"), fromStringAfterRewrite);
+      assertTrue(fromStringAfterRewrite.contains("testtable_1") && fromStringAfterRewrite.contains("testtable_2")
+        && fromStringAfterRewrite.contains("testtable_3"), fromStringAfterRewrite);
+    } finally {
+      Hive.get().dropTable("default", "mytable_3", true, true);
+      Hive.get().dropDatabase(testDB, true, true, true);
+      SessionState.get().setCurrentDatabase("default");
+    }
+  }
+
+  void createTable(HiveConf conf, String db, String table, String udb, String utable) throws Exception {
+    createTable(conf, db, table, udb, utable, true, null);
+  }
+
+  /**
    * Creates the table.
    *
    * @param db     the db
    * @param table  the table
    * @param udb    the udb
    * @param utable the utable
+   * @param setCustomSerde whether to set custom serde or not
+   * @param columnMapping columnmapping for the table
+   *
    * @throws Exception the exception
    */
-  void createTable(HiveConf conf, String db, String table, String udb, String utable) throws Exception {
+  void createTable(HiveConf conf, String db, String table, String udb, String utable, boolean setCustomSerde,
+    Map<String, String> columnMapping) throws Exception {
     Table tbl1 = new Table(db, table);
-    tbl1.setSerializationLib("DatabaseJarSerde");
+    if (setCustomSerde) {
+      tbl1.setSerializationLib("DatabaseJarSerde");
+    }
     if (StringUtils.isNotBlank(udb)) {
       tbl1.setProperty(LensConfConstants.NATIVE_DB_NAME, udb);
     }
     if (StringUtils.isNotBlank(utable)) {
       tbl1.setProperty(LensConfConstants.NATIVE_TABLE_NAME, utable);
     }
+    if (columnMapping != null && !columnMapping.isEmpty()) {
+      tbl1.setProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING, StringUtils.join(columnMapping.entrySet(), ","));
+      log.info("columnMapping property:{}", tbl1.getProperty(LensConfConstants.NATIVE_TABLE_COLUMN_MAPPING));
+    }
 
     List<FieldSchema> columns = new ArrayList<FieldSchema>();
     columns.add(new FieldSchema("id", "int", "col1"));

http://git-wip-us.apache.org/repos/asf/lens/blob/3563aacf/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
index 720825a..fb11f93 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
@@ -403,6 +403,11 @@ public final class LensConfConstants {
   public static final String NATIVE_TABLE_NAME = METASTORE_PFX + "native.table.name";
 
   /**
+   * The property name for the column mapping to apply when column names in the native table differ from Lens ones.
+   */
+  public static final String NATIVE_TABLE_COLUMN_MAPPING = METASTORE_PFX + "native.table.column.mapping";
+
+  /**
    * The Constant ES_INDEX_NAME.
    */
   public static final String ES_INDEX_NAME = METASTORE_PFX + "es.index.name";