You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/06/19 07:03:23 UTC
[atlas] branch master updated: ATLAS-3290: Impala Hook should get
database name and table name from vertex metadata
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 1c399cc ATLAS-3290: Impala Hook should get database name and table name from vertex metadata
1c399cc is described below
commit 1c399cc7121fc2f68f50675f804e6cc21b7a5bf5
Author: lina.li <li...@cloudera.com>
AuthorDate: Tue Jun 18 23:57:27 2019 -0700
ATLAS-3290: Impala Hook should get database name and table name from vertex metadata
Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
.../atlas/impala/hook/AtlasImpalaHookContext.java | 40 +++++++++++++
.../atlas/impala/hook/ImpalaLineageHook.java | 2 +-
.../atlas/impala/hook/events/BaseImpalaEvent.java | 2 +-
.../apache/atlas/impala/ImpalaLineageToolIT.java | 56 ++++++++++++++++++
...palaCreateTableAsSelectVertexIdNoTableName.json | 66 ++++++++++++++++++++++
5 files changed, 164 insertions(+), 2 deletions(-)
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
index 5b5f05a..1305f65 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
@@ -23,7 +23,10 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.impala.model.LineageVertexMetadata;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
/**
@@ -101,6 +104,43 @@ public class AtlasImpalaHookContext {
getClusterName();
}
+ /**
+ * Returns the column qualified name for a lineage vertex. The database and table name come from the
+ * vertex metadata; the column name is the part of the vertexId after its last QNAME_SEP_ENTITY_NAME.
+ * Without metadata, the vertexId is passed to getQualifiedNameForColumn(String) unchanged.
+ * @throws IllegalArgumentException if the metadata table name is empty or has no valid separator, or the vertexId is empty
+ */
+ public String getQualifiedNameForColumn(LineageVertex vertex) {
+ LineageVertexMetadata metadata = vertex.getMetadata();
+ if (metadata == null) {
+ return getQualifiedNameForColumn(vertex.getVertexId());
+ }
+
+ // get database name and table name
+ String fullTableName = metadata.getTableName();
+ if (StringUtils.isEmpty(fullTableName)) {
+ throw new IllegalArgumentException("fullTableName in column metadata is null or empty");
+ }
+ int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+ if (!isSeparatorIndexValid(sepPos)) {
+ throw new IllegalArgumentException(fullTableName + " in column metadata does not contain database name");
+ }
+
+ // get pure column name
+ String columnName = vertex.getVertexId();
+ if (StringUtils.isEmpty(columnName)) {
+ throw new IllegalArgumentException("column name in vertexId is null or empty");
+ }
+ int sepPosLast = columnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+ if (isSeparatorIndexValid(sepPosLast)) {
+ columnName = columnName.substring(sepPosLast+1);
+ }
+ return getQualifiedNameForColumn(
+ fullTableName.substring(0, sepPos),
+ fullTableName.substring(sepPos+1),
+ columnName);
+ }
+
public String getQualifiedNameForColumn(String fullColumnName) throws IllegalArgumentException {
if (fullColumnName == null) {
throw new IllegalArgumentException("fullColumnName is null");
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index 6d65ae0..b5fdb6d 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -124,7 +124,7 @@ public class ImpalaLineageHook extends AtlasHook {
} catch (Throwable t) {
LOG.error("ImpalaLineageHook.process(): failed to process query {}",
- lineageQuery.getQueryText(), t);
+ AtlasType.toJson(lineageQuery), t);
}
if (LOG.isDebugEnabled()) {
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
index d437ba2..4ea484f 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
@@ -161,7 +161,7 @@ public abstract class BaseImpalaEvent {
return context.getQualifiedNameForTable(node.getVertexId());
case COLUMN:
- return context.getQualifiedNameForColumn(node.getVertexId());
+ return context.getQualifiedNameForColumn(node);
default:
LOG.warn("null qualified name for type: {} and name: {}", nodeType, node.getVertexId());
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
index 6908191..8ebb385 100644
--- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -418,4 +418,60 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 0);
}
+
+ /**
+ * This tests
+ * 1) ImpalaLineageTool can parse one lineage file that contains "create table as select" command lineage,
+ * whose column vertices carry table metadata (table name and create time). The target vertex's vertexId does not contain db name and table name
+ * 2) Lineage is sent to Atlas
+ * 3) The lineage can be retrieved from Atlas
+ */
+ @Test
+ public void testCreateTableAsSelectVertexIdNoTableNameFromFile() throws Exception {
+ String IMPALA = dir + "impalaCreateTableAsSelectVertexIdNoTableName.json";
+ String IMPALA_WAL = dir + "WALimpala.wal";
+
+ ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+ // create database and tables to simulate Impala behavior that Impala updates metadata
+ // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
+ // Atlas can handle lineage notification
+ String dbName = "sales_db";
+ createDatabase(dbName);
+
+ String sourceTableName = "sales_asia";
+ createTable(dbName, sourceTableName,"(id string, name string)", false);
+
+ String targetTableName = "sales_china";
+ createTable(dbName, targetTableName,"(id string, name string)", false);
+
+ // process lineage record, and send corresponding notification to Atlas
+ String[] args = new String[]{"-d", "./", "-p", "impala"};
+ ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+ toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+ // verify the process is saved in Atlas
+ // the value is tableCreateTime of sales_db.sales_china in the IMPALA lineage file, multiplied by 1000.
+ String createTime = String.valueOf(1560885039L * 1000);
+ String processQFName =
+ dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
+
+ processQFName = processQFName.toLowerCase();
+
+ String queryString = "create table " + targetTableName + " as select * from " + sourceTableName;
+ AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
+ AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
+ AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+ BaseImpalaEvent.ATTRIBUTE_PROCESS));
+ Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+ Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
+
+ String guid = assertTableIsRegistered(dbName, targetTableName);
+ AtlasEntity entity = atlasClientV2.getEntityByGuid(guid).getEntity();
+ List<?> ddlQueries = (List<?>) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+ assertNotNull(ddlQueries);
+ assertEquals(ddlQueries.size(), 1);
+ }
}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impalaCreateTableAsSelectVertexIdNoTableName.json b/addons/impala-bridge/src/test/resources/impalaCreateTableAsSelectVertexIdNoTableName.json
new file mode 100644
index 0000000..0fadcc8
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impalaCreateTableAsSelectVertexIdNoTableName.json
@@ -0,0 +1,66 @@
+{
+ "queryText":"create table sales_china as select * from sales_asia",
+ "queryId":"2940d0b242de53ea:e82ba8d300000000",
+ "hash":"a705a9ec851a5440afca0dfb8df86cd5",
+ "user":"root",
+ "timestamp":1560885032,
+ "endTime":1560885040,
+ "edges":[
+ {
+ "sources":[
+ 1
+ ],
+ "targets":[
+ 0
+ ],
+ "edgeType":"PROJECTION"
+ },
+ {
+ "sources":[
+ 3
+ ],
+ "targets":[
+ 2
+ ],
+ "edgeType":"PROJECTION"
+ }
+ ],
+ "vertices":[
+ {
+ "id":0,
+ "vertexType":"COLUMN",
+ "vertexId":"id",
+ "metadata":{
+ "tableName":"sales_db.sales_china",
+ "tableCreateTime":1560885039
+ }
+ },
+ {
+ "id":1,
+ "vertexType":"COLUMN",
+ "vertexId":"sales_db.sales_asia.id",
+ "metadata":{
+ "tableName":"sales_db.sales_asia",
+ "tableCreateTime":1560884919
+ }
+ },
+ {
+ "id":2,
+ "vertexType":"COLUMN",
+ "vertexId":"name",
+ "metadata":{
+ "tableName":"sales_db.sales_china",
+ "tableCreateTime":1560885039
+ }
+ },
+ {
+ "id":3,
+ "vertexType":"COLUMN",
+ "vertexId":"sales_db.sales_asia.name",
+ "metadata":{
+ "tableName":"sales_db.sales_asia",
+ "tableCreateTime":1560884919
+ }
+ }
+ ]
+}
\ No newline at end of file