You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/05/09 01:38:51 UTC

[atlas] branch master updated: ATLAS-3183: Read Impala lineage record for creating view and send to Atlas

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 3137bb2  ATLAS-3183: Read Impala lineage record for creating view and send to Atlas
3137bb2 is described below

commit 3137bb2151abcc362008d7449f1e9a80d57b17a7
Author: lina.li <li...@cloudera.com>
AuthorDate: Wed May 8 18:38:10 2019 -0700

    ATLAS-3183: Read Impala lineage record for creating view and send to Atlas
    
    Signed-off-by: Sarath Subramanian <ss...@hortonworks.com>
---
 addons/impala-bridge/pom.xml                       | 557 +++++++++++++++++++++
 .../org.apache.atlas.impala/ImpalaLineageTool.java | 217 ++++++++
 .../hook/AtlasImpalaHookContext.java               | 170 +++++++
 .../hook/ImpalaIdentifierParser.java               | 392 +++++++++++++++
 .../hook/ImpalaLineageHook.java                    | 132 +++++
 .../hook/ImpalaOperationParser.java                |  39 ++
 .../hook/events/BaseImpalaEvent.java               | 452 +++++++++++++++++
 .../hook/events/CreateImpalaProcess.java           | 312 ++++++++++++
 .../model/IImpalaLineageHook.java                  |  28 ++
 .../model/ImpalaDataType.java                      |  32 ++
 .../model/ImpalaDependencyType.java                |  34 ++
 .../org.apache.atlas.impala/model/ImpalaNode.java  |  55 ++
 .../model/ImpalaOperationType.java                 |  37 ++
 .../org.apache.atlas.impala/model/ImpalaQuery.java | 110 ++++
 .../model/ImpalaVertexType.java                    |  37 ++
 .../org.apache.atlas.impala/model/LineageEdge.java |  63 +++
 .../model/LineageVertex.java                       |  74 +++
 .../src/main/resources/atlas-log4j.xml             |  42 ++
 .../src/main/resources/import-impala.sh            | 109 ++++
 .../apache/atlas/impala/ImpalaLineageITBase.java   | 249 +++++++++
 .../apache/atlas/impala/ImpalaLineageToolIT.java   |  78 +++
 .../atlas/impala/hook/ImpalaLineageHookIT.java     | 148 ++++++
 .../test/resources/atlas-application.properties    | 124 +++++
 .../src/test/resources/atlas-log4j.xml             | 130 +++++
 .../impala-bridge/src/test/resources/hive-site.xml |  94 ++++
 .../impala-bridge/src/test/resources/impala1.json  |  84 ++++
 .../impala-bridge/src/test/resources/impala2.json  | 306 +++++++++++
 .../impala-bridge/src/test/resources/impala3.json  |  62 +++
 .../test/resources/users-credentials.properties    |   3 +
 addons/models/1000-Hadoop/1090-impala_model.json   | 229 +++++++++
 pom.xml                                            |   1 +
 31 files changed, 4400 insertions(+)

diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml
new file mode 100644
index 0000000..54bc83a
--- /dev/null
+++ b/addons/impala-bridge/pom.xml
@@ -0,0 +1,557 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>apache-atlas</artifactId>
+    <groupId>org.apache.atlas</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../</relativePath>
+  </parent>
+  <artifactId>impala-bridge</artifactId>
+  <description>Apache Atlas Impala Bridge Module</description>
+  <name>Apache Atlas Impala Bridge</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <!-- log4j 2.9 and later are multi-release jars for Java 9. Our Jetty version doesn't support
+    that. Therefore, we have to use log4j 2.8 in integration tests -->
+    <log4j.version>2.8</log4j.version>
+  </properties>
+
+  <dependencies>
+    <!-- Logging -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-notification</artifactId>
+    </dependency>
+
+    <!-- to bring up atlas server for integration tests -->
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-client-v2</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>hdfs-model</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>${log4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <version>${log4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+      <version>${jersey.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>hive-bridge</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-webapp</artifactId>
+      <type>war</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${commons-io.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>${commons-cli.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>${commons-lang.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-graphdb-impls</artifactId>
+      <type>pom</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-intg</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-repository</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty.aggregate</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>dist</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>copy-hook</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>copy</goal>
+                </goals>
+                <configuration>
+                  <outputDirectory>${project.build.directory}/dependency/hook/impala/atlas-impala-plugin-impl</outputDirectory>
+                  <overWriteReleases>false</overWriteReleases>
+                  <overWriteSnapshots>false</overWriteSnapshots>
+                  <overWriteIfNewer>true</overWriteIfNewer>
+                  <artifactItems>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>${project.artifactId}</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>atlas-client-common</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>atlas-client-v1</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>atlas-client-v2</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>atlas-intg</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>atlas-notification</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>hdfs-model</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>atlas-common</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>org.apache.kafka</groupId>
+                      <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
+                      <version>${kafka.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>org.apache.kafka</groupId>
+                      <artifactId>kafka-clients</artifactId>
+                      <version>${kafka.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>com.sun.jersey.contribs</groupId>
+                      <artifactId>jersey-multipart</artifactId>
+                      <version>${jersey.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>com.fasterxml.jackson.core</groupId>
+                      <artifactId>jackson-databind</artifactId>
+                      <version>${jackson.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>com.fasterxml.jackson.core</groupId>
+                      <artifactId>jackson-core</artifactId>
+                      <version>${jackson.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>com.fasterxml.jackson.core</groupId>
+                      <artifactId>jackson-annotations</artifactId>
+                      <version>${jackson.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>commons-configuration</groupId>
+                      <artifactId>commons-configuration</artifactId>
+                      <version>${commons-conf.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>org.apache.hbase</groupId>
+                      <artifactId>hbase-common</artifactId>
+                      <version>${hbase.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>org.apache.hbase</groupId>
+                      <artifactId>hbase-server</artifactId>
+                      <version>${hbase.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>com.sun.jersey</groupId>
+                      <artifactId>jersey-json</artifactId>
+                      <version>${jersey.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>javax.ws.rs</groupId>
+                      <artifactId>jsr311-api</artifactId>
+                      <version>${jsr.version}</version>
+                    </artifactItem>
+                  </artifactItems>
+                </configuration>
+              </execution>
+              <execution>
+                <id>copy-hook-shim</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>copy</goal>
+                </goals>
+                <configuration>
+                  <outputDirectory>${project.build.directory}/dependency/hook/impala</outputDirectory>
+                  <overWriteReleases>false</overWriteReleases>
+                  <overWriteSnapshots>false</overWriteSnapshots>
+                  <overWriteIfNewer>true</overWriteIfNewer>
+                  <artifactItems>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>impala-bridge-shim</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                      <groupId>${project.groupId}</groupId>
+                      <artifactId>atlas-plugin-classloader</artifactId>
+                      <version>${project.version}</version>
+                    </artifactItem>
+                  </artifactItems>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-maven-plugin</artifactId>
+        <configuration>
+          <skip>${skipTests}</skip>
+          <!--only skip int tests -->
+          <httpConnector>
+            <port>31000</port>
+            <idleTimeout>60000</idleTimeout>
+          </httpConnector>
+          <war>../../webapp/target/atlas-webapp-${project.version}.war</war>
+          <daemon>true</daemon>
+          <webAppSourceDirectory>../../webapp/src/test/webapp</webAppSourceDirectory>
+          <webApp>
+            <contextPath>/</contextPath>
+            <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+          </webApp>
+          <useTestScope>true</useTestScope>
+          <systemProperties>
+            <force>true</force>
+            <systemProperty>
+              <name>atlas.home</name>
+              <value>${project.build.directory}</value>
+            </systemProperty>
+            <systemProperty>
+              <key>atlas.conf</key>
+              <value>${project.build.directory}/test-classes</value>
+            </systemProperty>
+            <systemProperty>
+              <name>atlas.data</name>
+              <value>${project.build.directory}/data</value>
+            </systemProperty>
+            <systemProperty>
+              <name>atlas.log.dir</name>
+              <value>${project.build.directory}/logs</value>
+            </systemProperty>
+            <systemProperty>
+              <name>atlas.log.file</name>
+              <value>application.log</value>
+            </systemProperty>
+            <systemProperty>
+              <name>log4j.configuration</name>
+              <value>file:///${project.build.directory}/../../../distro/src/conf/atlas-log4j.xml</value>
+            </systemProperty>
+            <systemProperty>
+              <name>atlas.graphdb.backend</name>
+              <value>${graphdb.backend.impl}</value>
+            </systemProperty>
+            <systemProperty>
+              <key>embedded.solr.directory</key>
+              <value>${project.build.directory}</value>
+            </systemProperty>
+          </systemProperties>
+          <stopKey>atlas-stop</stopKey>
+          <stopPort>31001</stopPort>
+          <stopWait>${jetty-maven-plugin.stopWait}</stopWait>
+          <daemon>${debug.jetty.daemon}</daemon>
+          <testClassesDirectory>${project.build.testOutputDirectory}</testClassesDirectory>
+          <useTestClasspath>true</useTestClasspath>
+        </configuration>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <id>start-jetty</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>deploy-war</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>stop-jetty</id>
+            <phase>post-integration-test</phase>
+            <goals>
+              <goal>stop</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.maven.doxia</groupId>
+            <artifactId>doxia-module-twiki</artifactId>
+            <version>${doxia.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.maven.doxia</groupId>
+            <artifactId>doxia-core</artifactId>
+            <version>${doxia.version}</version>
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <goals>
+              <goal>site</goal>
+            </goals>
+            <phase>prepare-package</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <generateProjectInfo>false</generateProjectInfo>
+          <generateReports>false</generateReports>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <inherited>false</inherited>
+        <executions>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/models</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${basedir}/../models</directory>
+                  <includes>
+                    <include>0000-Area0/0010-base_model.json</include>
+                    <include>1000-Hadoop/**</include>
+                  </includes>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-solr-resources</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/solr</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${basedir}/../../test-tools/src/main/resources/solr</directory>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+</project>
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/ImpalaLineageTool.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/ImpalaLineageTool.java
new file mode 100644
index 0000000..7c9abc8
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/ImpalaLineageTool.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala;
+
+import java.lang.Runnable;
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.comparator.LastModifiedFileComparator;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point of the actual implementation of the Impala lineage tool. It reads the lineage
+ * records from the lineage log files found in a directory, then calls an instance of
+ * ImpalaLineageHook to convert the lineage records to lineage notifications and send them
+ * to Atlas.
+ */
+public class ImpalaLineageTool {
+  private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageTool.class);
+  // Suffix and prefix of the write-ahead-log (wal) file that is kept for each lineage file
+  private static final String WAL_FILE_EXTENSION = ".wal";
+  private static final String WAL_FILE_PREFIX = "WAL";
+  // Folder that is scanned for lineage files (value of the -d option)
+  private String directoryName;
+  // File-name prefix a file must have to be treated as a lineage file (value of the -p option)
+  private String prefix;
+
+  /**
+   * Parses the command line arguments and remembers the lineage directory and file prefix.
+   * @param args command line arguments; understands -d/--directory and -p/--prefix
+   * @throws RuntimeException wrapping the ParseException if the arguments cannot be parsed
+   */
+  public ImpalaLineageTool(String[] args) {
+    try {
+      Options options = new Options();
+      options.addOption("d", "directory", true, "the lineage files' folder");
+      options.addOption("p", "prefix", true, "the prefix of the lineage files");
+
+      CommandLine cmd = new DefaultParser().parse(options, args);
+      directoryName = cmd.getOptionValue("d");
+      prefix        = cmd.getOptionValue("p");
+    } catch(ParseException e) {
+      // the "{}" placeholder is required, otherwise SLF4J silently drops the argument
+      LOG.warn("Failed to parse command arguments. Error: {}", e.getMessage());
+      printUsage();
+
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Imports every lineage file found by {@link #getCurrentFiles()}, oldest first. After a file
+   * has been handed to the hook, the file and its wal file are deleted, except for the last
+   * (most recently modified) file, which is kept.
+   */
+  public void run() {
+    ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+    File[] currentFiles = getCurrentFiles();
+    int fileNum = currentFiles.length;
+
+    for(int i = 0; i < fileNum; i++) {
+      File currentFile = currentFiles[i];
+      String filename = currentFile.getAbsolutePath();
+      // Resolve the wal file through File(parent, child) so that a directory name given
+      // without a trailing separator still yields a path inside that directory.
+      String walFilename = new File(directoryName,
+          WAL_FILE_PREFIX + currentFile.getName() + WAL_FILE_EXTENSION).getPath();
+
+      LOG.info("Importing: {}", filename);
+      importHImpalaEntities(impalaLineageHook, filename, walFilename);
+
+      // NOTE(review): the newest file is never deleted, presumably because it may still be
+      // written to; files are deleted even when the import reported errors - confirm intent.
+      if(i != fileNum - 1) {
+        deleteLineageAndWal(currentFile, walFilename);
+      }
+    }
+    LOG.info("Impala bridge processing: Done! ");
+  }
+
+  /**
+   * Command line entry point. Expects exactly four arguments (two options with one value
+   * each); otherwise prints the usage and returns without doing any work.
+   * @param args command line arguments
+   */
+  public static void main(String[] args) {
+    if (args == null || args.length != 4) {
+      // The lineage file location and prefix should be input as the parameters
+      System.out.println("Impala bridge: wrong number of arguments. Please try again");
+      printUsage();
+      return;
+    }
+
+    ImpalaLineageTool instance = new ImpalaLineageTool(args);
+    instance.run();
+  }
+
+  /**
+   * Delete the used lineage file and wal file. A file that does not exist or cannot be
+   * deleted is reported as a failure in the log; no exception is thrown.
+   * @param currentFile The current file
+   * @param wal The path of the wal file
+   */
+  public static void deleteLineageAndWal(File currentFile, String wal) {
+    if(currentFile.exists() && currentFile.delete()) {
+      LOG.info("Lineage file {} is deleted successfully", currentFile.getPath());
+    } else {
+      LOG.warn("Failed to delete the lineage file {}", currentFile.getPath());
+    }
+
+    File file = new File(wal);
+
+    if(file.exists() && file.delete()) {
+      LOG.info("Wal file {} deleted successfully", wal);
+    } else {
+      LOG.warn("Failed to delete the wal file {}", wal);
+    }
+  }
+
+  /** Prints the command line usage of the tool to standard output. */
+  private static void printUsage() {
+    System.out.println();
+    System.out.println();
+    System.out.println("Usage: import-impala.sh [-d <directory>] [-p <prefix>]"  );
+    System.out.println("    Imports specified lineage files by given directory and file prefix.");
+    System.out.println();
+  }
+
+  /**
+   * This function figures out the right lineage files to process, sorted by the last
+   * time they were modified. (old -> new)
+   * @return the lineage files from the given directory with the given prefix; an empty array
+   *         if there are none or if scanning the directory fails
+   */
+  public File[] getCurrentFiles() {
+    try {
+      LOG.info("Scanning: {}", directoryName);
+      File folder = new File(directoryName);
+      File[] listOfFiles = folder.listFiles((FileFilter) new PrefixFileFilter(prefix, IOCase.SENSITIVE));
+
+      // listFiles() returns null when the path is not a directory or cannot be read
+      if ((listOfFiles == null) || (listOfFiles.length == 0)) {
+        LOG.info("Found no lineage files.");
+        return new File[0];
+      }
+
+      if(listOfFiles.length > 1) {
+        Arrays.sort(listOfFiles, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
+      }
+
+      LOG.info("Found {} lineage files", listOfFiles.length);
+      return listOfFiles;
+    } catch(Exception e) {
+      LOG.error("Import lineage file failed.", e);
+    }
+    return new File[0];
+  }
+
+  /**
+   * Passes every lineage record to the hook. A failing record is logged and does not stop
+   * the processing of the remaining records.
+   * @param impalaLineageHook the hook that processes a single lineage record
+   * @param lineageList the lineage records to process
+   * @return true if every record was processed without an exception
+   */
+  private boolean processImpalaLineageHook(ImpalaLineageHook impalaLineageHook, List<String> lineageList) {
+    boolean allSucceed = true;
+
+    for (String lineageRecord : lineageList) {
+      try {
+        impalaLineageHook.process(lineageRecord);
+      } catch (Exception ex) {
+        // SLF4J fills "{}" with the record and treats the trailing Throwable as the cause
+        LOG.error("Exception at query {} \n", lineageRecord, ex);
+
+        allSucceed = false;
+      }
+    }
+
+    return allSucceed;
+  }
+
+  /**
+   * Reads the whole lineage file as one lineage record, hands it to the hook and, on success,
+   * appends the processed length of the lineage file to the wal file. The wal file is created
+   * with a "0" entry first if it does not exist yet. Failures are logged, not thrown.
+   * @param impalaLineageHook the hook that processes the lineage record
+   * @param name path of the lineage file
+   * @param walfile path of the wal file that belongs to the lineage file
+   */
+  public void importHImpalaEntities(ImpalaLineageHook impalaLineageHook, String name, String walfile) {
+    List<String> lineageList = new ArrayList<>();
+
+    try {
+      File lineageFile = new File(name); //use current file length to minus the offset
+      File walFile = new File(walfile);
+      // if the wal file does not exist, create one with 0 byte read, else, read the number
+      if(!walFile.exists()) {
+        // try-with-resources closes the writer even if the write fails
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(walfile))) {
+          writer.write("0, " + name);
+        }
+      }
+
+      LOG.debug("Reading: {}", name);
+      String lineageRecord = FileUtils.readFileToString(lineageFile, "UTF-8");
+
+      lineageList.add(lineageRecord);
+
+      // call instance of ImpalaLineageHook to process the list of Impala lineage record
+      if(processImpalaLineageHook(impalaLineageHook, lineageList)) {
+        // write how many bytes the current file is to the wal file (append mode)
+        try (BufferedWriter newWalFileBuf = new BufferedWriter(new FileWriter(walfile, true))) {
+          newWalFileBuf.newLine();
+          newWalFileBuf.write(lineageFile.length() + "," + name);
+        }
+      } else {
+        LOG.error("Error sending some of impala lineage records to ImpalaHook");
+      }
+    } catch (Exception e) {
+      // pass the exception itself so that the stack trace is not lost
+      LOG.error("Error in processing lineage records. Exception: " + e.getMessage(), e);
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/AtlasImpalaHookContext.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/AtlasImpalaHookContext.java
new file mode 100644
index 0000000..88faace
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/AtlasImpalaHookContext.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Holds the information related to one lineage record from Impala: the hook that received it,
+ * the detected operation type, the parsed query, and the Atlas entities registered so far
+ * (keyed by qualified name). Also provides helpers that build Atlas qualified names.
+ */
+public class AtlasImpalaHookContext {
+    public static final char   QNAME_SEP_CLUSTER_NAME = '@';
+    public static final char   QNAME_SEP_ENTITY_NAME  = '.';
+    public static final char   QNAME_SEP_PROCESS      = ':';
+
+    private final ImpalaLineageHook        hook;
+    private final ImpalaOperationType      impalaOperation;
+    private final ImpalaQuery              lineageQuery;
+    private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
+
+    public AtlasImpalaHookContext(ImpalaLineageHook hook, ImpalaOperationType operationType,
+        ImpalaQuery lineageQuery) throws Exception {
+        this.hook            = hook;
+        this.impalaOperation = operationType;
+        this.lineageQuery    = lineageQuery;
+    }
+
+    public ImpalaQuery getLineageQuery() {
+        return lineageQuery;
+    }
+
+    public String getQueryStr() { return lineageQuery.getQueryText(); }
+
+    public ImpalaOperationType getImpalaOperationType() {
+        return impalaOperation;
+    }
+
+    /** Registers (or replaces) the entity stored under the given qualified name. */
+    public void putEntity(String qualifiedName, AtlasEntity entity) {
+        qNameEntityMap.put(qualifiedName, entity);
+    }
+
+    /** @return the entity registered under the qualified name, or null when there is none */
+    public AtlasEntity getEntity(String qualifiedName) {
+        return qNameEntityMap.get(qualifiedName);
+    }
+
+    public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); }
+
+    public String getClusterName() {
+        return hook.getClusterName();
+    }
+
+    public boolean isConvertHdfsPathToLowerCase() {
+        return hook.isConvertHdfsPathToLowerCase();
+    }
+
+    /** @return lower-cased {@code dbName@} followed by the cluster name as configured */
+    public String getQualifiedNameForDb(String dbName) {
+        return (dbName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+    }
+
+    /**
+     * Builds the qualified name of a table from a name of the form {@code db.table}.
+     *
+     * @param fullTableName table name including the database name
+     * @return the qualified table name
+     * @throws IllegalArgumentException if the name is null or has no database part
+     */
+    public String getQualifiedNameForTable(String fullTableName) throws IllegalArgumentException {
+        if (fullTableName == null) {
+            throw new IllegalArgumentException("fullTableName is null");
+        }
+
+        int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+
+        if (!isSeparatorIndexValid(sepPos)) {
+            throw new IllegalArgumentException(fullTableName + " does not contain database name");
+        }
+
+        return getQualifiedNameForTable(fullTableName.substring(0, sepPos), fullTableName.substring(sepPos+1));
+    }
+
+    /** @return lower-cased {@code dbName.tableName@} followed by the cluster name as configured */
+    public String getQualifiedNameForTable(String dbName, String tableName) {
+        return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
+            getClusterName();
+    }
+
+    /**
+     * Builds the qualified name of a column from a name of the form {@code db.table.column}.
+     * The text before the first separator is taken as the database name, the text after the
+     * last separator as the column name, and everything in between as the table name.
+     *
+     * @param fullColumnName column name including database and table names
+     * @return the qualified column name
+     * @throws IllegalArgumentException if the name is null or has fewer than two separators
+     */
+    public String getQualifiedNameForColumn(String fullColumnName) throws IllegalArgumentException {
+        if (fullColumnName == null) {
+            throw new IllegalArgumentException("fullColumnName is null");
+        }
+
+        int sepPosFirst = fullColumnName.indexOf(QNAME_SEP_ENTITY_NAME);
+        int sepPosLast = fullColumnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+
+        // first == last means there is only one separator, so either db or table name is missing
+        if (!isSeparatorIndexValid(sepPosFirst) || !isSeparatorIndexValid(sepPosLast) ||
+            sepPosFirst == sepPosLast) {
+            // String.format needs %s; the SLF4J style "{}" placeholder is not substituted
+            throw new IllegalArgumentException(
+                String.format("fullColumnName %s does not contain database name or table name",
+                    fullColumnName));
+        }
+
+        return getQualifiedNameForColumn(
+            fullColumnName.substring(0, sepPosFirst),
+            fullColumnName.substring(sepPosFirst+1, sepPosLast),
+            fullColumnName.substring(sepPosLast+1));
+    }
+
+    /**
+     * Strips any leading {@code db.table.} style prefix from a column name.
+     *
+     * @param fullColumnName column name, with or without prefix
+     * @return the text after the last separator, or the input itself when it has no valid separator
+     * @throws IllegalArgumentException if the name is null
+     */
+    public String getColumnNameOnly(String fullColumnName) throws IllegalArgumentException {
+        if (fullColumnName == null) {
+            throw new IllegalArgumentException("fullColumnName is null");
+        }
+
+        int sepPosLast = fullColumnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+
+        if (!isSeparatorIndexValid(sepPosLast)) {
+            return fullColumnName;
+        }
+
+        return fullColumnName.substring(sepPosLast+1);
+    }
+
+    /** @return lower-cased {@code dbName.tableName.columnName@} followed by the cluster name */
+    public String getQualifiedNameForColumn(String dbName, String tableName, String columnName) {
+        return
+            (dbName + QNAME_SEP_ENTITY_NAME  + tableName + QNAME_SEP_ENTITY_NAME +
+             columnName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+    }
+
+    public String getUserName() { return lineageQuery.getUser(); }
+
+    /** @return the text before the last separator, or null when there is no valid separator */
+    public String getDatabaseNameFromTable(String fullTableName) {
+        int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+        if (isSeparatorIndexValid(sepPos)) {
+            return fullTableName.substring(0, sepPos);
+        }
+
+        return null;
+    }
+
+    /**
+     * @return the text before the last separator when it passes
+     *         {@link ImpalaIdentifierParser#isTableNameValid(String)}, otherwise null
+     */
+    public String getTableNameFromColumn(String columnName) {
+        int sepPos = columnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+        if (!isSeparatorIndexValid(sepPos)) {
+            return null;
+        }
+
+        String tableName = columnName.substring(0, sepPos);
+        if (!ImpalaIdentifierParser.isTableNameValid(tableName)) {
+            return null;
+        }
+
+        return tableName;
+    }
+
+    /**
+     * A separator index is valid only when strictly positive: -1 means "not found" and 0 would
+     * leave an empty name in front of the separator.
+     */
+    public boolean isSeparatorIndexValid(int index) {
+        return index > 0;
+    }
+
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaIdentifierParser.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaIdentifierParser.java
new file mode 100644
index 0000000..b9d6cbb
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaIdentifierParser.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Check if a string is a valid Impala table identifier.
+ * It could be {@code <dbName>.<tableName>} or {@code <tableName>}
+ */
+public class ImpalaIdentifierParser {
+    // http://www.cloudera.com/content/www/en-us/documentation/enterprise/latest/topics/impala_identifiers.html
+    // https://github.com/apache/impala/blob/64e6719870db5602a6fa85014bc6c264080b9414/tests/common/patterns.py
+    // VALID_IMPALA_IDENTIFIER_REGEX = re.compile(r'^[a-zA-Z][a-zA-Z0-9_]{,127}$')
+    // add "." to allow <dbName>.<tableName>
+    public static final String VALID_IMPALA_IDENTIFIER_REGEX = "^[a-zA-Z][a-zA-Z0-9_.]{0,127}$";
+
+    /**
+     * Checks whether the given name is a valid table identifier.
+     *
+     * @param inTableName name of the form {@code <dbName>.<tableName>} or {@code <tableName>}
+     * @return true when the name matches {@link #VALID_IMPALA_IDENTIFIER_REGEX}, has at most two
+     *         non-empty dot-separated parts and none of the parts is a reserved word
+     */
+    public static boolean isTableNameValid(String inTableName) {
+        if (inTableName == null || inTableName.isEmpty()) {
+            return false;
+        }
+
+        if (!inTableName.matches(VALID_IMPALA_IDENTIFIER_REGEX)) {
+            return false;
+        }
+
+        // String.split takes a regex, so the dot must be escaped to split on a literal ".";
+        // limit -1 keeps trailing empty parts so that a name such as "db." is detected
+        String[] tokens = inTableName.split("\\.", -1);
+        if (tokens.length > 2) {
+            // valid value should be <dbName>.<tableName> or <tableName>
+            return false;
+        }
+
+        for (String token : tokens) {
+            if (token.isEmpty() || isReserved(token)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    // The following is extracted from Impala code.
+    // Mainly from https://github.com/apache/impala/blob/master/fe/src/main/jflex/sql-scanner.flex
+    // Set of keyword strings. Only membership is used here, so a plain hash set is sufficient.
+    static HashSet<String> keywordMap;
+    // keywords plus the descriptions of the non-keyword tokens
+    static HashSet<String> tokenIdMap;
+    // Reserved words are words that cannot be used as identifiers. It is a superset of
+    // keywords.
+    static Set<String> reservedWords;
+
+
+    /**
+     * (Re)builds {@link #keywordMap}, {@link #tokenIdMap} and {@link #reservedWords}.
+     * It is invoked once from the static initializer of this class.
+     */
+    public static void init() {
+        // initialize keywords; "default" is left out on purpose because it can be a database
+        // or table name
+        keywordMap = new HashSet<>(Arrays.asList(new String[] {
+            "&&", "add", "aggregate", "all", "alter", "analytic", "and", "anti", "api_version",
+            "array", "as", "asc", "authorization", "avro", "between", "bigint", "binary",
+            "block_size", "boolean", "by", "cached", "case", "cascade", "cast", "change",
+            "char", "class", "close_fn", "column", "columns", "comment", "compression",
+            "compute", "copy", "create", "cross", "current", "data", "database", "databases",
+            "date", "datetime", "decimal", "delete", "delimited", "desc", "describe",
+            "distinct", "div", "double", "drop", "else", "encoding", "end", "escaped",
+            "exists", "explain", "extended", "external", "false", "fields", "fileformat",
+            "files", "finalize_fn", "first", "float", "following", "for", "format",
+            "formatted", "from", "full", "function", "functions", "grant", "group", "hash",
+            "having", "if", "ilike", "ignore", "in", "incremental", "init_fn", "inner",
+            "inpath", "insert", "int", "integer", "intermediate", "interval", "into",
+            "invalidate", "iregexp", "is", "join", "kudu", "last", "left", "like", "limit",
+            "lines", "load", "location", "map", "merge_fn", "metadata", "not", "null",
+            "nulls", "offset", "on", "||", "or", "orc", "order", "outer", "over", "overwrite",
+            "parquet", "parquetfile", "partition", "partitioned", "partitions", "preceding",
+            "prepare_fn", "primary", "produced", "purge", "range", "rcfile", "real",
+            "recover", "refresh", "regexp", "rename", "repeatable", "replace", "replication",
+            "restrict", "returns", "revoke", "right", "rlike", "role", "roles", "row", "rows",
+            "schema", "schemas", "select", "semi", "sequencefile", "serdeproperties",
+            "serialize_fn", "set", "show", "smallint", "sort", "stats", "stored",
+            "straight_join", "string", "struct", "symbol", "table", "tables", "tablesample",
+            "tblproperties", "terminated", "textfile", "then", "timestamp", "tinyint", "to",
+            "true", "truncate", "unbounded", "uncached", "union", "unknown", "update",
+            "update_fn", "upsert", "use", "using", "values", "varchar", "view", "when",
+            "where", "with"}));
+
+        // Initialize tokenIdMap for error reporting: keywords plus the non-keyword tokens
+        tokenIdMap = new HashSet<>(keywordMap);
+        tokenIdMap.addAll(Arrays.asList(new String[] {
+            "EOF", "...", ":", ";", "COMMA", ".", "*", "(", ")", "[", "]", "/", "%", "+", "-",
+            "&", "|", "^", "~", "=", "!", "<", ">", "UNMATCHED STRING LITERAL", "!=",
+            "INTEGER LITERAL", "NUMERIC OVERFLOW", "DECIMAL LITERAL", "EMPTY IDENTIFIER",
+            "IDENTIFIER", "STRING LITERAL", "COMMENTED_PLAN_HINT_START",
+            "COMMENTED_PLAN_HINT_END", "Unexpected character"}));
+
+        // For impala 3.0, reserved words = keywords + sql16ReservedWords - builtinFunctions
+        // - whitelist
+        // unused reserved words = reserved words - keywords. These words are reserved for
+        // forward compatibility purposes.
+        reservedWords = new HashSet<>(keywordMap);
+        // Add SQL:2016 reserved words ("log10" and "update" no longer carry trailing blanks,
+        // which made those two entries impossible to match)
+        reservedWords.addAll(Arrays.asList(new String[] {
+            "abs", "acos", "allocate", "any", "are", "array_agg", "array_max_cardinality",
+            "asensitive", "asin", "asymmetric", "at", "atan", "atomic", "avg", "begin",
+            "begin_frame", "begin_partition", "blob", "both", "call", "called", "cardinality",
+            "cascaded", "ceil", "ceiling", "char_length", "character", "character_length",
+            "check", "classifier", "clob", "close", "coalesce", "collate", "collect",
+            "commit", "condition", "connect", "constraint", "contains", "convert", "copy",
+            "corr", "corresponding", "cos", "cosh", "count", "covar_pop", "covar_samp",
+            "cube", "cume_dist", "current_catalog", "current_date",
+            "current_default_transform_group", "current_path", "current_path", "current_role",
+            "current_role", "current_row", "current_schema", "current_time",
+            "current_timestamp", "current_transform_group_for_type", "current_user", "cursor",
+            "cycle", "day", "deallocate", "dec", "decfloat", "declare", "define",
+            "dense_rank", "deref", "deterministic", "disconnect", "dynamic", "each",
+            "element", "empty", "end-exec", "end_frame", "end_partition", "equals", "escape",
+            "every", "except", "exec", "execute", "exp", "extract", "fetch", "filter",
+            "first_value", "floor", "foreign", "frame_row", "free", "fusion", "get", "global",
+            "grouping", "groups", "hold", "hour", "identity", "indicator", "initial", "inout",
+            "insensitive", "integer", "intersect", "intersection", "json_array",
+            "json_arrayagg", "json_exists", "json_object", "json_objectagg", "json_query",
+            "json_table", "json_table_primitive", "json_value", "lag", "language", "large",
+            "last_value", "lateral", "lead", "leading", "like_regex", "listagg", "ln",
+            "local", "localtime", "localtimestamp", "log", "log10", "lower", "match",
+            "match_number", "match_recognize", "matches", "max", "member", "merge", "method",
+            "min", "minute", "mod", "modifies", "module", "month", "multiset", "national",
+            "natural", "nchar", "nclob", "new", "no", "none", "normalize", "nth_value",
+            "ntile", "nullif", "numeric", "occurrences_regex", "octet_length", "of", "old",
+            "omit", "one", "only", "open", "out", "overlaps", "overlay", "parameter",
+            "pattern", "per", "percent", "percent_rank", "percentile_cont", "percentile_disc",
+            "period", "portion", "position", "position_regex", "power", "precedes",
+            "precision", "prepare", "procedure", "ptf", "rank", "reads", "real", "recursive",
+            "ref", "references", "referencing", "regr_avgx", "regr_avgy", "regr_count",
+            "regr_intercept", "regr_r2", "regr_slope", "regr_sxx", "regr_sxy", "regr_syy",
+            "release", "result", "return", "rollback", "rollup", "row_number", "running",
+            "savepoint", "scope", "scroll", "search", "second", "seek", "sensitive",
+            "session_user", "similar", "sin", "sinh", "skip", "some", "specific",
+            "specifictype", "sql", "sqlexception", "sqlstate", "sqlwarning", "sqrt", "start",
+            "static", "stddev_pop", "stddev_samp", "submultiset", "subset", "substring",
+            "substring_regex", "succeeds", "sum", "symmetric", "system", "system_time",
+            "system_user", "tan", "tanh", "time", "timezone_hour", "timezone_minute",
+            "trailing", "translate", "translate_regex", "translation", "treat", "trigger",
+            "trim", "trim_array", "uescape", "unique", "unknown", "unnest", "update",
+            "upper", "user", "value", "value_of", "var_pop", "var_samp", "varbinary",
+            "varying", "versioning", "whenever", "width_bucket", "window", "within",
+            "without", "year"}));
+        // TODO: Remove impala builtin function names. Need to find content of
+        // BuiltinsDb.getInstance().getAllFunctions()
+        //reservedWords.removeAll(BuiltinsDb.getInstance().getAllFunctions().keySet());
+
+        // Remove whitelist words. These words might be heavily used in production, and
+        // impala is unlikely to implement SQL features around these words in the near future.
+        reservedWords.removeAll(Arrays.asList(new String[] {
+            // time units
+            "year", "month", "day", "hour", "minute", "second",
+            "begin", "call", "check", "classifier", "close", "identity", "language",
+            "localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
+            "period", "result", "return", "sql", "start", "system", "time", "user", "value"
+        }));
+    }
+
+    static {
+        init();
+    }
+
+    /**
+     * @param token identifier part to look up; may be null
+     * @return true when the token, compared case-insensitively, is a reserved word
+     */
+    static boolean isReserved(String token) {
+        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
+        return token != null && reservedWords.contains(token.toLowerCase(java.util.Locale.ROOT));
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java
new file mode 100644
index 0000000..c77bb38
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
+
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
+import org.apache.atlas.impala.hook.events.CreateImpalaProcess;
+import org.apache.atlas.impala.model.IImpalaLineageHook;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import java.util.HashSet;
+
+/**
+ * Atlas hook for Impala lineage records: determines the operation type of a record and, for
+ * the supported operations, sends the resulting notification messages through AtlasHook.
+ */
+public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
+    private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);
+    public static final String ATLAS_ENDPOINT                      = "atlas.rest.address";
+    public static final String REALM_SEPARATOR                     = "@";
+    public static final String CONF_PREFIX                         = "atlas.hook.impala.";
+    public static final String CONF_CLUSTER_NAME                   = "atlas.cluster.name";
+    public static final String CONF_REALM_NAME                     = "atlas.realm.name";
+    public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE     = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+
+    private final ImpalaOperationParser parser = new ImpalaOperationParser();
+    private static final String clusterName;
+    private static final String realm;
+    private static final boolean convertHdfsPathToLowerCase;
+
+    static {
+        clusterName                     = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        realm                           = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME);  // what should default be ??
+        convertHdfsPathToLowerCase      = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+    }
+
+    public ImpalaLineageHook() {
+
+    }
+
+    /**
+     * Deserializes the JSON lineage record into an ImpalaQuery and processes it.
+     *
+     * @param impalaQueryString lineage record in JSON form
+     */
+    public void process(String impalaQueryString) throws Exception {
+        ImpalaQuery lineageQuery = AtlasType.fromJson(impalaQueryString, ImpalaQuery.class);
+        process(lineageQuery);
+    }
+
+    /**
+     * Processes one lineage record. Records that are null, have an empty query text or carry an
+     * operation other than CREATEVIEW are skipped. Any failure while building or sending the
+     * notification is logged and not propagated to the caller.
+     *
+     * @param lineageQuery parsed lineage record
+     */
+    public void process(ImpalaQuery lineageQuery) throws Exception {
+        // NOTE(review): presumably the JSON deserialization above can yield null for an
+        // unparsable record - confirm; the guard avoids a NullPointerException either way
+        if (lineageQuery == null) {
+            LOG.warn("==> ImpalaLineageHook.process skips because the lineage query is null <==");
+            return;
+        }
+
+        if (StringUtils.isEmpty(lineageQuery.getQueryText())) {
+            LOG.warn("==> ImpalaLineageHook.process skips because the query text is empty <==");
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> ImpalaLineageHook.process({})", lineageQuery.getQueryText());
+        }
+
+        try {
+            ImpalaOperationType operationType = parser.getImpalaOperationType(lineageQuery.getQueryText());
+            AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(this, operationType, lineageQuery);
+            BaseImpalaEvent event = null;
+
+            switch (operationType) {
+                case CREATEVIEW:
+                    event = new CreateImpalaProcess(context);
+                    break;
+                default:
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("ImpalaLineageHook.process({}): operation ignored", lineageQuery.getQueryText());
+                    }
+                    break;
+            }
+
+            if (event != null) {
+                LOG.debug("Processing event: {}", lineageQuery.getQueryText());
+
+                final UserGroupInformation ugi = getUgiFromUserName(lineageQuery.getUser());
+
+                super.notifyEntities(event.getNotificationMessages(), ugi);
+            }
+        } catch (Throwable t) {
+            // deliberately best-effort: a failing record is logged, never rethrown
+            LOG.error("ImpalaLineageHook.process(): failed to process query {}",
+                lineageQuery.getQueryText(), t);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== ImpalaLineageHook.process({})", lineageQuery.getQueryText());
+        }
+    }
+
+    /**
+     * Builds a UserGroupInformation for the given user. When the user name carries no realm,
+     * the configured realm is appended after the realm separator.
+     */
+    private UserGroupInformation getUgiFromUserName(String userName)  throws IOException {
+        String userPrincipal = userName.contains(REALM_SEPARATOR)? userName : userName + REALM_SEPARATOR + getRealm();
+        Subject userSubject = new Subject(false, Sets.newHashSet(
+            new KerberosPrincipal(userPrincipal)), new HashSet<Object>(),new HashSet<Object>());
+        return UserGroupInformation.getUGIFromSubject(userSubject);
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getRealm() {
+        return realm;
+    }
+
+    public boolean isConvertHdfsPathToLowerCase() {
+        return convertHdfsPathToLowerCase;
+    }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java
new file mode 100644
index 0000000..6cb726c
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import org.apache.atlas.impala.model.ImpalaOperationType;
+
+/**
+ * Parse an Impala query text and output the impala operation type
+ */
+public class ImpalaOperationParser {
+    // prefix that identifies a "create view" statement, matched case-insensitively
+    private static final String CREATE_VIEW_PREFIX = "create view";
+
+    public ImpalaOperationParser() {
+    }
+
+    /**
+     * Determines the operation type from the beginning of the query text.
+     *
+     * @param queryText Impala query text; may be null
+     * @return CREATEVIEW when the text starts with "create view" in any letter case,
+     *         UNKNOWN otherwise (including for null input)
+     */
+    public ImpalaOperationType getImpalaOperationType(String queryText) {
+        // TODO: more Impala commands will be handled in ATLAS-3184
+        // regionMatches with ignoreCase compares character by character and, unlike
+        // toLowerCase(), does not depend on the JVM default locale
+        if (queryText != null &&
+            queryText.regionMatches(true, 0, CREATE_VIEW_PREFIX, 0, CREATE_VIEW_PREFIX.length())) {
+            return ImpalaOperationType.CREATEVIEW;
+        }
+
+        return ImpalaOperationType.UNKNOWN;
+    }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java
new file mode 100644
index 0000000..0487739
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook.events;
+
+import static org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.model.ImpalaDataType;
+import org.apache.atlas.impala.model.ImpalaNode;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The base class for generating notification event to Atlas server
+ * Most code is copied from BaseHiveEvent to avoid depending on org.apache.atlas.hive.hook
+ */
+public abstract class BaseImpalaEvent {
+    private static final Logger LOG = LoggerFactory.getLogger(BaseImpalaEvent.class);
+
+    // Impala re-uses the Hive entity types, so that Hive and Impala can operate on the same
+    // database or table
+    public static final String HIVE_TYPE_DB                        = "hive_db";
+    public static final String HIVE_TYPE_TABLE                     = "hive_table";
+    public static final String HIVE_TYPE_COLUMN                    = "hive_column";
+
+    public static final String ATTRIBUTE_QUALIFIED_NAME            = "qualifiedName";
+    public static final String ATTRIBUTE_NAME                      = "name";
+    public static final String ATTRIBUTE_OWNER                     = "owner";
+    public static final String ATTRIBUTE_CLUSTER_NAME              = "clusterName";
+    public static final String ATTRIBUTE_CREATE_TIME               = "createTime";
+    public static final String ATTRIBUTE_LAST_ACCESS_TIME          = "lastAccessTime";
+    public static final String ATTRIBUTE_DB                        = "db";
+    public static final String ATTRIBUTE_COLUMNS                   = "columns";
+    public static final String ATTRIBUTE_TABLE                     = "table";
+    public static final String ATTRIBUTE_INPUTS                    = "inputs";
+    public static final String ATTRIBUTE_OUTPUTS                   = "outputs";
+    public static final String ATTRIBUTE_OPERATION_TYPE            = "operationType";
+    public static final String ATTRIBUTE_START_TIME                = "startTime";
+    public static final String ATTRIBUTE_USER_NAME                 = "userName";
+    public static final String ATTRIBUTE_QUERY_TEXT                = "queryText";
+    public static final String ATTRIBUTE_QUERY_ID                  = "queryId";
+    public static final String ATTRIBUTE_QUERY_PLAN                = "queryPlan";
+    public static final String ATTRIBUTE_END_TIME                  = "endTime";
+    public static final String ATTRIBUTE_RECENT_QUERIES            = "recentQueries";
+    public static final String ATTRIBUTE_QUERY                     = "query";
+    public static final String ATTRIBUTE_DEPENDENCY_TYPE           = "dependencyType";
+    public static final long   MILLIS_CONVERT_FACTOR               = 1000;
+
+    public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
+
+    static {
+        OWNER_TYPE_TO_ENUM_VALUE.put(1, "USER");
+        OWNER_TYPE_TO_ENUM_VALUE.put(2, "ROLE");
+        OWNER_TYPE_TO_ENUM_VALUE.put(3, "GROUP");
+    }
+
+    protected final AtlasImpalaHookContext context;
+    protected final Map<String, ImpalaNode> vertexNameMap;
+    protected final Map<Long, LineageVertex> verticesMap;
+
+    /**
+     * Creates an event bound to the given hook context, starting with empty vertex lookup maps.
+     *
+     * @param context the hook context this event reads from
+     */
+    public BaseImpalaEvent(AtlasImpalaHookContext context) {
+        this.verticesMap   = new HashMap<>();
+        this.vertexNameMap = new HashMap<>();
+        this.context       = context;
+    }
+
+    /**
+     * @return the hook context this event was created with
+     */
+    public AtlasImpalaHookContext getContext() {
+        return this.context;
+    }
+
+    /**
+     * Returns the hook notifications for this event; each concrete subclass supplies the
+     * implementation.
+     *
+     * @return the notifications to send
+     * @throws Exception as declared; the conditions depend on the implementation
+     */
+    public abstract List<HookNotification> getNotificationMessages() throws Exception;
+
+    public String getUserName() { return context.getUserName(); }
+
+    /**
+     * Delegates to the hook context to derive the table name from a column name.
+     *
+     * @param columnName the column name to resolve
+     * @return whatever the hook context returns for this column name
+     */
+    public String getTableNameFromColumn(String columnName) {
+        return this.context.getTableNameFromColumn(columnName);
+    }
+
+    /**
+     * Returns the qualified name of the vertex owned by the given node.
+     *
+     * @param node the node whose own vertex is resolved
+     * @return the qualified name, or null when it cannot be determined for the vertex
+     * @throws IllegalArgumentException if node, or its own vertex, is null
+     */
+    public String getQualifiedName(ImpalaNode node) throws IllegalArgumentException {
+        // fail with the declared exception type instead of a NullPointerException
+        if (node == null) {
+            throw new IllegalArgumentException("node is null");
+        }
+
+        return getQualifiedName(node.getOwnVertex());
+    }
+
+    /**
+     * Returns the qualified name for a lineage vertex, based on its vertex type.
+     *
+     * @param node the vertex to resolve
+     * @return the qualified name for DATABASE, TABLE and COLUMN vertices; null (after logging a
+     *         warning) when the vertex type or vertex id is missing or the type is not handled
+     * @throws IllegalArgumentException if node is null
+     */
+    public String getQualifiedName(LineageVertex node) throws IllegalArgumentException {
+        if (node == null) {
+            throw new IllegalArgumentException("node is null");
+        }
+
+        final ImpalaVertexType vertexType = node.getVertexType();
+        final String           vertexId   = node.getVertexId();
+
+        if (vertexType == null) {
+            // nothing to report when both the type and the id are missing
+            if (vertexId != null) {
+                LOG.warn("null qualified name for type: null and name: {}", vertexId);
+            }
+
+            return null;
+        }
+
+        if (vertexId == null) {
+            LOG.warn("null qualified name for type: {} and name: null", vertexType);
+
+            return null;
+        }
+
+        final String qualifiedName;
+
+        switch (vertexType) {
+            case DATABASE:
+                qualifiedName = context.getQualifiedNameForDb(vertexId);
+                break;
+
+            case TABLE:
+                qualifiedName = context.getQualifiedNameForTable(vertexId);
+                break;
+
+            case COLUMN:
+                qualifiedName = context.getQualifiedNameForColumn(vertexId);
+                break;
+
+            default:
+                LOG.warn("null qualified name for type: {} and name: {}", vertexType, vertexId);
+                qualifiedName = null;
+                break;
+        }
+
+        return qualifiedName;
+    }
+
+    /**
+     * Orders entities by their qualifiedName attribute, ignoring case. Entities without a
+     * qualifiedName sort before those that have one, and compare as equal to each other.
+     */
+    static final class AtlasEntityComparator implements Comparator<AtlasEntity> {
+        @Override
+        public int compare(AtlasEntity entity1, AtlasEntity entity2) {
+            String name1 = (String)entity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+            String name2 = (String)entity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+
+            if (name1 == null) {
+                // two missing names must compare as equal; otherwise compare(a, b) and
+                // compare(b, a) would both be negative, breaking the Comparator contract
+                return (name2 == null) ? 0 : -1;
+            }
+
+            if (name2 == null) {
+                return 1;
+            }
+
+            return name1.toLowerCase().compareTo(name2.toLowerCase());
+        }
+    }
+
+    static final Comparator<AtlasEntity> entityComparator = new AtlasEntityComparator();
+
+    /**
+     * Builds the qualified name of the process entity for the current operation.
+     *
+     * For CREATEVIEW, the outputs are sorted by qualifiedName and the first entity of type
+     * hive_table supplies the name: its qualifiedName, the process separator and its createTime.
+     *
+     * @param inputs  the process inputs (not used at present)
+     * @param outputs the process outputs
+     * @return the process qualified name; null for any other operation type, or when no output
+     *         is a hive_table
+     */
+    protected String getQualifiedName(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
+        // TODO: add more operation type here
+        if (context.getImpalaOperationType() != ImpalaOperationType.CREATEVIEW) {
+            // TODO: add code for name construction for HDFS path
+            return null;
+        }
+
+        // sort a copy so the caller's list keeps its order
+        List<? extends AtlasEntity> candidates = new ArrayList<>(outputs);
+
+        Collections.sort(candidates, entityComparator);
+
+        for (AtlasEntity candidate : candidates) {
+            if (!candidate.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) {
+                continue;
+            }
+
+            Long createTime = (Long)candidate.getAttribute(ATTRIBUTE_CREATE_TIME);
+
+            return (String)candidate.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
+        }
+
+        // TODO: add code for name construction for HDFS path
+        return null;
+    }
+
+    /**
+     * Converts a node into an entity usable as a process input or output.
+     *
+     * @param node          the node to convert
+     * @param entityExtInfo passed through to toAtlasEntity
+     * @return the result of toAtlasEntity for TABLE, PARTITION and DFS_DIR nodes; null for any
+     *         other node type
+     */
+    protected AtlasEntity getInputOutputEntity(ImpalaNode node, AtlasEntityExtInfo entityExtInfo) throws Exception {
+        switch (node.getNodeType()) {
+            case TABLE:
+            case PARTITION:
+            case DFS_DIR:
+                return toAtlasEntity(node, entityExtInfo);
+
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Converts a node into the matching Atlas entity.
+     *
+     * @param node          the node to convert
+     * @param entityExtInfo passed through when a table entity is built
+     * @return a database entity for DATABASE nodes, a table entity for TABLE and PARTITION
+     *         nodes, and null for every other node type
+     */
+    protected AtlasEntity toAtlasEntity(ImpalaNode node, AtlasEntityExtInfo entityExtInfo) throws Exception {
+        switch (node.getNodeType()) {
+            case DATABASE:
+                return toDbEntity(node);
+
+            case TABLE:
+            case PARTITION:
+                return toTableEntity(node, entityExtInfo);
+
+            default:
+                return null;
+        }
+    }
+
+    protected AtlasEntity toDbEntity(ImpalaNode db) throws Exception {
+        return toDbEntity(db.getNodeName());
+    }
+
+    protected AtlasEntity toDbEntity(String dbName) throws Exception {
+        String dbQualifiedName = context.getQualifiedNameForDb(dbName);
+        AtlasEntity ret = context.getEntity(dbQualifiedName);
+
+        if (ret == null) {
+            ret = new AtlasEntity(HIVE_TYPE_DB);
+
+            // Impala hook should not send metadata entities. set 'guid' to null - which will:
+            //  - result in this entity to be not included in 'referredEntities'
+            //  - cause Atlas server to resolve the entity by its qualifiedName
+            ret.setGuid(null);
+
+            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName);
+            ret.setAttribute(ATTRIBUTE_NAME, dbName.toLowerCase());
+            ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getClusterName());
+
+            context.putEntity(dbQualifiedName, ret);
+        }
+
+        return ret;
+    }
+
+    protected AtlasEntityWithExtInfo toTableEntity(ImpalaNode table) throws Exception {
+        AtlasEntityWithExtInfo ret = new AtlasEntityWithExtInfo();
+
+        AtlasEntity entity = toTableEntity(table, ret);
+
+        if (entity != null) {
+            ret.setEntity(entity);
+        } else {
+            ret = null;
+        }
+
+        return ret;
+    }
+
+    protected AtlasEntity toTableEntity(ImpalaNode table, AtlasEntitiesWithExtInfo entities) throws Exception {
+        AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities);
+
+        if (ret != null) {
+            entities.addEntity(ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Builds the table entity for a node, resolving its database entity first.
+     *
+     * @param table         the table node; it and its node name must not be null
+     * @param entityExtInfo when not null, receives the database entity as a referred entity
+     * @return the table entity
+     * @throws IllegalArgumentException if the table or its name is null, or no database name
+     *                                  can be derived from the table name
+     */
+    protected AtlasEntity toTableEntity(ImpalaNode table, AtlasEntityExtInfo entityExtInfo) throws Exception {
+        if ((table == null) || (table.getNodeName() == null)) {
+            throw new IllegalArgumentException("table is null or its name is null");
+        }
+
+        String dbName = context.getDatabaseNameFromTable(table.getNodeName());
+        if (dbName == null) {
+            // String.format needs %s; the SLF4J-style "{}" placeholder is not substituted here
+            throw new IllegalArgumentException(String.format("db name is null for table: %s", table.getNodeName()));
+        }
+
+        AtlasEntity dbEntity = toDbEntity(dbName);
+
+        if (entityExtInfo != null) {
+            if (dbEntity != null) {
+                entityExtInfo.addReferredEntity(dbEntity);
+            }
+        }
+
+        AtlasEntity ret = toTableEntity(getObjectId(dbEntity), table, entityExtInfo);
+
+        return ret;
+    }
+
+    protected AtlasEntity toTableEntity(AtlasObjectId dbId, ImpalaNode table, AtlasEntityExtInfo entityExtInfo) throws Exception {
+        String  tblQualifiedName = getQualifiedName(table);
+        AtlasEntity ret = context.getEntity(tblQualifiedName);
+
+        if (ret != null) {
+            return ret;
+        }
+
+        // a table created in Impala still uses HIVE_TYPE_TABLE to allow both Impala and Hive operate
+        // on the same table
+        ret = new AtlasEntity(HIVE_TYPE_TABLE);
+
+        // Impala hook should not send meta data entity to Atlas. set 'guid' to null - which will:
+        //  - result in this entity to be not included in 'referredEntities'
+        //  - cause Atlas server to resolve the entity by its qualifiedName
+        // TODO: enable this once HMS hook is in. Disable this before that.
+        ret.setGuid(null);
+
+        long createTime     = getTableCreateTime(table);
+        long lastAccessTime = createTime;
+
+        ret.setAttribute(ATTRIBUTE_DB, dbId);
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
+        ret.setAttribute(ATTRIBUTE_NAME, table.getNodeName().toLowerCase());
+
+        // just fake it. It should not be sent to Atlas once HMS hook is in
+        ret.setAttribute(ATTRIBUTE_OWNER, getUserName());
+
+        ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+        ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+
+        AtlasObjectId     tableId       = getObjectId(ret);
+        List<AtlasEntity> columns       = getColumnEntities(tableId, table);
+
+        if (entityExtInfo != null) {
+            if (columns != null) {
+                for (AtlasEntity column : columns) {
+                    entityExtInfo.addReferredEntity(column);
+                }
+            }
+        }
+
+        ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
+
+
+        context.putEntity(tblQualifiedName, ret);
+
+        return ret;
+    }
+
+    /**
+     * Creates an object id for an entity from its guid, its type name and its qualifiedName
+     * as the only unique attribute.
+     *
+     * @param entity the entity to reference
+     * @return the object id
+     */
+    public static AtlasObjectId getObjectId(AtlasEntity entity) {
+        return new AtlasObjectId(entity.getGuid(), entity.getTypeName(),
+            Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)));
+    }
+
+    /**
+     * Creates the object ids for a list of entities, in the same order.
+     *
+     * @param entities the entities to reference; may be null or empty
+     * @return one object id per entity, or an empty list when entities is null or empty
+     */
+    public static List<AtlasObjectId> getObjectIds(List<AtlasEntity> entities) {
+        if (CollectionUtils.isEmpty(entities)) {
+            return Collections.emptyList();
+        }
+
+        final List<AtlasObjectId> objectIds = new ArrayList<>(entities.size());
+
+        for (AtlasEntity entity : entities) {
+            objectIds.add(getObjectId(entity));
+        }
+
+        return objectIds;
+    }
+
+    /**
+     * Returns the create time of the table represented by the node's own vertex.
+     *
+     * @param table the table node
+     * @return the create time computed from the node's own vertex
+     */
+    public static long getTableCreateTime(ImpalaNode table) {
+        final LineageVertex ownVertex = table.getOwnVertex();
+
+        return getTableCreateTime(ownVertex);
+    }
+
+    /**
+     * Returns the create time of a table vertex.
+     *
+     * @param tableVertex the table vertex
+     * @return the vertex create time multiplied by MILLIS_CONVERT_FACTOR, or the current system
+     *         time in milliseconds when the vertex carries no create time
+     */
+    public static long getTableCreateTime(LineageVertex tableVertex) {
+        final Long createTime = tableVertex.getCreateTime();
+
+        return (createTime == null)
+            ? System.currentTimeMillis()
+            : createTime.longValue() * MILLIS_CONVERT_FACTOR;
+    }
+
+    protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, ImpalaNode table) {
+        List<AtlasEntity> ret          = new ArrayList<>();
+
+        for (ImpalaNode childNode : table.getChildren().values()) {
+            String      colQualifiedName = getQualifiedName(childNode);
+            AtlasEntity column           = context.getEntity(colQualifiedName);
+
+            if (column == null) {
+                column = new AtlasEntity(HIVE_TYPE_COLUMN);
+
+                // if column's table was sent in an earlier notification, set 'guid' to null - which will:
+                //  - result in this entity to be not included in 'referredEntities'
+                //  - cause Atlas server to resolve the entity by its qualifiedName
+                // TODO: enable this once HMS hook is in. Disable this before that.
+                column.setGuid(null);
+
+                column.setAttribute(ATTRIBUTE_TABLE, tableId);
+                column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName);
+                column.setAttribute(ATTRIBUTE_NAME, context.getColumnNameOnly(childNode.getNodeName()));
+
+                // just fake it. It should not be sent to Atlas once HMS hook is in
+                column.setAttribute(ATTRIBUTE_OWNER, getUserName());
+
+                context.putEntity(colQualifiedName, column);
+            }
+
+            ret.add(column);
+        }
+
+        return ret;
+    }
+
+    protected AtlasEntity getImpalaProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
+        AtlasEntity ret         = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS.getName());
+        String      queryStr    = context.getQueryStr();
+
+        if (queryStr != null) {
+            queryStr = queryStr.toLowerCase().trim();
+        }
+
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
+        ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
+        ret.setAttribute(ATTRIBUTE_OUTPUTS,  getObjectIds(outputs));
+        ret.setAttribute(ATTRIBUTE_NAME, queryStr);
+        ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, context.getImpalaOperationType());
+        ret.setAttribute(ATTRIBUTE_START_TIME, context.getLineageQuery().getTimestamp());
+        ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
+        ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
+        ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
+        ret.setAttribute(ATTRIBUTE_QUERY_ID, context.getLineageQuery().getQueryId());
+        ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
+        ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));
+
+        return ret;
+    }
+
+    /**
+     * Adds every entity held by the hook context to the given collection as a referred entity,
+     * then compacts the collection.
+     *
+     * @param entitiesWithExtInfo the collection to complete
+     */
+    protected void addProcessedEntities(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
+        for (AtlasEntity processed : context.getEntities()) {
+            entitiesWithExtInfo.addReferredEntity(processed);
+        }
+
+        entitiesWithExtInfo.compact();
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/CreateImpalaProcess.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/CreateImpalaProcess.java
new file mode 100644
index 0000000..e4a38a3
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/CreateImpalaProcess.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook.events;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.model.ImpalaDataType;
+import org.apache.atlas.impala.model.ImpalaDependencyType;
+import org.apache.atlas.impala.model.ImpalaNode;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageEdge;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CreateImpalaProcess extends BaseImpalaEvent {
+    private static final Logger LOG = LoggerFactory.getLogger(CreateImpalaProcess.class);
+
+    /**
+     * Creates the event for the given hook context.
+     *
+     * @param context the hook context holding the lineage query to process
+     */
+    public CreateImpalaProcess(AtlasImpalaHookContext context) {
+        super(context);
+    }
+
+    /**
+     * Wraps the entities of this event into a single entity-create notification.
+     *
+     * @return a one-element list holding the create request, or null when there are no entities
+     *         to send
+     */
+    @Override
+    public List<HookNotification> getNotificationMessages() throws Exception {
+        AtlasEntitiesWithExtInfo entities = getEntities();
+
+        if (entities == null || CollectionUtils.isEmpty(entities.getEntities())) {
+            return null;
+        }
+
+        return Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities));
+    }
+
+    /**
+     * Builds the entities for the lineage query held by the hook context: the process entity,
+     * its column lineage entities, and every entity created along the way.
+     *
+     * @return the entities, or null when the query has no inputs or no outputs, or when none of
+     *         its nodes could be converted into an entity
+     */
+    public AtlasEntitiesWithExtInfo getEntities() throws Exception {
+        List<ImpalaNode> inputNodes  = new ArrayList<>();
+        List<ImpalaNode> outputNodes = new ArrayList<>();
+
+        getInputOutList(context.getLineageQuery(), inputNodes, outputNodes);
+
+        if (skipProcess(inputNodes, outputNodes)) {
+            return null;
+        }
+
+        AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
+
+        // the set of names is shared, so a node seen as an input is not repeated as an output
+        Set<String>       processedNames = new HashSet<>();
+        List<AtlasEntity> inputs         = collectEntities(inputNodes, processedNames, ret);
+        List<AtlasEntity> outputs        = collectEntities(outputNodes, processedNames, ret);
+
+        if (inputs.isEmpty() && outputs.isEmpty()) {
+            return null;
+        }
+
+        AtlasEntity process = getImpalaProcessEntity(inputs, outputs);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("get process entity with qualifiedName: {}", process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+        }
+
+        ret.addEntity(process);
+
+        processColumnLineage(process, ret);
+
+        addProcessedEntities(ret);
+
+        return ret;
+    }
+
+    // Converts nodes into input/output entities, skipping nodes without a qualified name and
+    // nodes whose qualified name is already in processedNames.
+    private List<AtlasEntity> collectEntities(List<ImpalaNode> nodes, Set<String> processedNames,
+        AtlasEntitiesWithExtInfo entities) throws Exception {
+        List<AtlasEntity> collected = new ArrayList<>();
+
+        for (ImpalaNode node : nodes) {
+            String qualifiedName = getQualifiedName(node);
+
+            if (qualifiedName == null || !processedNames.add(qualifiedName)) {
+                continue;
+            }
+
+            AtlasEntity entity = getInputOutputEntity(node, entities);
+
+            if (entity != null) {
+                collected.add(entity);
+            }
+        }
+
+        return collected;
+    }
+
+    /**
+     * Creates one column lineage entity per projection edge of the lineage query and adds it to
+     * the given entities.
+     *
+     * An edge is skipped when it is not a projection, or when none of its target columns or none
+     * of its source columns resolves to an entity held by the hook context. A target column is
+     * used as an output by at most one edge.
+     *
+     * @param impalaProcess the process entity the column lineage belongs to
+     * @param entities      receives the column lineage entities
+     */
+    private void processColumnLineage(AtlasEntity impalaProcess, AtlasEntitiesWithExtInfo entities) {
+        List<LineageEdge> edges = context.getLineageQuery().getEdges();
+
+        if (CollectionUtils.isEmpty(edges)) {
+            return;
+        }
+
+        final List<AtlasEntity> columnLineages      = new ArrayList<>();
+        final Set<String>       processedOutputCols = new HashSet<>();
+
+        for (LineageEdge edge : edges) {
+
+            // constant-first comparison: an edge without a type is skipped instead of failing
+            if (!ImpalaDependencyType.PROJECTION.equals(edge.getEdgeType())) {
+                // Impala dependency type can only be predicate or projection.
+                // Impala predicate dependency: This is a dependency between a set of target
+                // columns (or exprs) and a set of source columns (base table columns). It
+                // indicates that the source columns restrict the values of their targets (e.g.
+                // by participating in WHERE clause predicates). It should not be part of lineage
+                continue;
+            }
+
+            List<AtlasEntity> outputColumns = new ArrayList<>();
+            for (Long targetId : edge.getTargets()) {
+                LineageVertex columnVertex = verticesMap.get(targetId);
+
+                if (columnVertex == null) {
+                    // getQualifiedName() rejects a null vertex; skip it like any other unknown column
+                    LOG.warn("column-lineage: cannot find vertex for target id {}", targetId);
+                    continue;
+                }
+
+                String outputColName = getQualifiedName(columnVertex);
+                AtlasEntity outputColumn = context.getEntity(outputColName);
+
+                LOG.debug("processColumnLineage(): target id = {}, target column name = {}",
+                        targetId, outputColName);
+
+                if (outputColumn == null) {
+                    LOG.warn("column-lineage: non-existing output-column {}", outputColName);
+                    continue;
+                }
+
+                if (!processedOutputCols.add(outputColName)) {
+                    LOG.warn("column-lineage: duplicate for output-column {}", outputColName);
+                    continue;
+                }
+
+                outputColumns.add(outputColumn);
+            }
+
+            if (outputColumns.isEmpty()) {
+                // nothing to attach the lineage to; also guards outputColumns.get(0) below
+                continue;
+            }
+
+            List<AtlasEntity> inputColumns = new ArrayList<>();
+
+            for (Long sourceId : edge.getSources()) {
+                LineageVertex columnVertex = verticesMap.get(sourceId);
+
+                if (columnVertex == null) {
+                    LOG.warn("column-lineage: cannot find vertex for source id {}", sourceId);
+                    continue;
+                }
+
+                String        inputColName = getQualifiedName(columnVertex);
+                AtlasEntity   inputColumn  = context.getEntity(inputColName);
+
+                if (inputColumn == null) {
+                    LOG.warn("column-lineage: non-existing input-column {} with id ={}", inputColName, sourceId);
+                    continue;
+                }
+
+                inputColumns.add(inputColumn);
+            }
+
+            if (inputColumns.isEmpty()) {
+                continue;
+            }
+
+            AtlasEntity columnLineageProcess = new AtlasEntity(ImpalaDataType.IMPALA_COLUMN_LINEAGE.getName());
+
+            // TODO: when there are multiple target IDs, should we use first column name or all of their name?
+            String columnQualifiedName = (String)impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) +
+                AtlasImpalaHookContext.QNAME_SEP_PROCESS + outputColumns.get(0).getAttribute(ATTRIBUTE_NAME);
+            columnLineageProcess.setAttribute(ATTRIBUTE_NAME, columnQualifiedName);
+            columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName);
+            columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns));
+            columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputColumns));
+            columnLineageProcess.setAttribute(ATTRIBUTE_QUERY, getObjectId(impalaProcess));
+
+            // based on https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java#L267
+            // There are two types of dependencies that are represented as edges in the column
+            // lineage graph:
+            //    a) Projection dependency: This is a dependency between a set of source
+            //    columns (base table columns) and a single target (result expr or table column).
+            //    This dependency indicates that values of the target depend on the values of the source
+            //    columns.
+            //    b) Predicate dependency: This is a dependency between a set of target
+            //    columns (or exprs) and a set of source columns (base table columns). It indicates that
+            //    the source columns restrict the values of their targets (e.g. by participating in
+            //    WHERE clause predicates).
+            columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, ImpalaDependencyType.PROJECTION.getName());
+
+            columnLineages.add(columnLineageProcess);
+        }
+
+        for (AtlasEntity columnLineage : columnLineages) {
+            String columnQualifiedName = (String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+            LOG.debug("get column lineage entity with qualifiedName: {}", columnQualifiedName);
+
+            entities.addEntity(columnLineage);
+        }
+    }
+
+    /**
+     * Processes the impala query and classifies its vertices as input or output based on the
+     * projection edges, then organizes them into a hierarchical structure: all column vertices
+     * of a table become children of the ImpalaNode representing that table.
+     *
+     * A query without vertices or without edges yields no input and no output nodes.
+     *
+     * @param lineageQuery the lineage query to process
+     * @param inputNodes   receives the table nodes that are sources of projection edges
+     * @param outputNodes  receives the table nodes that are targets of projection edges
+     */
+    private void getInputOutList(ImpalaQuery lineageQuery, List<ImpalaNode> inputNodes,
+        List<ImpalaNode> outputNodes) {
+        // get vertex map with key being its id and
+        // ImpalaNode map with its own vertex's vertexId as its key
+        if (lineageQuery.getVertices() != null) {
+            for (LineageVertex vertex : lineageQuery.getVertices()) {
+                verticesMap.put(vertex.getId(), vertex);
+                vertexNameMap.put(vertex.getVertexId(), new ImpalaNode(vertex));
+            }
+        }
+
+        // get set of source ID and set of target Id
+        Set<Long> sourceIds = new HashSet<>();
+        Set<Long> targetIds = new HashSet<>();
+
+        if (lineageQuery.getEdges() != null) {
+            for (LineageEdge edge : lineageQuery.getEdges()) {
+                if (!ImpalaDependencyType.PROJECTION.equals(edge.getEdgeType())) {
+                    continue;
+                }
+
+                if (edge.getSources() != null) {
+                    sourceIds.addAll(edge.getSources());
+                }
+
+                if (edge.getTargets() != null) {
+                    targetIds.addAll(edge.getTargets());
+                }
+            }
+        }
+
+        Map<String, ImpalaNode> inputMap  = buildInputOutputList(sourceIds, verticesMap, vertexNameMap);
+        Map<String, ImpalaNode> outputMap = buildInputOutputList(targetIds, verticesMap, vertexNameMap);
+
+        inputNodes.addAll(inputMap.values());
+        outputNodes.addAll(outputMap.values());
+    }
+
+    /**
+     * From the list of Ids and Id to Vertices map, generate the Table name to ImpalaNode map.
+     * @param idSet the list of Ids. They are from lineage edges
+     * @param vertexMap the Id to Vertex map
+     * @param vertexNameMap the vertexId to ImpalaNode map.
+     * @return the table name to ImpalaNode map, whose table node contains its columns
+     */
+    private Map<String, ImpalaNode> buildInputOutputList(Set<Long> idSet, Map<Long, LineageVertex> vertexMap,
+        Map<String, ImpalaNode> vertexNameMap) {
+        Map<String, ImpalaNode> returnTableMap = new HashMap<>();
+
+        for (Long id : idSet) {
+            LineageVertex vertex = vertexMap.get(id);
+            if (vertex == null) {
+                LOG.warn("cannot find vertex with id: {}", id);
+                continue;
+            }
+
+            if (ImpalaVertexType.COLUMN.equals(vertex.getVertexType())) {
+                // add column to its table node
+                String tableName = getTableNameFromColumn(vertex.getVertexId());
+                if (tableName == null) {
+                    LOG.warn("cannot find tableName for vertex with id: {}, column name : {}",
+                        id, vertex.getVertexId() == null? "null" : vertex.getVertexId());
+                    continue;
+                }
+
+                ImpalaNode tableNode = returnTableMap.get(tableName);
+
+                if (tableNode == null) {
+                    tableNode = vertexNameMap.get(tableName);
+
+                    if (tableNode == null) {
+                        LOG.warn("cannot find table node for vertex with id: {}, column name : {}",
+                            id, vertex.getVertexId());
+                        continue;
+                    }
+
+                    returnTableMap.put(tableName, tableNode);
+                }
+
+                tableNode.addChild(vertex);
+            }
+        }
+
+        return returnTableMap;
+    }
+
+    /**
+     * Tells whether the query should be skipped.
+     *
+     * @return true when there are no input nodes or no output nodes
+     */
+    private boolean skipProcess(List<ImpalaNode> inputNodes, List<ImpalaNode> ouputNodes) {
+        return inputNodes.isEmpty() || ouputNodes.isEmpty();
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/IImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/IImpalaLineageHook.java
new file mode 100644
index 0000000..7c1103a
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/IImpalaLineageHook.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+/**
+ * Define the interface to process Impala lineage record
+ */
+public interface IImpalaLineageHook {
+
+  /**
+   * Process one Impala lineage record.
+   *
+   * @param impalaQueryString a serialized string of an Impala lineage record
+   * @throws Exception declared by the contract; implementations decide what is thrown
+   */
+  void process(String impalaQueryString) throws Exception;
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDataType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDataType.java
new file mode 100644
index 0000000..10ce448
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDataType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.impala.model;
+
+/**
+ * Data types used for Impala bridge
+ */
+public enum ImpalaDataType {
+
+    IMPALA_PROCESS,
+    IMPALA_PROCESS_EXECUTION,
+    IMPALA_COLUMN_LINEAGE;
+
+    /**
+     * Return the type name derived from the enum constant.
+     *
+     * @return the constant's identifier in lower case, e.g. {@code impala_process}
+     */
+    public String getName() {
+        // Locale.ROOT keeps the result independent of the JVM default locale; with a
+        // Turkish default locale a plain toLowerCase() maps 'I' to a dotless 'ı' and
+        // would produce a different (wrong) type name
+        return name().toLowerCase(java.util.Locale.ROOT);
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDependencyType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDependencyType.java
new file mode 100644
index 0000000..892ee9b
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDependencyType.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+/**
+ * Dependency kinds available as the edge type of a {@link LineageEdge}.
+ */
+public enum ImpalaDependencyType {
+    PROJECTION("PROJECTION"),
+    PREDICATE("PREDICATE");
+
+    // label supplied when the constant is declared
+    private final String name;
+
+    ImpalaDependencyType(String label) {
+        name = label;
+    }
+
+    /**
+     * @return the label of this constant converted to upper case
+     */
+    public String getName() {
+        final String upperCased = name.toUpperCase();
+        return upperCased;
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaNode.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaNode.java
new file mode 100644
index 0000000..a3ddf53
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaNode.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Contain vertex info of this node and its children. It is used only internally
+ */
+public class ImpalaNode {
+    // vertex represented by this node
+    LineageVertex ownVertex;
+    // child nodes keyed by the id of the child's vertex
+    Map<Long, ImpalaNode> children;
+
+    /**
+     * Create a node without children for the given vertex.
+     *
+     * @param ownVertex the vertex this node wraps
+     */
+    public ImpalaNode(LineageVertex ownVertex) {
+        this.ownVertex = ownVertex;
+        children = new HashMap<>();
+    }
+
+    public String getNodeName() { return ownVertex.getVertexId(); }
+    public ImpalaVertexType getNodeType() { return ownVertex.getVertexType(); }
+    public LineageVertex getOwnVertex() { return ownVertex; }
+    public Map<Long, ImpalaNode> getChildren() { return children; }
+
+    /**
+     * Add child to this node
+     * @param child vertex to attach; looked up by its id
+     * @return the node corresponding to the input child vertex: the already
+     *         registered node when one exists for that id, otherwise the newly
+     *         created node
+     */
+    public ImpalaNode addChild(LineageVertex child) {
+        ImpalaNode existingChild = children.get(child.getId());
+        if (existingChild != null) {
+            return existingChild;
+        }
+
+        ImpalaNode newChild = new ImpalaNode(child);
+        // Map.put returns the previous mapping (null here), so the new node has to
+        // be returned explicitly
+        children.put(child.getId(), newChild);
+        return newChild;
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java
new file mode 100644
index 0000000..8b0be16
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.impala.model;
+
+/**
+ * Operation types known to the Impala bridge, each carrying a string name.
+ */
+public enum ImpalaOperationType{
+    CREATEVIEW ("CREATEVIEW"),
+    UNKNOWN ("UNKNOWN");
+
+    // string form of the operation, also returned by toString()
+    private final String name;
+
+    ImpalaOperationType(String operationName) {
+        this.name = operationName;
+    }
+
+    /**
+     * @param otherName name to compare with, may be null
+     * @return true when otherName is exactly the name of this constant
+     */
+    public boolean equalsName(String otherName) {
+        final boolean sameName = this.name.equals(otherName);
+        return sameName;
+    }
+
+    /**
+     * @return the name given to this constant
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaQuery.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaQuery.java
new file mode 100644
index 0000000..27bdc72
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaQuery.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.List;
+
+/**
+ * Represent an Impala lineage record in lineage log.
+ *
+ * Jackson mapping, as configured by the annotations below: only public getters and
+ * setters are auto-detected (fields are not), null properties are left out when
+ * serializing, and unknown JSON properties are ignored when deserializing.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+public class ImpalaQuery {
+  // text, id and hash of the query, and the user associated with it
+  // NOTE(review): exact semantics of hash/user are defined by Impala's lineage log - confirm there
+  private String queryText;
+  private String queryId;
+  private String hash;
+  private String user;
+
+  // the time stamp is in seconds. It is Unix epoch, which is the number of seconds that have
+  // elapsed since January 1, 1970 (midnight UTC/GMT), not counting leap seconds
+  private Long timestamp;
+  // NOTE(review): presumably in the same unit as timestamp - confirm against the lineage log format
+  private Long endTime;
+  // edges and vertices of the lineage graph carried by this record
+  private List<LineageEdge> edges;
+  private List<LineageVertex> vertices;
+
+  public List<LineageEdge> getEdges() {
+    return edges;
+  }
+
+  public List<LineageVertex> getVertices() {
+    return vertices;
+  }
+
+  public Long getEndTime() {
+    return endTime;
+  }
+
+  public String getHash() {
+    return hash;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public String getQueryText() {
+    return queryText;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setEdges(List<LineageEdge> edges) {
+    this.edges = edges;
+  }
+
+  public void setEndTime(Long endTime) {
+    this.endTime = endTime;
+  }
+
+  public void setHash(String hash) {
+    this.hash = hash;
+  }
+
+  public void setQueryId(String queryId) {
+    this.queryId = queryId;
+  }
+
+  public void setQueryText(String queryText) {
+    this.queryText = queryText;
+  }
+
+  public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public void setVertices(List<LineageVertex> vertices) {
+    this.vertices = vertices;
+  }
+
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaVertexType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaVertexType.java
new file mode 100644
index 0000000..8ec3f85
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaVertexType.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+/**
+ * Types a {@link LineageVertex} can have.
+ */
+public enum ImpalaVertexType {
+    DFS_DIR("DFS_DIR"),
+    PARTITION("PARTITION"),
+    COLUMN("COLUMN"),
+    TABLE("TABLE"),
+    DATABASE("DATABASE");
+
+    // label supplied when the constant is declared
+    private final String name;
+
+    ImpalaVertexType(String label) {
+        name = label;
+    }
+
+    /**
+     * @return the label of this constant converted to upper case
+     */
+    public String getName() {
+        final String upperCased = name.toUpperCase();
+        return upperCased;
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageEdge.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageEdge.java
new file mode 100644
index 0000000..251507e
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageEdge.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.List;
+
+/**
+ * This represents an edge in Impala's lineage record that connects two entities
+ *
+ * Jackson mapping, as configured by the annotations below: only public getters and
+ * setters are auto-detected (fields are not), null properties are left out when
+ * serializing, and unknown JSON properties are ignored when deserializing.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+public class LineageEdge {
+  // ids of the vertices on the source side and on the target side of this edge;
+  // they refer to LineageVertex ids
+  private List<Long> sources;
+  private List<Long> targets;
+  // dependency kind of this edge (see ImpalaDependencyType)
+  private ImpalaDependencyType edgeType;
+
+  public List<Long> getSources() {
+    return sources;
+  }
+
+  public List<Long> getTargets() {
+    return targets;
+  }
+
+  public ImpalaDependencyType getEdgeType() {
+    return edgeType;
+  }
+
+  public void setSources(List<Long> sources) {
+    this.sources = sources;
+  }
+
+  public void setTargets(List<Long> targets) {
+    this.targets = targets;
+  }
+
+  public void setEdgeType(ImpalaDependencyType edgeType) {
+    this.edgeType = edgeType;
+  }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageVertex.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageVertex.java
new file mode 100644
index 0000000..82672c9
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageVertex.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * This represents an entity in Impala's lineage record.
+ *
+ * Jackson mapping, as configured by the annotations below: only public getters and
+ * setters are auto-detected (fields are not), null properties are left out when
+ * serializing, and unknown JSON properties are ignored when deserializing.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+public class LineageVertex {
+  // id is used to reference this entity. It is used in LineageEdge to specify source and target
+  // https://github.com/apache/impala/blob/master/be/src/util/lineage-util.h#L40
+  // Impala id is int64. Therefore, define this field as Long
+  private Long id;
+
+  // specify the type of the entity, it could be "TABLE", "COLUMN" etc.
+  private ImpalaVertexType vertexType;
+
+  // specify the name of the entity
+  private String vertexId;
+
+  // It is optional, and could be null. It is only set if the vertexType is "TABLE"
+  private Long createTime;
+
+  public Long getId() { return id; }
+
+  public ImpalaVertexType getVertexType() {
+    return vertexType;
+  }
+
+  public String getVertexId() {
+    return vertexId;
+  }
+
+  public Long getCreateTime() { return createTime; }
+
+  public void setId(Long id) {
+    this.id = id;
+  }
+
+  public void setVertexType(ImpalaVertexType vertexType) {
+    this.vertexType = vertexType;
+  }
+
+  public void setVertexId(String vertexId) {
+    this.vertexId = vertexId;
+  }
+
+  public void setCreateTime(Long createTime) { this.createTime = createTime; }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/resources/atlas-log4j.xml b/addons/impala-bridge/src/main/resources/atlas-log4j.xml
new file mode 100644
index 0000000..97317a8
--- /dev/null
+++ b/addons/impala-bridge/src/main/resources/atlas-log4j.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <!--
+    Rolling file appender. The target file is taken from the atlas.log.dir and
+    atlas.log.file system properties, which import-impala.sh passes with -D, so the
+    log ends up in the location that the script announces on start-up.
+  -->
+  <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+    <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
+    <param name="Append" value="true"/>
+    <param name="maxFileSize" value="100MB" />
+    <param name="maxBackupIndex" value="20" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+    </layout>
+  </appender>
+
+  <!-- the lineage tool itself logs at info; additivity is off to avoid duplicates via root -->
+  <logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false">
+    <level value="info"/>
+    <appender-ref ref="FILE"/>
+  </logger>
+
+  <!-- everything else is limited to warnings and above -->
+  <root>
+    <priority value="warn"/>
+    <appender-ref ref="FILE"/>
+  </root>
+</log4j:configuration>
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/resources/import-impala.sh b/addons/impala-bridge/src/main/resources/import-impala.sh
new file mode 100644
index 0000000..5ca8984
--- /dev/null
+++ b/addons/impala-bridge/src/main/resources/import-impala.sh
@@ -0,0 +1,109 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License. See accompanying LICENSE file.
+#
+# resolve links - $0 may be a softlink
+PRG="${0}"
+
+[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true
+
+while [ -h "${PRG}" ]; do
+  ls=`ls -ld "${PRG}"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "${PRG}"`/"$link"
+  fi
+done
+
+# quoted so that an installation path containing spaces is handled
+BASEDIR=`dirname "${PRG}"`
+
+# locate java and jar: prefer JAVA_HOME, fall back to whatever is on the PATH
+if test -z "${JAVA_HOME}"
+then
+    JAVA_BIN=`which java`
+    JAR_BIN=`which jar`
+else
+    JAVA_BIN="${JAVA_HOME}/bin/java"
+    JAR_BIN="${JAVA_HOME}/bin/jar"
+fi
+export JAVA_BIN
+
+if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
+  echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available."
+  exit 1
+fi
+
+# Construct ATLAS_CONF where atlas-properties reside
+# assume the hive-server2 is installed and contains Atlas configuration
+# Otherwise, need to setup Atlas required properties and libraries before running this tool
+if [ ! -z "$HIVE_CONF_DIR" ]; then
+    HIVE_CONF=$HIVE_CONF_DIR
+elif [ ! -z "$HIVE_HOME" ]; then
+    HIVE_CONF="$HIVE_HOME/conf"
+elif [ -e /etc/hive/conf ]; then
+    HIVE_CONF="/etc/hive/conf"
+else
+    echo "Could not find a valid HIVE configuration for ATLAS"
+    exit 1
+fi
+if [ -z "$ATLAS_CONF" ]; then
+ export ATLAS_CONF=$HIVE_CONF
+fi
+
+# log dir for applications
+ATLAS_LOG_DIR="/var/log/atlas"
+ATLAS_LOG_FILE="impala-bridge.log"
+LOG_CONFIG="${BASEDIR}/atlas-log4j.xml"
+
+# Locate the Atlas server libraries relative to the current working directory
+DIR=$PWD
+PARENT="$(dirname "$DIR")"
+GRANDPARENT="$(dirname "$PARENT")"
+LIB_PATH="$GRANDPARENT/server/webapp/atlas/WEB-INF/lib"
+echo "$LIB_PATH"
+# Construct Atlas classpath.
+for i in "$LIB_PATH/"*.jar; do
+  ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+for i in "${BASEDIR}/"*.jar; do
+  ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+echo "Logging: ${ATLAS_LOG_DIR}/${ATLAS_LOG_FILE}"
+echo "Log config: ${LOG_CONFIG}"
+
+# date needs a leading '+' before the format; %S is seconds (%s would be epoch seconds)
+TIME=`date +%Y%m%d%H%M%S`
+CP="${ATLASCPPATH}:${ATLAS_CONF}"
+
+# If running in cygwin, convert pathnames and classpath to Windows format.
+if [ "${CYGWIN}" == "true" ]
+then
+   ATLAS_LOG_DIR=`cygpath -w "${ATLAS_LOG_DIR}"`
+   ATLAS_LOG_FILE=`cygpath -w "${ATLAS_LOG_FILE}"`
+   CP=`cygpath -w -p "${CP}"`
+fi
+
+JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=$ATLAS_LOG_FILE -Dlog4j.configuration=file://$LOG_CONFIG"
+
+JVM_ARGS=
+
+JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
+# JAVA_PROPERTIES stays unquoted on purpose so that every option becomes its own word;
+# "$@" forwards the caller's arguments unchanged, even when they contain spaces
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.impala.ImpalaLineageTool "$@"
+
+# report the outcome and propagate the tool's exit code
+RETVAL=$?
+[ $RETVAL -eq 0 ] && echo Done!
+[ $RETVAL -ne 0 ] && echo Failed!
+exit $RETVAL
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
new file mode 100644
index 0000000..cc62955
--- /dev/null
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala;
+
+import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME;
+import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_RECENT_QUERIES;
+import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_DB;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.apache.atlas.impala.model.ImpalaDataType;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.atlas.utils.ParamChecker;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.testng.annotations.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+public class ImpalaLineageITBase {
+    private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageITBase.class);
+
+    public static final String DEFAULT_DB = "default";
+    public static final String SEP = ":".intern();
+    public static final String IO_SEP = "->".intern();
+    protected static final String DGI_URL = "http://localhost:21000/";
+    protected static final String CLUSTER_NAME = "primary";
+    protected static final String PART_FILE = "2015-01-01";
+    protected static final String INPUTS = "inputs";
+    protected static final String OUTPUTS = "outputs";
+    protected static AtlasClientV2 atlasClientV2;
+
+    private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
+    private static final String ATTR_NAME = "name";
+
+    // to push entity creation/update to HMS, so HMS hook can push the metadata notification
+    // to Atlas, then the lineage notification from this tool can be created at Atlas
+    protected static Driver              driverWithoutContext;
+    protected static SessionState        ss;
+    protected static HiveConf            conf;
+
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        //Set-up hive session
+        conf = new HiveConf();
+        conf.setClassLoader(Thread.currentThread().getContextClassLoader());
+        HiveConf conf = new HiveConf();
+        SessionState ss = new SessionState(conf);
+        ss = SessionState.start(ss);
+        SessionState.setCurrentSessionState(ss);
+        driverWithoutContext = new Driver(conf);
+
+        Configuration configuration = ApplicationProperties.get();
+
+        String[] atlasEndPoint = configuration.getStringArray(ImpalaLineageHook.ATLAS_ENDPOINT);
+        if (atlasEndPoint == null || atlasEndPoint.length == 0) {
+            atlasEndPoint = new String[]{DGI_URL};
+        }
+
+        if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+            atlasClientV2 = new AtlasClientV2(atlasEndPoint, new String[]{"admin", "admin"});
+        } else {
+            atlasClientV2 = new AtlasClientV2(atlasEndPoint);
+        }
+
+    }
+
+    protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
+        final AssertPredicate assertPredicate) throws Exception {
+        waitFor(80000, new Predicate() {
+            @Override
+            public void evaluate() throws Exception {
+                AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections
+                    .singletonMap(property,value));
+                AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
+                assertNotNull(entity);
+                if (assertPredicate != null) {
+                    assertPredicate.assertOnEntity(entity);
+                }
+            }
+        });
+        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property,value));
+        AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
+        return (String) entity.getGuid();
+    }
+
+    protected String assertProcessIsRegistered(String processQFName, String queryString) throws Exception {
+        try {
+            LOG.debug("Searching for process with query {}", processQFName);
+
+            return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(final AtlasEntity entity) throws Exception {
+                    List<String> recentQueries = (List<String>) entity.getAttribute(ATTRIBUTE_RECENT_QUERIES);
+
+                    Assert.assertEquals(recentQueries.get(0), lower(queryString));
+                }
+            });
+        } catch(Exception e) {
+            LOG.error("Exception : ", e);
+            throw e;
+        }
+    }
+
+    protected String assertDatabaseIsRegistered(String dbName) throws Exception {
+        return assertDatabaseIsRegistered(dbName, null);
+    }
+
+    protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
+        LOG.debug("Searching for database: {}", dbName);
+
+        String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            CLUSTER_NAME;
+
+        dbQualifiedName = dbQualifiedName.toLowerCase();
+
+        return assertEntityIsRegistered(HIVE_TYPE_DB, REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName, assertPredicate);
+    }
+
+    protected String createDatabase() throws Exception {
+        String dbName = dbName();
+
+        return createDatabase(dbName);
+    }
+
+    protected  String createDatabase(String dbName) throws Exception {
+        runCommand("CREATE DATABASE IF NOT EXISTS " + dbName);
+
+        return dbName;
+    }
+
+    protected String createTable(String dbName, String columnsString) throws Exception {
+        return createTable(dbName, columnsString, false);
+    }
+
+    protected String createTable(String dbName, String columnsString, boolean isPartitioned) throws Exception {
+        String tableName = tableName();
+        return createTable(dbName, tableName, columnsString, isPartitioned);
+    }
+
+    protected String createTable(String dbName, String tableName, String columnsString, boolean isPartitioned) throws Exception {
+        runCommand("CREATE TABLE IF NOT EXISTS " + dbName + "." + tableName + " " + columnsString + " comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : ""));
+
+        return dbName + "." + tableName;
+    }
+
+    public interface AssertPredicate {
+        void assertOnEntity(AtlasEntity entity) throws Exception;
+    }
+
+    public interface Predicate {
+        /**
+         * Perform a predicate evaluation.
+         *
+         * @return the boolean result of the evaluation.
+         * @throws Exception thrown if the predicate evaluation could not evaluate.
+         */
+        void evaluate() throws Exception;
+    }
+
+    /**
+     * Wait for a condition, expressed via a {@link Predicate} to become true.
+     *
+     * @param timeout maximum time in milliseconds to wait for the predicate to become true.
+     * @param predicate predicate waiting on.
+     */
+    protected void waitFor(int timeout, Predicate predicate) throws Exception {
+        ParamChecker.notNull(predicate, "predicate");
+        long mustEnd = System.currentTimeMillis() + timeout;
+
+        while (true) {
+            try {
+                predicate.evaluate();
+                return;
+            } catch(Error | Exception e) {
+                if (System.currentTimeMillis() >= mustEnd) {
+                    fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
+                }
+                LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
+                Thread.sleep(5000);
+            }
+        }
+    }
+
+    public static String lower(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+        return str.toLowerCase().trim();
+    }
+
+    protected void runCommand(String cmd) throws Exception {
+        runCommandWithDelay(cmd, 0);
+    }
+
+    protected void runCommandWithDelay(String cmd, int sleepMs) throws Exception {
+        runCommandWithDelay(driverWithoutContext, cmd, sleepMs);
+    }
+
+    protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) throws Exception {
+        LOG.debug("Running command '{}'", cmd);
+        CommandProcessorResponse response = driver.run(cmd);
+        assertEquals(response.getResponseCode(), 0);
+        if (sleepMs != 0) {
+            Thread.sleep(sleepMs);
+        }
+    }
+
+    protected String random() {
+        return RandomStringUtils.randomAlphanumeric(10);
+    }
+
+    protected String tableName() {
+        return "table_" + random();
+    }
+    protected String dbName() {return "db_" + random();}
+}
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
new file mode 100644
index 0000000..05190b6
--- /dev/null
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.impala;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test that feeds a recorded Impala lineage file through
+ * {@link ImpalaLineageTool} and checks that the resulting process is registered in Atlas.
+ */
+public class ImpalaLineageToolIT extends ImpalaLineageITBase {
+    private static final String RESOURCE_DIR = System.getProperty("user.dir") + "/src/test/resources/";
+    private static final String IMPALA       = RESOURCE_DIR + "impala3.json";
+    private static final String IMPALA_WAL   = RESOURCE_DIR + "WALimpala.wal";
+
+    /**
+     * This tests
+     * 1) ImpalaLineageTool can parse one lineage file that contains "create view" command lineage
+     * 2) Lineage is sent to Atlas
+     * 3) The resulting process can be looked up in Atlas
+     *
+     * @throws Exception if setting up the entities or importing the lineage file fails; the
+     *                   exception is propagated so that such a failure fails the test instead
+     *                   of being silently swallowed
+     */
+    @Test
+    public void testCreateViewFromFile() throws Exception {
+        ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+        // create database and tables to simulate Impala behavior that Impala updates metadata
+        // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
+        // Atlas can handle lineage notification
+        String dbName = "db_1";
+        createDatabase(dbName);
+
+        String sourceTableName = "table_1";
+        createTable(dbName, sourceTableName, "(id string, count int)", false);
+
+        String targetTableName = "view_1";
+        createTable(dbName, targetTableName, "(count int, id string)", false);
+
+        // process lineage record, and send corresponding notification to Atlas
+        String[] args = new String[]{"-d", "./", "-p", "impala"};
+        ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+        // verify the process is saved in Atlas
+        // 1554750072 is the createTime recorded for db_1.view_1 in impala3.json;
+        // the expected qualified name uses that value multiplied by 1000
+        String createTime = Long.toString(1554750072L * 1000);
+        String processQFName =
+            "db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
+
+        processQFName = processQFName.toLowerCase();
+
+        assertProcessIsRegistered(processQFName,
+            "create view db_1.view_1 as select count, id from db_1.table_1");
+    }
+}
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
new file mode 100644
index 0000000..86801e3
--- /dev/null
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.atlas.impala.ImpalaLineageITBase;
+import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
+import org.apache.atlas.impala.model.ImpalaDependencyType;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageEdge;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.testng.Assert.assertFalse;
+
+public class ImpalaLineageHookIT extends ImpalaLineageITBase {
+    private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHookIT.class);
+    private static ImpalaLineageHook impalaHook;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        super.setUp();
+        impalaHook = new ImpalaLineageHook();
+    }
+
+    @AfterClass
+    public void testClean() {
+        impalaHook = null;
+    }
+
+    @Test
+    public void testCreateView() throws Exception {
+        // first trigger HMS hook to create related entities
+        String dbName = createDatabase();
+        assertDatabaseIsRegistered(dbName);
+
+        String tableName = createTable(dbName, "(id string, count int)");
+        String viewName = createTable(dbName, "(count int, id string)");
+
+        // then process lineage record to push lineage to Atlas
+        ImpalaQuery queryObj = new ImpalaQuery();
+        List<LineageEdge> edges = new ArrayList<>();
+        List<LineageVertex> vertices = new ArrayList<>();
+
+        queryObj.setQueryText("create view " + viewName + " as select count, id from " + tableName);
+        queryObj.setQueryId("3a441d0c130962f8:7f634aec00000000");
+        queryObj.setHash("64ff0425ccdfaada53e3f2fd76f566f7");
+        queryObj.setUser("admin");
+        queryObj.setTimestamp((long)1554750072);
+        queryObj.setEndTime((long)1554750554);
+
+        LineageEdge edge1 = new LineageEdge();
+        edge1.setSources( Arrays.asList((long)1));
+        edge1.setTargets( Arrays.asList((long)0));
+        edge1.setEdgeType(ImpalaDependencyType.PROJECTION);
+        edges.add(edge1);
+
+        LineageEdge edge2 = new LineageEdge();
+        edge2.setSources( Arrays.asList((long)3));
+        edge2.setTargets( Arrays.asList((long)2));
+        edge2.setEdgeType(ImpalaDependencyType.PROJECTION);
+        edges.add(edge2);
+
+        queryObj.setEdges(edges);
+
+        LineageVertex vertex1 = new LineageVertex();
+        vertex1.setId((long)0);
+        vertex1.setVertexType(ImpalaVertexType.COLUMN);
+        vertex1.setVertexId(viewName + ".count");
+        vertices.add(vertex1);
+
+        LineageVertex vertex2 = new LineageVertex();
+        vertex2.setId((long)1);
+        vertex2.setVertexType(ImpalaVertexType.COLUMN);
+        vertex2.setVertexId(tableName + ".count");
+        vertices.add(vertex2);
+
+        LineageVertex vertex3 = new LineageVertex();
+        vertex3.setId((long)2);
+        vertex3.setVertexType(ImpalaVertexType.COLUMN);
+        vertex3.setVertexId(viewName + ".id");
+        vertices.add(vertex3);
+
+        LineageVertex vertex4 = new LineageVertex();
+        vertex4.setId((long)3);
+        vertex4.setVertexType(ImpalaVertexType.COLUMN);
+        vertex4.setVertexId(tableName + ".id");
+        vertices.add(vertex4);
+
+        LineageVertex vertex5 = new LineageVertex();
+        vertex5.setId((long)4);
+        vertex5.setVertexType(ImpalaVertexType.TABLE);
+        vertex5.setVertexId(viewName);
+        vertex5.setCreateTime(System.currentTimeMillis() / 1000);
+        vertices.add(vertex5);
+
+        LineageVertex vertex6 = new LineageVertex();
+        vertex6.setId((long)5);
+        vertex6.setVertexType(ImpalaVertexType.TABLE);
+        vertex6.setVertexId(tableName);
+        vertex6.setCreateTime(System.currentTimeMillis() / 1000);
+        vertices.add(vertex6);
+
+        queryObj.setVertices(vertices);
+
+        try {
+            impalaHook.process(queryObj);
+            String createTime = new Long(BaseImpalaEvent.getTableCreateTime(vertex5)).toString();
+            String processQFName =
+                vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                    CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
+
+            processQFName = processQFName.toLowerCase();
+
+            assertProcessIsRegistered(processQFName, queryObj.getQueryText());
+        } catch (Exception ex) {
+            LOG.error("process create_view failed: ", ex);
+            assertFalse(true);
+        }
+    }
+
+
+
+
+}
diff --git a/addons/impala-bridge/src/test/resources/atlas-application.properties b/addons/impala-bridge/src/test/resources/atlas-application.properties
new file mode 100644
index 0000000..898b69c
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/atlas-application.properties
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#########  Atlas Server Configs #########
+atlas.rest.address=http://localhost:31000
+
+#########  Graph Database Configs  #########
+
+
+# Graph database implementation.  Value inserted by maven.
+atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase
+
+# Graph Storage
+atlas.graph.storage.backend=berkeleyje
+
+# Entity repository implementation
+atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.InMemoryEntityAuditRepository
+
+# Graph Search Index Backend
+atlas.graph.index.search.backend=solr
+
+#Berkeley storage directory
+atlas.graph.storage.directory=${sys:atlas.data}/berkley
+
+#hbase
+# For standalone mode, specify localhost.
+# For distributed mode, specify the ZooKeeper quorum here.
+
+atlas.graph.storage.hostname=${graph.storage.hostname}
+atlas.graph.storage.hbase.regions-per-server=1
+atlas.graph.storage.lock.wait-time=10000
+
+#ElasticSearch
+atlas.graph.index.search.directory=${sys:atlas.data}/es
+atlas.graph.index.search.elasticsearch.client-only=false
+atlas.graph.index.search.elasticsearch.local-mode=true
+atlas.graph.index.search.elasticsearch.create.sleep=2000
+
+# Solr cloud mode properties
+atlas.graph.index.search.solr.mode=cloud
+atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
+atlas.graph.index.search.solr.embedded=true
+atlas.graph.index.search.max-result-set-size=150
+
+
+#########  Notification Configs  #########
+atlas.notification.embedded=true
+
+atlas.kafka.zookeeper.connect=localhost:19026
+atlas.kafka.bootstrap.servers=localhost:19027
+atlas.kafka.data=${sys:atlas.data}/kafka
+atlas.kafka.zookeeper.session.timeout.ms=4000
+atlas.kafka.zookeeper.sync.time.ms=20
+atlas.kafka.consumer.timeout.ms=4000
+atlas.kafka.auto.commit.interval.ms=100
+atlas.kafka.hook.group.id=atlas
+atlas.kafka.entities.group.id=atlas_entities
+#atlas.kafka.auto.commit.enable=false
+
+atlas.kafka.enable.auto.commit=false
+atlas.kafka.auto.offset.reset=earliest
+atlas.kafka.session.timeout.ms=30000
+atlas.kafka.offsets.topic.replication.factor=1
+
+
+
+#########  Entity Audit Configs  #########
+atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
+atlas.audit.zookeeper.session.timeout.ms=1000
+atlas.audit.hbase.zookeeper.quorum=localhost
+atlas.audit.hbase.zookeeper.property.clientPort=19026
+
+#########  Security Properties  #########
+
+# SSL config
+atlas.enableTLS=false
+atlas.server.https.port=31443
+
+#########  Security Properties  #########
+
+hbase.security.authentication=simple
+
+atlas.hook.falcon.synchronous=true
+
+#########  JAAS Configuration ########
+
+atlas.jaas.KafkaClient.loginModuleName = com.sun.security.auth.module.Krb5LoginModule
+atlas.jaas.KafkaClient.loginModuleControlFlag = required
+atlas.jaas.KafkaClient.option.useKeyTab = true
+atlas.jaas.KafkaClient.option.storeKey = true
+atlas.jaas.KafkaClient.option.serviceName = kafka
+atlas.jaas.KafkaClient.option.keyTab = /etc/security/keytabs/atlas.service.keytab
+atlas.jaas.KafkaClient.option.principal = atlas/_HOST@EXAMPLE.COM
+
+#########  High Availability Configuration ########
+atlas.server.ha.enabled=false
+#atlas.server.ids=id1
+#atlas.server.address.id1=localhost:21000
+
+######### Atlas Authorization #########
+atlas.authorizer.impl=none
+# atlas.authorizer.impl=simple
+# atlas.authorizer.simple.authz.policy.file=atlas-simple-authz-policy.json
+
+######### Atlas Authentication #########
+atlas.authentication.method.file=true
+atlas.authentication.method.ldap.type=none
+atlas.authentication.method.kerberos=false
+# atlas.authentication.method.file.filename=users-credentials.properties
diff --git a/addons/impala-bridge/src/test/resources/atlas-log4j.xml b/addons/impala-bridge/src/test/resources/atlas-log4j.xml
new file mode 100644
index 0000000..de7f55c
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/atlas-log4j.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
+        <param name="Append" value="true"/>
+        <!-- Rolling limits are properties of the appender, so they are set here rather than on the layout. -->
+        <param name="maxFileSize" value="100MB" />
+        <param name="maxBackupIndex" value="20" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/audit.log"/>
+        <param name="Append" value="true"/>
+        <!-- Rolling limits are properties of the appender, so they are set here rather than on the layout. -->
+        <param name="maxFileSize" value="100MB" />
+        <param name="maxBackupIndex" value="20" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %x %m%n"/>
+        </layout>
+    </appender>
+
+    <appender name="METRICS" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/metric.log"/>
+        <param name="Append" value="true"/>
+        <!-- The size limit is a property of the appender, so it is set here rather than on the layout. -->
+        <param name="maxFileSize" value="100MB" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %x %m%n"/>
+        </layout>
+    </appender>
+
+    <appender name="FAILED" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/failed.log"/>
+        <param name="Append" value="true"/>
+        <!-- Rolling limits are properties of the appender, so they are set here rather than on the layout. -->
+        <param name="maxFileSize" value="100MB" />
+        <param name="maxBackupIndex" value="20" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m"/>
+        </layout>
+    </appender>
+
+    <logger name="org.apache.atlas" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false">
+        <level value="debug"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook" additivity="false">
+        <level value="debug"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="org.janusgraph" additivity="false">
+        <level value="warn"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="org.springframework" additivity="false">
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <logger name="org.eclipse" additivity="false">
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <logger name="com.sun.jersey" additivity="false">
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
+    <logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false">
+        <level value="error"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="AUDIT" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="AUDIT"/>
+    </logger>
+
+    <logger name="METRICS" additivity="false">
+        <level value="debug"/>
+        <appender-ref ref="METRICS"/>
+    </logger>
+
+    <!-- Send the FAILED logger to the FAILED appender (failed.log) instead of the audit log. -->
+    <logger name="FAILED" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FAILED"/>
+    </logger>
+
+    <root>
+        <priority value="warn"/>
+        <appender-ref ref="FILE"/>
+    </root>
+
+</log4j:configuration>
diff --git a/addons/impala-bridge/src/test/resources/hive-site.xml b/addons/impala-bridge/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..edd0c54
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/hive-site.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+    <property>
+        <name>hive.exec.submit.local.task.via.child</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>mapreduce.framework.name</name>
+        <value>local</value>
+    </property>
+
+    <property>
+        <name>fs.default.name</name>
+        <value>file:///</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.event.listeners</name>
+        <value>org.apache.atlas.hive.hook.HiveMetastoreHookImpl</value>
+    </property>
+
+    <property>
+        <name>hive.support.concurrency</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.warehouse.dir</name>
+        <value>${project.basedir}/target/metastore</value>
+    </property>
+
+    <property>
+        <name>javax.jdo.option.ConnectionURL</name>
+        <value>jdbc:derby:;databaseName=${project.basedir}/target/metastore_db;create=true</value>
+    </property>
+
+    <property>
+        <name>atlas.hook.hive.synchronous</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.pfile.impl</name>
+        <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+    </property>
+
+    <property>
+        <name>hive.in.test</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.zookeeper.quorum</name>
+        <value>localhost:19026</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.schema.verification</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.disallow.incompatible.col.type.changes</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateAll</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.exec.scratchdir</name>
+        <value>${project.basedir}/target/scratchdir</value>
+    </property>
+
+</configuration>
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala1.json b/addons/impala-bridge/src/test/resources/impala1.json
new file mode 100644
index 0000000..8f747f6
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala1.json
@@ -0,0 +1,84 @@
+{
+  "queryText": "INSERT INTO TABLE db_wlwspnwgfp.tbl_wlwspnwgfp_7 VALUES (1, 'foo', 'foo', 'foo', 'foo', 1), (2, 'foo', 'foo', 'foo', 'foo', 2)",
+  "queryId": "4c4cb5dc22194e20:c440f5fe00000000",
+  "hash": "d936d531989bd0f46d636bd05fc2540c",
+  "user": "impala@GCE.ABCDEFGH.COM",
+  "timestamp": 1553528501,
+  "endTime": 1553528505,
+  "edges": [
+    {
+      "sources": [],
+      "targets": [
+        0
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [],
+      "targets": [
+        1
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [],
+      "targets": [
+        2
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [],
+      "targets": [
+        3
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [],
+      "targets": [
+        4
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [],
+      "targets": [
+        5
+      ],
+      "edgeType": "PROJECTION"
+    }
+  ],
+  "vertices": [
+    {
+      "id": 0,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_id"
+    },
+    {
+      "id": 1,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_name"
+    },
+    {
+      "id": 2,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_street"
+    },
+    {
+      "id": 3,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_city"
+    },
+    {
+      "id": 4,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_state"
+    },
+    {
+      "id": 5,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_zipcode"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala2.json b/addons/impala-bridge/src/test/resources/impala2.json
new file mode 100644
index 0000000..239797b
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala2.json
@@ -0,0 +1,306 @@
+{
+  "queryText": "with a AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_1), b AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_2), c AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_3), d AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_4), e AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_5), f AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_6) INSERT INTO table db_wlwspnwgfp.tbl_wlwspnwgfp_7 SELECT * FROM a UNION SELECT * FROM b UNION SELECT * FROM c UNION SELECT * FROM d UNION SELECT * FROM e U [...]
+  "queryId": "b9423a1de88a33c3:997879c000000000",
+  "hash": "a6ff7959c66c23499346eef791c66439",
+  "user": "impala@GCE.ABCDEFGH.COM",
+  "timestamp": 1553528521,
+  "endTime": 1553528525,
+  "edges": [
+    {
+      "sources": [
+        1,
+        2,
+        3,
+        4,
+        5,
+        6
+      ],
+      "targets": [
+        0
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [
+        8,
+        9,
+        10,
+        11,
+        12,
+        13
+      ],
+      "targets": [
+        7
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [
+        15,
+        16,
+        17,
+        18,
+        19,
+        20
+      ],
+      "targets": [
+        14
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [
+        22,
+        23,
+        24,
+        25,
+        26,
+        27
+      ],
+      "targets": [
+        21
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [
+        29,
+        30,
+        31,
+        32,
+        33,
+        34
+      ],
+      "targets": [
+        28
+      ],
+      "edgeType": "PROJECTION"
+    },
+    {
+      "sources": [
+        36,
+        37,
+        38,
+        39,
+        40,
+        41
+      ],
+      "targets": [
+        35
+      ],
+      "edgeType": "PROJECTION"
+    }
+  ],
+  "vertices": [
+    {
+      "id": 0,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_id"
+    },
+    {
+      "id": 1,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_id"
+    },
+    {
+      "id": 2,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_id"
+    },
+    {
+      "id": 3,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_id"
+    },
+    {
+      "id": 4,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_id"
+    },
+    {
+      "id": 5,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_id"
+    },
+    {
+      "id": 6,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_id"
+    },
+    {
+      "id": 7,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_name"
+    },
+    {
+      "id": 8,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_name"
+    },
+    {
+      "id": 9,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_name"
+    },
+    {
+      "id": 10,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_name"
+    },
+    {
+      "id": 11,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_name"
+    },
+    {
+      "id": 12,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_name"
+    },
+    {
+      "id": 13,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_name"
+    },
+    {
+      "id": 14,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_street"
+    },
+    {
+      "id": 15,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_street"
+    },
+    {
+      "id": 16,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_street"
+    },
+    {
+      "id": 17,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_street"
+    },
+    {
+      "id": 18,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_street"
+    },
+    {
+      "id": 19,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_street"
+    },
+    {
+      "id": 20,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_street"
+    },
+    {
+      "id": 21,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_city"
+    },
+    {
+      "id": 22,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_city"
+    },
+    {
+      "id": 23,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_city"
+    },
+    {
+      "id": 24,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_city"
+    },
+    {
+      "id": 25,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_city"
+    },
+    {
+      "id": 26,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_city"
+    },
+    {
+      "id": 27,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_city"
+    },
+    {
+      "id": 28,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_state"
+    },
+    {
+      "id": 29,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_state"
+    },
+    {
+      "id": 30,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_state"
+    },
+    {
+      "id": 31,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_state"
+    },
+    {
+      "id": 32,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_state"
+    },
+    {
+      "id": 33,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_state"
+    },
+    {
+      "id": 34,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_state"
+    },
+    {
+      "id": 35,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_zipcode"
+    },
+    {
+      "id": 36,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_zipcode"
+    },
+    {
+      "id": 37,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_zipcode"
+    },
+    {
+      "id": 38,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_zipcode"
+    },
+    {
+      "id": 39,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_zipcode"
+    },
+    {
+      "id": 40,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_zipcode"
+    },
+    {
+      "id": 41,
+      "vertexType": "COLUMN",
+      "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_zipcode"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala3.json b/addons/impala-bridge/src/test/resources/impala3.json
new file mode 100644
index 0000000..6a7d171
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala3.json
@@ -0,0 +1,62 @@
+{
+  "queryText":"create view db_1.view_1 as select count, id from db_1.table_1",
+  "queryId":"3a441d0c130962f8:7f634aec00000000",
+  "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+  "user":"admin",
+  "timestamp":1554750072,
+  "endTime":1554750554,
+  "edges":[
+    {
+      "sources":[
+        1
+      ],
+      "targets":[
+        0
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+        3
+      ],
+      "targets":[
+        2
+      ],
+      "edgeType":"PROJECTION"
+    }
+  ],
+  "vertices":[
+    {
+      "id":0,
+      "vertexType":"COLUMN",
+      "vertexId":"db_1.view_1.count"
+    },
+    {
+      "id":1,
+      "vertexType":"COLUMN",
+      "vertexId":"db_1.table_1.count"
+    },
+    {
+      "id":2,
+      "vertexType":"COLUMN",
+      "vertexId":"db_1.view_1.id"
+    },
+    {
+      "id":3,
+      "vertexType":"COLUMN",
+      "vertexId":"db_1.table_1.id"
+    },
+    {
+      "id":4,
+      "vertexType":"TABLE",
+      "vertexId":"db_1.table_1",
+      "createTime":1554750070
+    },
+    {
+      "id":5,
+      "vertexType":"TABLE",
+      "vertexId":"db_1.view_1",
+      "createTime":1554750072
+    }
+  ]
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/users-credentials.properties b/addons/impala-bridge/src/test/resources/users-credentials.properties
new file mode 100644
index 0000000..3fc3bb1
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/users-credentials.properties
@@ -0,0 +1,3 @@
+#username=group::sha256-password
+admin=ADMIN::8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
+rangertagsync=RANGER_TAG_SYNC::e3f67240f5117d1753c940dae9eea772d36ed5fe9bd9c94a300e40413f1afb9d
diff --git a/addons/models/1000-Hadoop/1090-impala_model.json b/addons/models/1000-Hadoop/1090-impala_model.json
new file mode 100644
index 0000000..dad50ff
--- /dev/null
+++ b/addons/models/1000-Hadoop/1090-impala_model.json
@@ -0,0 +1,229 @@
+{
+  "enumDefs": [],
+  "structDefs": [],
+  "classificationDefs": [],
+  "entityDefs": [
+    {
+      "name": "impala_process",
+      "superTypes": [
+        "Process"
+      ],
+      "serviceType": "impala",
+      "typeVersion": "1.0",
+      "attributeDefs": [
+        {
+          "name": "startTime",
+          "typeName": "date",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "endTime",
+          "typeName": "date",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "userName",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "operationType",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "queryText",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "queryPlan",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "queryId",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "recentQueries",
+          "typeName": "array<string>",
+          "cardinality": "LIST",
+          "isIndexable": false,
+          "isOptional": true,
+          "isUnique": false
+        },
+        {
+          "name": "clusterName",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": true,
+          "includeInNotification": true,
+          "isUnique": false
+        },
+        {
+          "name": "queryGraph",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": true,
+          "isUnique": false
+        }
+      ]
+    },
+    {
+      "name" : "impala_column_lineage",
+      "superTypes" : [
+        "Process"
+      ],
+      "serviceType": "impala",
+      "typeVersion" : "1.0",
+      "attributeDefs" : [
+        {
+          "name": "dependencyType",
+          "typeName": "string",
+          "cardinality" : "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        }
+      ]
+    },
+    {
+      "name" : "impala_process_execution",
+      "superTypes" : [
+        "ProcessExecution"
+      ],
+      "serviceType": "impala",
+      "typeVersion" : "1.0",
+      "attributeDefs" : [
+        {
+          "name": "startTime",
+          "typeName": "date",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "endTime",
+          "typeName": "date",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "userName",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "queryText",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "queryGraph",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": true,
+          "isUnique": false
+        },
+        {
+          "name": "queryId",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "queryPlan",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": false,
+          "isOptional": false,
+          "isUnique": false
+        },
+        {
+          "name": "hostName",
+          "typeName": "string",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": false,
+          "isUnique": false
+        }
+      ]
+    }
+  ],
+  "relationshipDefs": [
+    {
+      "name": "impala_process_column_lineage",
+      "serviceType": "impala",
+      "typeVersion": "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "endDef1": {
+        "type": "impala_column_lineage",
+        "name": "query",
+        "isContainer": false,
+        "cardinality": "SINGLE"
+      },
+      "endDef2": {
+        "type": "impala_process",
+        "name": "columnLineages",
+        "isContainer": true,
+        "cardinality": "SET"
+      },
+      "propagateTags": "NONE"
+    },
+    {
+      "name": "impala_process_process_executions",
+      "serviceType": "impala",
+      "typeVersion": "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "endDef1": {
+        "type": "impala_process",
+        "name": "processExecutions",
+        "cardinality": "SET",
+        "isContainer": true
+      },
+      "endDef2": {
+        "type": "impala_process_execution",
+        "name": "process",
+        "cardinality": "SINGLE"
+      },
+      "propagateTags": "NONE"
+    }
+  ]
+}
diff --git a/pom.xml b/pom.xml
index ae4dfdc..7ad24ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -784,6 +784,7 @@
         <module>addons/hbase-testing-util</module>
         <module>addons/kafka-bridge</module>
         <module>tools/classification-updater</module>
+        <module>addons/impala-bridge</module>
 
         <module>distro</module>
     </modules>