You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/06/19 07:03:23 UTC

[atlas] branch master updated: ATLAS-3290: Impala Hook should get database name and table name from vertex metadata

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c399cc  ATLAS-3290: Impala Hook should get database name and table name from vertex metadata
1c399cc is described below

commit 1c399cc7121fc2f68f50675f804e6cc21b7a5bf5
Author: lina.li <li...@cloudera.com>
AuthorDate: Tue Jun 18 23:57:27 2019 -0700

    ATLAS-3290: Impala Hook should get database name and table name from vertex metadata
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../atlas/impala/hook/AtlasImpalaHookContext.java  | 40 +++++++++++++
 .../atlas/impala/hook/ImpalaLineageHook.java       |  2 +-
 .../atlas/impala/hook/events/BaseImpalaEvent.java  |  2 +-
 .../apache/atlas/impala/ImpalaLineageToolIT.java   | 56 ++++++++++++++++++
 ...palaCreateTableAsSelectVertexIdNoTableName.json | 66 ++++++++++++++++++++++
 5 files changed, 164 insertions(+), 2 deletions(-)

diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
index 5b5f05a..1305f65 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
@@ -23,7 +23,10 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.atlas.impala.model.ImpalaOperationType;
 import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.impala.model.LineageVertexMetadata;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
 
 
 /**
@@ -101,6 +104,43 @@ public class AtlasImpalaHookContext {
             getClusterName();
     }
 
+    public String getQualifiedNameForColumn(LineageVertex vertex) {
+        // Resolve db and table from the vertex metadata rather than the vertexId: for some
+        // vertices (e.g. CTAS target columns) the vertexId is only the bare column name.
+        LineageVertexMetadata metadata = vertex.getMetadata();
+        if (metadata == null) {
+            // no metadata: fall back to parsing the vertexId as a full column name
+            return getQualifiedNameForColumn(vertex.getVertexId());
+        }
+
+        String fullTableName = metadata.getTableName();
+        if (StringUtils.isEmpty(fullTableName)) {
+            throw new IllegalArgumentException("fullTableName in column metadata is null or empty");
+        }
+
+        // metadata table name is "<db><sep><table>"; split it at the last separator
+        int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+        if (!isSeparatorIndexValid(sepPos)) {
+            throw new IllegalArgumentException(fullTableName + " in column metadata does not contain database name");
+        }
+
+        // get pure column name: drop any "<db><sep><table><sep>" prefix from the vertexId
+        String columnName = vertex.getVertexId();
+        if (StringUtils.isEmpty(columnName)) {
+            throw new IllegalArgumentException("column name in vertexId is null or empty");
+        }
+
+        int sepPosLast = columnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+        if (isSeparatorIndexValid(sepPosLast)) {
+            columnName = columnName.substring(sepPosLast + 1);
+        }
+
+        String dbName    = fullTableName.substring(0, sepPos);
+        String tableName = fullTableName.substring(sepPos + 1);
+
+        return getQualifiedNameForColumn(dbName, tableName, columnName);
+    }
+
     public String getQualifiedNameForColumn(String fullColumnName) throws IllegalArgumentException {
         if (fullColumnName == null) {
             throw new IllegalArgumentException("fullColumnName is null");
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index 6d65ae0..b5fdb6d 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -124,7 +124,7 @@ public class ImpalaLineageHook extends AtlasHook {
         } catch (Throwable t) {
 
             LOG.error("ImpalaLineageHook.process(): failed to process query {}",
-                lineageQuery.getQueryText(), t);
+                AtlasType.toJson(lineageQuery), t);
         }
 
         if (LOG.isDebugEnabled()) {
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
index d437ba2..4ea484f 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
@@ -161,7 +161,7 @@ public abstract class BaseImpalaEvent {
                 return context.getQualifiedNameForTable(node.getVertexId());
 
             case COLUMN:
-                return context.getQualifiedNameForColumn(node.getVertexId());
+                return context.getQualifiedNameForColumn(node);
 
             default:
                 LOG.warn("null qualified name for type: {} and name: {}", nodeType, node.getVertexId());
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
index 6908191..8ebb385 100644
--- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -418,4 +418,60 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
         assertNotNull(ddlQueries);
         assertEquals(ddlQueries.size(), 0);
     }
+
+    /**
+     * This tests
+     * 1) ImpalaLineageTool can parse one lineage file that contains "create table as select" command lineage,
+     *    whose column vertices carry tableName/tableCreateTime metadata; the target columns' vertexId has no db or table name
+     * 2) Lineage is sent to Atlas
+     * 3) The lineage can be retrieved from Atlas
+     */
+    @Test
+    public void testCreateTableAsSelectVertexIdNoTableNameFromFile() throws Exception {
+        String IMPALA = dir + "impalaCreateTableAsSelectVertexIdNoTableName.json";
+        String IMPALA_WAL = dir + "WALimpala.wal";
+
+        ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+        // create database and tables to simulate Impala behavior that Impala updates metadata
+        // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
+        // Atlas can handle lineage notification
+        String dbName = "sales_db";
+        createDatabase(dbName);
+
+        String sourceTableName = "sales_asia";
+        createTable(dbName, sourceTableName, "(id string, name string)", false);
+
+        String targetTableName = "sales_china";
+        createTable(dbName, targetTableName, "(id string, name string)", false);
+
+        // process lineage record, and send corresponding notification to Atlas
+        String[] args = new String[]{"-d", "./", "-p", "impala"};
+        ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+        // verify the process is saved in Atlas
+        // the value is the target table's tableCreateTime (seconds) from the lineage file, in milliseconds
+        String createTime = String.valueOf(1560885039L * 1000);
+        String processQFName =
+            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
+
+        processQFName = processQFName.toLowerCase();
+
+        String queryString = "create table " + targetTableName + " as select * from " + sourceTableName;
+        AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
+        AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
+        AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+            BaseImpalaEvent.ATTRIBUTE_PROCESS));
+        Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+        Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
+
+        String      guid       = assertTableIsRegistered(dbName, targetTableName);
+        AtlasEntity entity     = atlasClientV2.getEntityByGuid(guid).getEntity();
+        List        ddlQueries = (List) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        assertNotNull(ddlQueries);
+        assertEquals(ddlQueries.size(), 1);
+    }
 }
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impalaCreateTableAsSelectVertexIdNoTableName.json b/addons/impala-bridge/src/test/resources/impalaCreateTableAsSelectVertexIdNoTableName.json
new file mode 100644
index 0000000..0fadcc8
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impalaCreateTableAsSelectVertexIdNoTableName.json
@@ -0,0 +1,66 @@
+{
+  "queryText":"create table sales_china as select * from sales_asia",
+  "queryId":"2940d0b242de53ea:e82ba8d300000000",
+  "hash":"a705a9ec851a5440afca0dfb8df86cd5",
+  "user":"root",
+  "timestamp":1560885032,
+  "endTime":1560885040,
+  "edges":[
+    {
+      "sources":[
+        1
+      ],
+      "targets":[
+        0
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+        3
+      ],
+      "targets":[
+        2
+      ],
+      "edgeType":"PROJECTION"
+    }
+  ],
+  "vertices":[
+    {
+      "id":0,
+      "vertexType":"COLUMN",
+      "vertexId":"id",
+      "metadata":{
+        "tableName":"sales_db.sales_china",
+        "tableCreateTime":1560885039
+      }
+    },
+    {
+      "id":1,
+      "vertexType":"COLUMN",
+      "vertexId":"sales_db.sales_asia.id",
+      "metadata":{
+        "tableName":"sales_db.sales_asia",
+        "tableCreateTime":1560884919
+      }
+    },
+    {
+      "id":2,
+      "vertexType":"COLUMN",
+      "vertexId":"name",
+      "metadata":{
+        "tableName":"sales_db.sales_china",
+        "tableCreateTime":1560885039
+      }
+    },
+    {
+      "id":3,
+      "vertexType":"COLUMN",
+      "vertexId":"sales_db.sales_asia.name",
+      "metadata":{
+        "tableName":"sales_db.sales_asia",
+        "tableCreateTime":1560884919
+      }
+    }
+  ]
+}
\ No newline at end of file