You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/05/20 22:08:47 UTC

[atlas] branch master updated: ATLAS-3184: Add support of lineage integration for more Impala commands

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ca6ee1  ATLAS-3184: Add support of lineage integration for more Impala commands
1ca6ee1 is described below

commit 1ca6ee125529a033252bf74baafcf49685fc20ae
Author: lina.li <li...@cloudera.com>
AuthorDate: Mon May 20 15:08:31 2019 -0700

    ATLAS-3184: Add support of lineage integration for more Impala commands
    
    Signed-off-by: Sarath Subramanian <ss...@cloudera.com>
---
 .../hook/ImpalaLineageHook.java                    |   6 +-
 .../hook/ImpalaOperationParser.java                |  35 +++++-
 .../hook/events/BaseImpalaEvent.java               |  92 +++++++++++++-
 .../model/ImpalaOperationType.java                 |  10 ++
 .../apache/atlas/impala/ImpalaLineageToolIT.java   | 138 ++++++++++++++++++++-
 .../impala-bridge/src/test/resources/impala5.json  |  62 +++++++++
 .../impala-bridge/src/test/resources/impala6.json  |  62 +++++++++
 .../impala-bridge/src/test/resources/impala7.json  |  75 +++++++++++
 8 files changed, 467 insertions(+), 13 deletions(-)

diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java
index c77bb38..fdb6748 100644
--- a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java
@@ -46,7 +46,6 @@ public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
     public static final String CONF_REALM_NAME                     = "atlas.realm.name";
     public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE     = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
 
-    private ImpalaOperationParser parser = new ImpalaOperationParser();
     private static final String clusterName;
     private  static final String realm;
     private static final boolean convertHdfsPathToLowerCase;
@@ -77,13 +76,16 @@ public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
         }
 
         try {
-            ImpalaOperationType operationType = parser.getImpalaOperationType(lineageQuery.getQueryText());
+            ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(lineageQuery.getQueryText());
             AtlasImpalaHookContext context =
                 new AtlasImpalaHookContext(this, operationType, lineageQuery);
             BaseImpalaEvent event = null;
 
             switch (operationType) {
                     case CREATEVIEW:
+                    case CREATETABLE_AS_SELECT:
+                    case ALTERVIEW_AS:
+                    case QUERY:
                         event = new CreateImpalaProcess(context);
                         break;
                 default:
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java
index 6cb726c..b9dd894 100644
--- a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.impala.hook;
 
 import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.commons.lang.StringUtils;
 
 /**
  * Parse an Impala query text and output the impala operation type
@@ -28,12 +29,40 @@ public class ImpalaOperationParser {
     public ImpalaOperationParser() {
     }
 
-    public ImpalaOperationType getImpalaOperationType(String queryText) {
-        // TODO: more Impala commands will be handled in ATLAS-3184
-        if (queryText.toLowerCase().startsWith("create view")) {
+    public static ImpalaOperationType getImpalaOperationType(String queryText) {
+        // Impala does no generate lineage record for command "LOAD DATA INPATH"
+        if (StringUtils.startsWithIgnoreCase(queryText, "create view")) {
             return ImpalaOperationType.CREATEVIEW;
+        } else if (StringUtils.startsWithIgnoreCase(queryText, "create table") &&
+        StringUtils.containsIgnoreCase(queryText, "as select")) {
+            return ImpalaOperationType.CREATETABLE_AS_SELECT;
+        } else if (StringUtils.startsWithIgnoreCase(queryText, "alter view") &&
+            StringUtils.containsIgnoreCase(queryText, "as select")) {
+            return ImpalaOperationType.ALTERVIEW_AS;
+        } else if (StringUtils.containsIgnoreCase(queryText, "insert into") &&
+            StringUtils.containsIgnoreCase(queryText, "select") &&
+            StringUtils.containsIgnoreCase(queryText, "from")) {
+            return ImpalaOperationType.QUERY;
+        } else if (StringUtils.containsIgnoreCase(queryText,"insert overwrite") &&
+            StringUtils.containsIgnoreCase(queryText, "select") &&
+            StringUtils.containsIgnoreCase(queryText, "from")) {
+            return ImpalaOperationType.QUERY;
         }
 
         return ImpalaOperationType.UNKNOWN;
     }
+
+    public static ImpalaOperationType getImpalaOperationSubType(ImpalaOperationType operationType, String queryText) {
+        if (operationType == ImpalaOperationType.QUERY) {
+            if (StringUtils.containsIgnoreCase(queryText, "insert into")) {
+                return ImpalaOperationType.INSERT;
+            } else if (StringUtils.containsIgnoreCase(queryText, "insert overwrite")) {
+                return ImpalaOperationType.INSERT_OVERWRITE;
+            }
+        }
+
+        return ImpalaOperationType.UNKNOWN;
+    }
+
+
 }
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java
index 0487739..afe296c 100644
--- a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java
@@ -21,12 +21,16 @@ package org.apache.atlas.impala.hook.events;
 import static org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.hook.ImpalaOperationParser;
 import org.apache.atlas.impala.model.ImpalaDataType;
 import org.apache.atlas.impala.model.ImpalaNode;
 import org.apache.atlas.impala.model.ImpalaOperationType;
@@ -172,8 +176,9 @@ public abstract class BaseImpalaEvent {
     protected String getQualifiedName(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
         ImpalaOperationType operation = context.getImpalaOperationType();
 
-        // TODO: add more operation type here
-        if (operation == ImpalaOperationType.CREATEVIEW) {
+        if (operation == ImpalaOperationType.CREATEVIEW ||
+            operation == ImpalaOperationType.CREATETABLE_AS_SELECT ||
+            operation == ImpalaOperationType.ALTERVIEW_AS) {
             List<? extends AtlasEntity> sortedEntities = new ArrayList<>(outputs);
 
             Collections.sort(sortedEntities, entityComparator);
@@ -187,8 +192,83 @@ public abstract class BaseImpalaEvent {
             }
         }
 
-        // TODO: add code for name construction for HDFS path
-        return null;
+        if (operation != ImpalaOperationType.QUERY) {
+            String errorMessage = String.format("Expect operation to be QUERY, but get unexpected operation type {}", operation.name());
+            LOG.error(errorMessage);
+            throw new IllegalArgumentException(errorMessage);
+        }
+
+        // construct qualified name for QUERY
+        String qualifiedName = null;
+        String operationName = operation.toString();
+
+        if (operationName != null) {
+            StringBuilder sb = new StringBuilder(operationName);
+
+            addToProcessQualifiedName(sb, inputs, false);
+            sb.append("->");
+            addToProcessQualifiedName(sb, outputs, true);
+
+            qualifiedName = sb.toString();
+        }
+
+
+        return qualifiedName;
+    }
+
+    protected void addToProcessQualifiedName(StringBuilder processQualifiedName, Collection<? extends AtlasEntity> entities, boolean isOutput) {
+        if (entities == null) {
+            return;
+        }
+
+        ImpalaOperationType         operation      = context.getImpalaOperationType();
+        String                      queryText      = context.getQueryStr();
+        List<? extends AtlasEntity> sortedEntities = new ArrayList<>(entities);
+
+        Collections.sort(sortedEntities, entityComparator);
+
+        Set<String> dataSetsProcessed = new HashSet<>();
+
+        for (AtlasEntity entity : sortedEntities) {
+            String qualifiedName = null;
+            long   createTime    = 0;
+
+            qualifiedName = (String)entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+
+            if (entity.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) {
+                Long createTimeObj = (Long)entity.getAttribute(ATTRIBUTE_CREATE_TIME);
+                if (createTimeObj != null) {
+                    createTime = createTimeObj;
+                }
+            }
+
+            if (qualifiedName == null || !dataSetsProcessed.add(qualifiedName)) {
+                continue;
+            }
+
+            if (isOutput) {
+                boolean             addWriteType = false;
+                ImpalaOperationType subType      = ImpalaOperationParser.getImpalaOperationSubType(operation, queryText);
+
+                    switch (subType) {
+                        // Impala does not generate lineage for UPDATE and DELETE
+                        case INSERT:
+                        case INSERT_OVERWRITE:
+                            addWriteType = true;
+                            break;
+                    }
+
+                    if (addWriteType) {
+                        processQualifiedName.append(QNAME_SEP_PROCESS).append(subType.name());
+                    }
+            }
+
+            processQualifiedName.append(QNAME_SEP_PROCESS).append(qualifiedName.toLowerCase().replaceAll("/", ""));
+
+            if (createTime != 0) {
+                processQualifiedName.append(QNAME_SEP_PROCESS).append(createTime);
+            }
+        }
     }
 
     protected AtlasEntity getInputOutputEntity(ImpalaNode node, AtlasEntityExtInfo entityExtInfo) throws Exception {
@@ -419,8 +499,8 @@ public abstract class BaseImpalaEvent {
     }
 
     protected AtlasEntity getImpalaProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
-        AtlasEntity ret         = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS.getName());
-        String      queryStr    = context.getQueryStr();
+        AtlasEntity         ret           = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS.getName());
+        String              queryStr      = context.getQueryStr();
 
         if (queryStr != null) {
             queryStr = queryStr.toLowerCase().trim();
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java
index 8b0be16..a893b88 100644
--- a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java
@@ -18,7 +18,17 @@
 package org.apache.atlas.impala.model;
 
 public enum ImpalaOperationType{
+    // main operation type
     CREATEVIEW ("CREATEVIEW"),
+    CREATETABLE_AS_SELECT ("CREATETABLE_AS_SELECT"),
+    ALTERVIEW_AS ("ALTERVIEW_AS"),
+    QUERY ("QUERY"),
+
+    // sub operation type, which is associated with output
+    INSERT ("INSERT"),
+    INSERT_OVERWRITE ("INSERT_OVERWRITE"),
+
+    // default type
     UNKNOWN ("UNKNOWN");
 
     private final String name;
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
index 05190b6..6e4d321 100644
--- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -19,13 +19,14 @@ package org.apache.atlas.impala;
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
 import org.apache.atlas.impala.hook.ImpalaLineageHook;
 import org.apache.atlas.impala.model.ImpalaQuery;
 import org.testng.annotations.Test;
 
 public class ImpalaLineageToolIT extends ImpalaLineageITBase {
+    public static final long TABLE_CREATE_TIME_SOURCE = 1554750070;
+    public static final long TABLE_CREATE_TIME        = 1554750072;
     private static String dir = System.getProperty("user.dir") + "/src/test/resources/";
     private static String IMPALA = dir + "impala3.json";
     private static String IMPALA_WAL = dir + "WALimpala.wal";
@@ -75,4 +76,137 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
             System.out.print("Appending file error");
         }
     }
-}
+
+    /**
+     * This tests
+     * 1) ImpalaLineageTool can parse one lineage file that contains "create table as select" command lineage,
+     *    there is table vertex with createTime.
+     * 2) Lineage is sent to Atlas
+     * 3) Atlas can get this lineage from Atlas
+     */
+    @Test
+    public void testCreateTableAsSelectFromFile() throws Exception {
+        String IMPALA = dir + "impala5.json";
+        String IMPALA_WAL = dir + "WALimpala.wal";
+
+        ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+        // create database and tables to simulate Impala behavior that Impala updates metadata
+        // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
+        // Atlas can handle lineage notification
+        String dbName = "db_3";
+        createDatabase(dbName);
+
+        String sourceTableName = "table_1";
+        createTable(dbName, sourceTableName,"(id string, count int)", false);
+
+        String targetTableName = "table_2";
+        createTable(dbName, targetTableName,"(count int, id string)", false);
+
+        // process lineage record, and send corresponding notification to Atlas
+        String[] args = new String[]{"-d", "./", "-p", "impala"};
+        ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+        // verify the process is saved in Atlas
+        // the value is from info in IMPALA_4.
+        String createTime = new Long(TABLE_CREATE_TIME*1000).toString();
+        String processQFName =
+            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
+
+        processQFName = processQFName.toLowerCase();
+
+        assertProcessIsRegistered(processQFName,
+            "create table " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName);
+    }
+
+    /**
+     * This tests
+     * 1) ImpalaLineageTool can parse one lineage file that contains "alter view as select" command lineage,
+     *    there is table vertex with createTime.
+     * 2) Lineage is sent to Atlas
+     * 3) Atlas can get this lineage from Atlas
+     */
+    @Test
+    public void testAlterViewAsSelectFromFile() throws Exception {
+        String IMPALA = dir + "impala6.json";
+        String IMPALA_WAL = dir + "WALimpala.wal";
+
+        ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+        // create database and tables to simulate Impala behavior that Impala updates metadata
+        // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
+        // Atlas can handle lineage notification
+        String dbName = "db_4";
+        createDatabase(dbName);
+
+        String sourceTableName = "table_1";
+        createTable(dbName, sourceTableName,"(id string, count int)", false);
+
+        String targetTableName = "view_1";
+        createTable(dbName, targetTableName,"(count int, id string)", false);
+
+        // process lineage record, and send corresponding notification to Atlas
+        String[] args = new String[]{"-d", "./", "-p", "impala"};
+        ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+        // verify the process is saved in Atlas
+        // the value is from info in IMPALA_4.
+        String createTime = new Long(TABLE_CREATE_TIME*1000).toString();
+        String processQFName =
+            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
+
+        processQFName = processQFName.toLowerCase();
+
+        assertProcessIsRegistered(processQFName,
+            "alter view " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName);
+    }
+
+    /**
+     * This tests
+     * 1) ImpalaLineageTool can parse one lineage file that contains "insert into" command lineage,
+     *    there is table vertex with createTime.
+     * 2) Lineage is sent to Atlas
+     * 3) Atlas can get this lineage from Atlas
+     */
+    @Test
+    public void testInsertIntoAsSelectFromFile() throws Exception {
+        String IMPALA = dir + "impala7.json";
+        String IMPALA_WAL = dir + "WALimpala.wal";
+
+        ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+        // create database and tables to simulate Impala behavior that Impala updates metadata
+        // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
+        // Atlas can handle lineage notification
+        String dbName = "db_5";
+        createDatabase(dbName);
+
+        String sourceTableName = "table_1";
+        createTable(dbName, sourceTableName,"(id string, count int)", false);
+
+        String targetTableName = "table_2";
+        createTable(dbName, targetTableName,"(count int, id string, int_col int)", false);
+
+        // process lineage record, and send corresponding notification to Atlas
+        String[] args = new String[]{"-d", "./", "-p", "impala"};
+        ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+        // verify the process is saved in Atlas
+        // the value is from info in IMPALA_4.
+        String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
+        String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
+        String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1;
+        String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
+        String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
+
+        assertProcessIsRegistered(processQFName,
+            "insert into table " + dbName + "." + targetTableName + " (count, id) select count, id from " + dbName + "." + sourceTableName);
+    }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala5.json b/addons/impala-bridge/src/test/resources/impala5.json
new file mode 100644
index 0000000..854969b
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala5.json
@@ -0,0 +1,62 @@
+{
+  "queryText":"create table db_3.table_2 as select count, id from db_3.table_1",
+  "queryId":"3a441d0c130962f8:7f634aec00000000",
+  "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+  "user":"admin",
+  "timestamp":1554750072,
+  "endTime":1554750554,
+  "edges":[
+    {
+      "sources":[
+        1
+      ],
+      "targets":[
+        0
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+        3
+      ],
+      "targets":[
+        2
+      ],
+      "edgeType":"PROJECTION"
+    }
+  ],
+  "vertices":[
+    {
+      "id":0,
+      "vertexType":"COLUMN",
+      "vertexId":"db_3.table_2.count"
+    },
+    {
+      "id":1,
+      "vertexType":"COLUMN",
+      "vertexId":"db_3.table_1.count"
+    },
+    {
+      "id":2,
+      "vertexType":"COLUMN",
+      "vertexId":"db_3.table_2.id"
+    },
+    {
+      "id":3,
+      "vertexType":"COLUMN",
+      "vertexId":"db_3.table_1.id"
+    },
+    {
+      "id":4,
+      "vertexType":"TABLE",
+      "vertexId":"db_3.table_1",
+      "createTime":1554750070
+    },
+    {
+      "id":5,
+      "vertexType":"TABLE",
+      "vertexId":"db_3.table_2",
+      "createTime":1554750072
+    }
+  ]
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala6.json b/addons/impala-bridge/src/test/resources/impala6.json
new file mode 100644
index 0000000..f136180
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala6.json
@@ -0,0 +1,62 @@
+{
+  "queryText":"alter view db_4.view_1 as select count, id from db_4.table_1",
+  "queryId":"3a441d0c130962f8:7f634aec00000000",
+  "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+  "user":"admin",
+  "timestamp":1554750072,
+  "endTime":1554750554,
+  "edges":[
+    {
+      "sources":[
+        1
+      ],
+      "targets":[
+        0
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+        3
+      ],
+      "targets":[
+        2
+      ],
+      "edgeType":"PROJECTION"
+    }
+  ],
+  "vertices":[
+    {
+      "id":0,
+      "vertexType":"COLUMN",
+      "vertexId":"db_4.view_1.count"
+    },
+    {
+      "id":1,
+      "vertexType":"COLUMN",
+      "vertexId":"db_4.table_1.count"
+    },
+    {
+      "id":2,
+      "vertexType":"COLUMN",
+      "vertexId":"db_4.view_1.id"
+    },
+    {
+      "id":3,
+      "vertexType":"COLUMN",
+      "vertexId":"db_4.table_1.id"
+    },
+    {
+      "id":4,
+      "vertexType":"TABLE",
+      "vertexId":"db_4.table_1",
+      "createTime":1554750070
+    },
+    {
+      "id":5,
+      "vertexType":"TABLE",
+      "vertexId":"db_4.view_1",
+      "createTime":1554750072
+    }
+  ]
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala7.json b/addons/impala-bridge/src/test/resources/impala7.json
new file mode 100644
index 0000000..f9ee670
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala7.json
@@ -0,0 +1,75 @@
+{
+  "queryText":"insert into table db_5.table_2 (count, id) select count, id from db_5.table_1",
+  "queryId":"3a441d0c130962f8:7f634aec00000000",
+  "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+  "user":"admin",
+  "timestamp":1554750072,
+  "endTime":1554750554,
+  "edges":[
+    {
+      "sources":[
+        1
+      ],
+      "targets":[
+        0
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+        3
+      ],
+      "targets":[
+        2
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+      ],
+      "targets":[
+        6
+      ],
+      "edgeType":"PROJECTION"
+    }
+  ],
+  "vertices":[
+    {
+      "id":0,
+      "vertexType":"COLUMN",
+      "vertexId":"db_5.table_2.count"
+    },
+    {
+      "id":1,
+      "vertexType":"COLUMN",
+      "vertexId":"db_5.table_1.count"
+    },
+    {
+      "id":2,
+      "vertexType":"COLUMN",
+      "vertexId":"db_5.table_2.id"
+    },
+    {
+      "id":3,
+      "vertexType":"COLUMN",
+      "vertexId":"db_5.table_1.id"
+    },
+    {
+      "id":4,
+      "vertexType":"TABLE",
+      "vertexId":"db_5.table_1",
+      "createTime":1554750070
+    },
+    {
+      "id":5,
+      "vertexType":"TABLE",
+      "vertexId":"db_5.table_2",
+      "createTime":1554750072
+    },
+    {
+      "id":6,
+      "vertexType":"COLUMN",
+      "vertexId":"db_5.table_2.int_col"
+    }
+  ]
+}
\ No newline at end of file