You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/05/09 01:38:51 UTC
[atlas] branch master updated: ATLAS-3183: Read Impala lineage
record for creating view and send to Atlas
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 3137bb2 ATLAS-3183: Read Impala lineage record for creating view and send to Atlas
3137bb2 is described below
commit 3137bb2151abcc362008d7449f1e9a80d57b17a7
Author: lina.li <li...@cloudera.com>
AuthorDate: Wed May 8 18:38:10 2019 -0700
ATLAS-3183: Read Impala lineage record for creating view and send to Atlas
Signed-off-by: Sarath Subramanian <ss...@hortonworks.com>
---
addons/impala-bridge/pom.xml | 557 +++++++++++++++++++++
.../org.apache.atlas.impala/ImpalaLineageTool.java | 217 ++++++++
.../hook/AtlasImpalaHookContext.java | 170 +++++++
.../hook/ImpalaIdentifierParser.java | 392 +++++++++++++++
.../hook/ImpalaLineageHook.java | 132 +++++
.../hook/ImpalaOperationParser.java | 39 ++
.../hook/events/BaseImpalaEvent.java | 452 +++++++++++++++++
.../hook/events/CreateImpalaProcess.java | 312 ++++++++++++
.../model/IImpalaLineageHook.java | 28 ++
.../model/ImpalaDataType.java | 32 ++
.../model/ImpalaDependencyType.java | 34 ++
.../org.apache.atlas.impala/model/ImpalaNode.java | 55 ++
.../model/ImpalaOperationType.java | 37 ++
.../org.apache.atlas.impala/model/ImpalaQuery.java | 110 ++++
.../model/ImpalaVertexType.java | 37 ++
.../org.apache.atlas.impala/model/LineageEdge.java | 63 +++
.../model/LineageVertex.java | 74 +++
.../src/main/resources/atlas-log4j.xml | 42 ++
.../src/main/resources/import-impala.sh | 109 ++++
.../apache/atlas/impala/ImpalaLineageITBase.java | 249 +++++++++
.../apache/atlas/impala/ImpalaLineageToolIT.java | 78 +++
.../atlas/impala/hook/ImpalaLineageHookIT.java | 148 ++++++
.../test/resources/atlas-application.properties | 124 +++++
.../src/test/resources/atlas-log4j.xml | 130 +++++
.../impala-bridge/src/test/resources/hive-site.xml | 94 ++++
.../impala-bridge/src/test/resources/impala1.json | 84 ++++
.../impala-bridge/src/test/resources/impala2.json | 306 +++++++++++
.../impala-bridge/src/test/resources/impala3.json | 62 +++
.../test/resources/users-credentials.properties | 3 +
addons/models/1000-Hadoop/1090-impala_model.json | 229 +++++++++
pom.xml | 1 +
31 files changed, 4400 insertions(+)
diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml
new file mode 100644
index 0000000..54bc83a
--- /dev/null
+++ b/addons/impala-bridge/pom.xml
@@ -0,0 +1,557 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-atlas</artifactId>
+ <groupId>org.apache.atlas</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../</relativePath>
+ </parent>
+ <artifactId>impala-bridge</artifactId>
+ <description>Apache Atlas Impala Bridge Module</description>
+ <name>Apache Atlas Impala Bridge</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <!-- log4j 2.9 and later are multi-release jars for Java 9. Our Jetty version doesn't support
+ that. Therefore, we have to use log4j 2.8 in integration tests -->
+ <log4j.version>2.8</log4j.version>
+ </properties>
+
+ <dependencies>
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-notification</artifactId>
+ </dependency>
+
+ <!-- to bring up atlas server for integration tests -->
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>hdfs-model</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>${jersey.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>hive-bridge</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-webapp</artifactId>
+ <type>war</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons-io.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>${commons-cli.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>${commons-lang.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-graphdb-impls</artifactId>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-intg</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-repository</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-cli</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>dist</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-hook</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/dependency/hook/impala/atlas-impala-plugin-impl</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <artifactItems>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>${project.artifactId}</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-client-common</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-client-v1</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-client-v2</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-intg</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-notification</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>hdfs-model</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-common</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-multipart</artifactId>
+ <version>${jersey.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>${commons-conf.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <version>${jersey.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ <version>${jsr.version}</version>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-hook-shim</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/dependency/hook/impala</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <artifactItems>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>impala-bridge-shim</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-plugin-classloader</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-maven-plugin</artifactId>
+ <configuration>
+ <skip>${skipTests}</skip>
+ <!--only skip int tests -->
+ <httpConnector>
+ <port>31000</port>
+ <idleTimeout>60000</idleTimeout>
+ </httpConnector>
+ <war>../../webapp/target/atlas-webapp-${project.version}.war</war>
+ <daemon>true</daemon>
+ <webAppSourceDirectory>../../webapp/src/test/webapp</webAppSourceDirectory>
+ <webApp>
+ <contextPath>/</contextPath>
+ <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ </webApp>
+ <useTestScope>true</useTestScope>
+ <systemProperties>
+ <force>true</force>
+ <systemProperty>
+ <name>atlas.home</name>
+ <value>${project.build.directory}</value>
+ </systemProperty>
+ <systemProperty>
+ <key>atlas.conf</key>
+ <value>${project.build.directory}/test-classes</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.data</name>
+ <value>${project.build.directory}/data</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.log.dir</name>
+ <value>${project.build.directory}/logs</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.log.file</name>
+ <value>application.log</value>
+ </systemProperty>
+ <systemProperty>
+ <name>log4j.configuration</name>
+ <value>file:///${project.build.directory}/../../../distro/src/conf/atlas-log4j.xml</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.graphdb.backend</name>
+ <value>${graphdb.backend.impl}</value>
+ </systemProperty>
+ <systemProperty>
+ <key>embedded.solr.directory</key>
+ <value>${project.build.directory}</value>
+ </systemProperty>
+ </systemProperties>
+ <stopKey>atlas-stop</stopKey>
+ <stopPort>31001</stopPort>
+ <stopWait>${jetty-maven-plugin.stopWait}</stopWait>
+ <daemon>${debug.jetty.daemon}</daemon>
+ <testClassesDirectory>${project.build.testOutputDirectory}</testClassesDirectory>
+ <useTestClasspath>true</useTestClasspath>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>start-jetty</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>deploy-war</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>stop-jetty</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>stop</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.doxia</groupId>
+ <artifactId>doxia-module-twiki</artifactId>
+ <version>${doxia.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven.doxia</groupId>
+ <artifactId>doxia-core</artifactId>
+ <version>${doxia.version}</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <goals>
+ <goal>site</goal>
+ </goals>
+ <phase>prepare-package</phase>
+ </execution>
+ </executions>
+ <configuration>
+ <generateProjectInfo>false</generateProjectInfo>
+ <generateReports>false</generateReports>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <inherited>false</inherited>
+ <executions>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/models</outputDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/../models</directory>
+ <includes>
+ <include>0000-Area0/0010-base_model.json</include>
+ <include>1000-Hadoop/**</include>
+ </includes>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-solr-resources</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/solr</outputDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/../../test-tools/src/main/resources/solr</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/ImpalaLineageTool.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/ImpalaLineageTool.java
new file mode 100644
index 0000000..7c9abc8
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/ImpalaLineageTool.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala;
+
+import java.lang.Runnable;
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOCase;
+import org.apache.commons.io.comparator.LastModifiedFileComparator;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point of actual implementation of Impala lineage tool. It reads the lineage records in
+ * lineage log. It then calls instance of ImpalaLineageHook to convert lineage records to
+ * lineage notifications and send them to Atlas.
+ */
+public class ImpalaLineageTool {
+ private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageTool.class);
+ private static final String WAL_FILE_EXTENSION = ".wal";
+ private static final String WAL_FILE_PREFIX = "WAL";
+ private String directoryName;
+ private String prefix;
+
+ public ImpalaLineageTool(String[] args) {
+ try {
+ Options options = new Options();
+ options.addOption("d", "directory", true, "the lineage files' folder");
+ options.addOption("p", "prefix", true, "the prefix of the lineage files");
+
+ CommandLine cmd = new DefaultParser().parse(options, args);
+ directoryName = cmd.getOptionValue("d");
+ prefix = cmd.getOptionValue("p");
+ } catch(ParseException e) {
+ LOG.warn("Failed to parse command arguments. Error: ", e.getMessage());
+ printUsage();
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void run() {
+ ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+ File[] currentFiles = getCurrentFiles();
+ int fileNum = currentFiles.length;
+
+ for(int i = 0; i < fileNum; i++) {
+ String filename = currentFiles[i].getAbsolutePath();
+ String walFilename = directoryName + WAL_FILE_PREFIX + currentFiles[i].getName() + WAL_FILE_EXTENSION;
+
+ LOG.info("Importing: {}", filename);
+ importHImpalaEntities(impalaLineageHook, filename, walFilename);
+
+ if(i != fileNum - 1) {
+ deleteLineageAndWal(currentFiles[i], walFilename);
+ }
+ }
+ LOG.info("Impala bridge processing: Done! ");
+ }
+
+ public static void main(String[] args) {
+ if (args != null && args.length != 4) {
+ // The lineage file location and prefix should be input as the parameters
+ System.out.println("Impala bridge: wrong number of arguments. Please try again");
+ printUsage();
+ return;
+ }
+
+ ImpalaLineageTool instance = new ImpalaLineageTool(args);
+ instance.run();
+ }
+
+ /**
+ * Delete the used lineage file and wal file
+ * @param currentFile The current file
+ * @param wal The wal file
+ */
+ public static void deleteLineageAndWal(File currentFile, String wal) {
+ if(currentFile.exists() && currentFile.delete()) {
+ LOG.info("Lineage file {} is deleted successfully", currentFile.getPath());
+ } else {
+ LOG.info("Failed to delete the lineage file {}", currentFile.getPath());
+ }
+
+ File file = new File(wal);
+
+ if(file.exists() && file.delete()) {
+ LOG.info("Wal file {} deleted successfully", wal);
+ } else {
+ LOG.info("Failed to delete the wal file {}", wal);
+ }
+ }
+
+ private static void printUsage() {
+ System.out.println();
+ System.out.println();
+ System.out.println("Usage: import-impala.sh [-d <directory>] [-p <prefix>]" );
+ System.out.println(" Imports specified lineage files by given directory and file prefix.");
+ System.out.println();
+ }
+
+ /**
+ * This function figures out the right lineage file path+name to process sorted by the last
+ * time they are modified. (old -> new)
+ * @return get the lineage files from given directory with given prefix.
+ */
+ public File[] getCurrentFiles() {
+ try {
+ LOG.info("Scanning: " + directoryName);
+ File folder = new File(directoryName);
+ File[] listOfFiles = folder.listFiles((FileFilter) new PrefixFileFilter(prefix, IOCase.SENSITIVE));
+
+ if ((listOfFiles == null) || (listOfFiles.length == 0)) {
+ LOG.info("Found no lineage files.");
+ return new File[0];
+ }
+
+ if(listOfFiles.length > 1) {
+ Arrays.sort(listOfFiles, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
+ }
+
+ LOG.info("Found {} lineage files" + listOfFiles.length);
+ return listOfFiles;
+ } catch(Exception e) {
+ LOG.error("Import lineage file failed.", e);
+ }
+ return new File[0];
+ }
+
+ private boolean processImpalaLineageHook(ImpalaLineageHook impalaLineageHook, List<String> lineageList) {
+ boolean allSucceed = true;
+
+ // returns true if successfully sent to Atlas
+ for (String lineageRecord : lineageList) {
+ try {
+ impalaLineageHook.process(lineageRecord);
+ } catch (Exception ex) {
+ String errorMessage = String.format("Exception at query {} \n", lineageRecord);
+ LOG.error(errorMessage, ex);
+
+ allSucceed = false;
+ }
+ }
+
+ return allSucceed;
+ }
+
+ /**
+ * Create a list of lineage queries based on the lineage file and the wal file
+ * @param name
+ * @param walfile
+ * @return
+ */
+ public void importHImpalaEntities(ImpalaLineageHook impalaLineageHook, String name, String walfile) {
+ List<String> lineageList = new ArrayList<>();
+
+ try {
+ File lineageFile = new File(name); //use current file length to minus the offset
+ File walFile = new File(walfile);
+ // if the wal file does not exist, create one with 0 byte read, else, read the number
+ if(!walFile.exists()) {
+ BufferedWriter writer = new BufferedWriter(new FileWriter(walfile));
+ writer.write("0, " + name);
+ writer.close();
+ }
+
+ LOG.debug("Reading: " + name);
+ String lineageRecord = FileUtils.readFileToString(lineageFile, "UTF-8");
+
+ lineageList.add(lineageRecord);
+
+ // call instance of ImpalaLineageHook to process the list of Impala lineage record
+ if(processImpalaLineageHook(impalaLineageHook, lineageList)) {
+ // write how many bytes the current file is to the wal file
+ FileWriter newWalFile = new FileWriter(walfile, true);
+ BufferedWriter newWalFileBuf = new BufferedWriter(newWalFile);
+ newWalFileBuf.newLine();
+ newWalFileBuf.write(String.valueOf(lineageFile.length()) + "," + name);
+
+ newWalFileBuf.close();
+ newWalFile.close();
+ } else {
+ LOG.error("Error sending some of impala lineage records to ImpalaHook");
+ }
+ } catch (Exception e) {
+ LOG.error("Error in processing lineage records. Exception: " + e.getMessage());
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/AtlasImpalaHookContext.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/AtlasImpalaHookContext.java
new file mode 100644
index 0000000..88faace
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/AtlasImpalaHookContext.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Contain the info related to an linear record from Impala
+ */
+public class AtlasImpalaHookContext {
+ public static final char QNAME_SEP_CLUSTER_NAME = '@';
+ public static final char QNAME_SEP_ENTITY_NAME = '.';
+ public static final char QNAME_SEP_PROCESS = ':';
+
+ private final ImpalaLineageHook hook;
+ private final ImpalaOperationType impalaOperation;
+ private final ImpalaQuery lineageQuery;
+ private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
+
+ public AtlasImpalaHookContext(ImpalaLineageHook hook, ImpalaOperationType operationType,
+ ImpalaQuery lineageQuery) throws Exception {
+ this.hook = hook;
+ this.impalaOperation = operationType;
+ this.lineageQuery = lineageQuery;
+
+ }
+
+ public ImpalaQuery getLineageQuery() {
+ return lineageQuery;
+ }
+ public String getQueryStr() { return lineageQuery.getQueryText(); }
+
+ public ImpalaOperationType getImpalaOperationType() {
+ return impalaOperation;
+ }
+
+ public void putEntity(String qualifiedName, AtlasEntity entity) {
+ qNameEntityMap.put(qualifiedName, entity);
+ }
+
+ public AtlasEntity getEntity(String qualifiedName) {
+ return qNameEntityMap.get(qualifiedName);
+ }
+
+ public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); }
+
+ public String getClusterName() {
+ return hook.getClusterName();
+ }
+
+ public boolean isConvertHdfsPathToLowerCase() {
+ return hook.isConvertHdfsPathToLowerCase();
+ }
+
+ public String getQualifiedNameForDb(String dbName) {
+ return (dbName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+ }
+
+ public String getQualifiedNameForTable(String fullTableName) throws IllegalArgumentException {
+ if (fullTableName == null) {
+ throw new IllegalArgumentException("fullTableName is null");
+ }
+
+ int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+
+ if (!isSeparatorIndexValid(sepPos)) {
+ throw new IllegalArgumentException(fullTableName + " does not contain database name");
+ }
+
+ return getQualifiedNameForTable(fullTableName.substring(0, sepPos), fullTableName.substring(sepPos+1));
+ }
+
+ public String getQualifiedNameForTable(String dbName, String tableName) {
+ return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
+ getClusterName();
+ }
+
+ public String getQualifiedNameForColumn(String fullColumnName) throws IllegalArgumentException {
+ if (fullColumnName == null) {
+ throw new IllegalArgumentException("fullColumnName is null");
+ }
+
+ int sepPosFirst = fullColumnName.indexOf(QNAME_SEP_ENTITY_NAME);
+ int sepPosLast = fullColumnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+
+ if (!isSeparatorIndexValid(sepPosFirst) || !isSeparatorIndexValid(sepPosLast) ||
+ sepPosFirst == sepPosLast) {
+ throw new IllegalArgumentException(
+ String.format("fullColumnName {} does not contain database name or table name",
+ fullColumnName));
+ }
+
+ return getQualifiedNameForColumn(
+ fullColumnName.substring(0, sepPosFirst),
+ fullColumnName.substring(sepPosFirst+1, sepPosLast),
+ fullColumnName.substring(sepPosLast+1));
+ }
+
+ public String getColumnNameOnly(String fullColumnName) throws IllegalArgumentException {
+ if (fullColumnName == null) {
+ throw new IllegalArgumentException("fullColumnName is null");
+ }
+
+ int sepPosLast = fullColumnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+
+ if (!isSeparatorIndexValid(sepPosLast)) {
+ return fullColumnName;
+ }
+
+ return fullColumnName.substring(sepPosLast+1);
+ }
+
+ public String getQualifiedNameForColumn(String dbName, String tableName, String columnName) {
+ return
+ (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME +
+ columnName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+ }
+
+ public String getUserName() { return lineageQuery.getUser(); }
+
+ public String getDatabaseNameFromTable(String fullTableName) {
+ int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+ if (isSeparatorIndexValid(sepPos)) {
+ return fullTableName.substring(0, sepPos);
+ }
+
+ return null;
+ }
+
+ public String getTableNameFromColumn(String columnName) {
+ int sepPos = columnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+ if (!isSeparatorIndexValid(sepPos)) {
+ return null;
+ }
+
+ String tableName = columnName.substring(0, sepPos);
+ if (!ImpalaIdentifierParser.isTableNameValid(tableName)) {
+ return null;
+ }
+
+ return tableName;
+ }
+
+ public boolean isSeparatorIndexValid(int index) {
+ return index > 0;
+ }
+
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaIdentifierParser.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaIdentifierParser.java
new file mode 100644
index 0000000..b9d6cbb
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaIdentifierParser.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Check if a string is a valid Impala table identifier.
+ * It could be <dbName>.<tableName> or <tableName>
+ */
+public class ImpalaIdentifierParser {
+ // http://www.cloudera.com/content/www/en-us/documentation/enterprise/latest/topics/impala_identifiers.html
+ // https://github.com/apache/impala/blob/64e6719870db5602a6fa85014bc6c264080b9414/tests/common/patterns.py
+ // VALID_IMPALA_IDENTIFIER_REGEX = re.compile(r'^[a-zA-Z][a-zA-Z0-9_]{,127}$')
+ // add "." to allow <dbName>.<tableName>
+ public static final String VALID_IMPALA_IDENTIFIER_REGEX = "^[a-zA-Z][a-zA-Z0-9_.]{0,127}$";
+
+    /**
+     * Returns true if the given string is a valid Impala table identifier:
+     * either "<tableName>" or "<dbName>.<tableName>", matching the Impala identifier
+     * pattern and containing no reserved word as a component.
+     */
+    public static boolean isTableNameValid(String inTableName) {
+        if (StringUtils.isEmpty(inTableName)) {
+            return false;
+        }
+
+        if (!inTableName.matches(VALID_IMPALA_IDENTIFIER_REGEX)) {
+            return false;
+        }
+
+        // String.split() takes a regex: "." must be escaped. An unescaped "." matches
+        // every character, producing an empty token array and silently skipping both
+        // the component-count check and the reserved-word check below.
+        String[] tokens = inTableName.split("\\.");
+        if (tokens.length > 2) {
+            // valid value should be <dbName>.<tableName> or <tableName>
+            return false;
+        }
+
+        for (String token : tokens) {
+            if (isReserved(token)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+ // The following is extracted from Impala code.
+ // Mainly from https://github.com/apache/impala/blob/master/fe/src/main/jflex/sql-scanner.flex
+ // Map from keyword string to token id.
+ // We use a linked hash map because the insertion order is important.
+ // for example, we want "and" to come after "&&" to make sure error reporting
+ // uses "and" as a display name and not "&&".
+ // Please keep the puts sorted alphabetically by keyword (where the order
+ // does not affect the desired error reporting)
+ static HashSet<String> keywordMap;
+ // map from token id to token description
+ static HashSet<String> tokenIdMap;
+ // Reserved words are words that cannot be used as identifiers. It is a superset of
+ // keywords.
+ static Set<String> reservedWords;
+
+
+ public static void init() {
+ // initilize keywords
+ keywordMap = new HashSet<>();
+ keywordMap.add("&&");
+ keywordMap.add("add");
+ keywordMap.add("aggregate");
+ keywordMap.add("all");
+ keywordMap.add("alter");
+ keywordMap.add("analytic");
+ keywordMap.add("and");
+ keywordMap.add("anti");
+ keywordMap.add("api_version");
+ keywordMap.add("array");
+ keywordMap.add("as");
+ keywordMap.add("asc");
+ keywordMap.add("authorization");
+ keywordMap.add("avro");
+ keywordMap.add("between");
+ keywordMap.add("bigint");
+ keywordMap.add("binary");
+ keywordMap.add("block_size");
+ keywordMap.add("boolean");
+ keywordMap.add("by");
+ keywordMap.add("cached");
+ keywordMap.add("case");
+ keywordMap.add("cascade");
+ keywordMap.add("cast");
+ keywordMap.add("change");
+ keywordMap.add("char");
+ keywordMap.add("class");
+ keywordMap.add("close_fn");
+ keywordMap.add("column");
+ keywordMap.add("columns");
+ keywordMap.add("comment");
+ keywordMap.add("compression");
+ keywordMap.add("compute");
+ keywordMap.add("copy");
+ keywordMap.add("create");
+ keywordMap.add("cross");
+ keywordMap.add("current");
+ keywordMap.add("data");
+ keywordMap.add("database");
+ keywordMap.add("databases");
+ keywordMap.add("date");
+ keywordMap.add("datetime");
+ keywordMap.add("decimal");
+ //keywordMap.add("default"); "default" can be database or table name
+ keywordMap.add("delete");
+ keywordMap.add("delimited");
+ keywordMap.add("desc");
+ keywordMap.add("describe");
+ keywordMap.add("distinct");
+ keywordMap.add("div");
+ keywordMap.add("double");
+ keywordMap.add("drop");
+ keywordMap.add("else");
+ keywordMap.add("encoding");
+ keywordMap.add("end");
+ keywordMap.add("escaped");
+ keywordMap.add("exists");
+ keywordMap.add("explain");
+ keywordMap.add("extended");
+ keywordMap.add("external");
+ keywordMap.add("false");
+ keywordMap.add("fields");
+ keywordMap.add("fileformat");
+ keywordMap.add("files");
+ keywordMap.add("finalize_fn");
+ keywordMap.add("first");
+ keywordMap.add("float");
+ keywordMap.add("following");
+ keywordMap.add("for");
+ keywordMap.add("format");
+ keywordMap.add("formatted");
+ keywordMap.add("from");
+ keywordMap.add("full");
+ keywordMap.add("function");
+ keywordMap.add("functions");
+ keywordMap.add("grant");
+ keywordMap.add("group");
+ keywordMap.add("hash");
+ keywordMap.add("having");
+ keywordMap.add("if");
+ keywordMap.add("ilike");
+ keywordMap.add("ignore");
+ keywordMap.add("in");
+ keywordMap.add("incremental");
+ keywordMap.add("init_fn");
+ keywordMap.add("inner");
+ keywordMap.add("inpath");
+ keywordMap.add("insert");
+ keywordMap.add("int");
+ keywordMap.add("integer");
+ keywordMap.add("intermediate");
+ keywordMap.add("interval");
+ keywordMap.add("into");
+ keywordMap.add("invalidate");
+ keywordMap.add("iregexp");
+ keywordMap.add("is");
+ keywordMap.add("join");
+ keywordMap.add("kudu");
+ keywordMap.add("last");
+ keywordMap.add("left");
+ keywordMap.add("like");
+ keywordMap.add("limit");
+ keywordMap.add("lines");
+ keywordMap.add("load");
+ keywordMap.add("location");
+ keywordMap.add("map");
+ keywordMap.add("merge_fn");
+ keywordMap.add("metadata");
+ keywordMap.add("not");
+ keywordMap.add("null");
+ keywordMap.add("nulls");
+ keywordMap.add("offset");
+ keywordMap.add("on");
+ keywordMap.add("||");
+ keywordMap.add("or");
+ keywordMap.add("orc");
+ keywordMap.add("order");
+ keywordMap.add("outer");
+ keywordMap.add("over");
+ keywordMap.add("overwrite");
+ keywordMap.add("parquet");
+ keywordMap.add("parquetfile");
+ keywordMap.add("partition");
+ keywordMap.add("partitioned");
+ keywordMap.add("partitions");
+ keywordMap.add("preceding");
+ keywordMap.add("prepare_fn");
+ keywordMap.add("primary");
+ keywordMap.add("produced");
+ keywordMap.add("purge");
+ keywordMap.add("range");
+ keywordMap.add("rcfile");
+ keywordMap.add("real");
+ keywordMap.add("recover");
+ keywordMap.add("refresh");
+ keywordMap.add("regexp");
+ keywordMap.add("rename");
+ keywordMap.add("repeatable");
+ keywordMap.add("replace");
+ keywordMap.add("replication");
+ keywordMap.add("restrict");
+ keywordMap.add("returns");
+ keywordMap.add("revoke");
+ keywordMap.add("right");
+ keywordMap.add("rlike");
+ keywordMap.add("role");
+ keywordMap.add("roles");
+ keywordMap.add("row");
+ keywordMap.add("rows");
+ keywordMap.add("schema");
+ keywordMap.add("schemas");
+ keywordMap.add("select");
+ keywordMap.add("semi");
+ keywordMap.add("sequencefile");
+ keywordMap.add("serdeproperties");
+ keywordMap.add("serialize_fn");
+ keywordMap.add("set");
+ keywordMap.add("show");
+ keywordMap.add("smallint");
+ keywordMap.add("sort");
+ keywordMap.add("stats");
+ keywordMap.add("stored");
+ keywordMap.add("straight_join");
+ keywordMap.add("string");
+ keywordMap.add("struct");
+ keywordMap.add("symbol");
+ keywordMap.add("table");
+ keywordMap.add("tables");
+ keywordMap.add("tablesample");
+ keywordMap.add("tblproperties");
+ keywordMap.add("terminated");
+ keywordMap.add("textfile");
+ keywordMap.add("then");
+ keywordMap.add("timestamp");
+ keywordMap.add("tinyint");
+ keywordMap.add("to");
+ keywordMap.add("true");
+ keywordMap.add("truncate");
+ keywordMap.add("unbounded");
+ keywordMap.add("uncached");
+ keywordMap.add("union");
+ keywordMap.add("unknown");
+ keywordMap.add("update");
+ keywordMap.add("update_fn");
+ keywordMap.add("upsert");
+ keywordMap.add("use");
+ keywordMap.add("using");
+ keywordMap.add("values");
+ keywordMap.add("varchar");
+ keywordMap.add("view");
+ keywordMap.add("when");
+ keywordMap.add("where");
+ keywordMap.add("with");
+
+ // Initilize tokenIdMap for error reporting
+ tokenIdMap = new HashSet<>(keywordMap);
+
+ // add non-keyword tokens. Please keep this in the same order as they are used in this
+ // file.
+ tokenIdMap.add("EOF");
+ tokenIdMap.add("...");
+ tokenIdMap.add(":");
+ tokenIdMap.add(";");
+ tokenIdMap.add("COMMA");
+ tokenIdMap.add(".");
+ tokenIdMap.add("*");
+ tokenIdMap.add("(");
+ tokenIdMap.add(")");
+ tokenIdMap.add("[");
+ tokenIdMap.add("]");
+ tokenIdMap.add("/");
+ tokenIdMap.add("%");
+ tokenIdMap.add("+");
+ tokenIdMap.add("-");
+ tokenIdMap.add("&");
+ tokenIdMap.add("|");
+ tokenIdMap.add("^");
+ tokenIdMap.add("~");
+ tokenIdMap.add("=");
+ tokenIdMap.add("!");
+ tokenIdMap.add("<");
+ tokenIdMap.add(">");
+ tokenIdMap.add("UNMATCHED STRING LITERAL");
+ tokenIdMap.add("!=");
+ tokenIdMap.add("INTEGER LITERAL");
+ tokenIdMap.add("NUMERIC OVERFLOW");
+ tokenIdMap.add("DECIMAL LITERAL");
+ tokenIdMap.add("EMPTY IDENTIFIER");
+ tokenIdMap.add("IDENTIFIER");
+ tokenIdMap.add("STRING LITERAL");
+ tokenIdMap.add("COMMENTED_PLAN_HINT_START");
+ tokenIdMap.add("COMMENTED_PLAN_HINT_END");
+ tokenIdMap.add("Unexpected character");
+
+
+ // For impala 3.0, reserved words = keywords + sql16ReservedWords - builtinFunctions
+ // - whitelist
+ // unused reserved words = reserved words - keywords. These words are reserved for
+ // forward compatibility purposes.
+ reservedWords = new HashSet<>(keywordMap);
+ // Add SQL:2016 reserved words
+ reservedWords.addAll(Arrays.asList(new String[] {
+ "abs", "acos", "allocate", "any", "are", "array_agg", "array_max_cardinality",
+ "asensitive", "asin", "asymmetric", "at", "atan", "atomic", "avg", "begin",
+ "begin_frame", "begin_partition", "blob", "both", "call", "called", "cardinality",
+ "cascaded", "ceil", "ceiling", "char_length", "character", "character_length",
+ "check", "classifier", "clob", "close", "coalesce", "collate", "collect",
+ "commit", "condition", "connect", "constraint", "contains", "convert", "copy",
+ "corr", "corresponding", "cos", "cosh", "count", "covar_pop", "covar_samp",
+ "cube", "cume_dist", "current_catalog", "current_date",
+ "current_default_transform_group", "current_path", "current_path", "current_role",
+ "current_role", "current_row", "current_schema", "current_time",
+ "current_timestamp", "current_transform_group_for_type", "current_user", "cursor",
+ "cycle", "day", "deallocate", "dec", "decfloat", "declare", "define",
+ "dense_rank", "deref", "deterministic", "disconnect", "dynamic", "each",
+ "element", "empty", "end-exec", "end_frame", "end_partition", "equals", "escape",
+ "every", "except", "exec", "execute", "exp", "extract", "fetch", "filter",
+ "first_value", "floor", "foreign", "frame_row", "free", "fusion", "get", "global",
+ "grouping", "groups", "hold", "hour", "identity", "indicator", "initial", "inout",
+ "insensitive", "integer", "intersect", "intersection", "json_array",
+ "json_arrayagg", "json_exists", "json_object", "json_objectagg", "json_query",
+ "json_table", "json_table_primitive", "json_value", "lag", "language", "large",
+ "last_value", "lateral", "lead", "leading", "like_regex", "listagg", "ln",
+ "local", "localtime", "localtimestamp", "log", "log10 ", "lower", "match",
+ "match_number", "match_recognize", "matches", "max", "member", "merge", "method",
+ "min", "minute", "mod", "modifies", "module", "month", "multiset", "national",
+ "natural", "nchar", "nclob", "new", "no", "none", "normalize", "nth_value",
+ "ntile", "nullif", "numeric", "occurrences_regex", "octet_length", "of", "old",
+ "omit", "one", "only", "open", "out", "overlaps", "overlay", "parameter",
+ "pattern", "per", "percent", "percent_rank", "percentile_cont", "percentile_disc",
+ "period", "portion", "position", "position_regex", "power", "precedes",
+ "precision", "prepare", "procedure", "ptf", "rank", "reads", "real", "recursive",
+ "ref", "references", "referencing", "regr_avgx", "regr_avgy", "regr_count",
+ "regr_intercept", "regr_r2", "regr_slope", "regr_sxx", "regr_sxy", "regr_syy",
+ "release", "result", "return", "rollback", "rollup", "row_number", "running",
+ "savepoint", "scope", "scroll", "search", "second", "seek", "sensitive",
+ "session_user", "similar", "sin", "sinh", "skip", "some", "specific",
+ "specifictype", "sql", "sqlexception", "sqlstate", "sqlwarning", "sqrt", "start",
+ "static", "stddev_pop", "stddev_samp", "submultiset", "subset", "substring",
+ "substring_regex", "succeeds", "sum", "symmetric", "system", "system_time",
+ "system_user", "tan", "tanh", "time", "timezone_hour", "timezone_minute",
+ "trailing", "translate", "translate_regex", "translation", "treat", "trigger",
+ "trim", "trim_array", "uescape", "unique", "unknown", "unnest", "update ",
+ "upper", "user", "value", "value_of", "var_pop", "var_samp", "varbinary",
+ "varying", "versioning", "whenever", "width_bucket", "window", "within",
+ "without", "year"}));
+ // TODO: Remove impala builtin function names. Need to find content of
+ // BuiltinsDb.getInstance().getAllFunctions()
+ //reservedWords.removeAll(BuiltinsDb.getInstance().getAllFunctions().keySet());
+
+ // Remove whitelist words. These words might be heavily used in production, and
+ // impala is unlikely to implement SQL features around these words in the near future.
+ reservedWords.removeAll(Arrays.asList(new String[] {
+ // time units
+ "year", "month", "day", "hour", "minute", "second",
+ "begin", "call", "check", "classifier", "close", "identity", "language",
+ "localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
+ "period", "result", "return", "sql", "start", "system", "time", "user", "value"
+ }));
+ }
+
+ // Populate the keyword/reserved-word tables once at class-load time.
+ static {
+ init();
+ }
+
+    // True when the token (compared case-insensitively) is an Impala reserved word
+    // and therefore cannot be used as an identifier; null is never reserved.
+    static boolean isReserved(String token) {
+        if (token == null) {
+            return false;
+        }
+
+        return reservedWords.contains(token.toLowerCase());
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java
new file mode 100644
index 0000000..c77bb38
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaLineageHook.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
+
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
+import org.apache.atlas.impala.hook.events.CreateImpalaProcess;
+import org.apache.atlas.impala.model.IImpalaLineageHook;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import java.util.HashSet;
+
+/**
+ * Entry point of the Impala lineage bridge: converts Impala lineage records into
+ * Atlas notification messages and sends them via the AtlasHook machinery.
+ */
+public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
+    private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHook.class);
+
+    public static final String ATLAS_ENDPOINT = "atlas.rest.address";
+    public static final String REALM_SEPARATOR = "@";
+    public static final String CONF_PREFIX = "atlas.hook.impala.";
+    public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
+    public static final String CONF_REALM_NAME = "atlas.realm.name";
+    public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+
+    private ImpalaOperationParser parser = new ImpalaOperationParser();
+
+    // Config values are read once at class-load time from atlasProperties (AtlasHook).
+    private static final String clusterName;
+    private static final String realm;
+    private static final boolean convertHdfsPathToLowerCase;
+
+    static {
+        clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        realm = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // TODO: confirm the right default for realm
+        convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+    }
+
+    public ImpalaLineageHook() {
+    }
+
+    /**
+     * Deserializes a JSON-encoded Impala lineage record and processes it.
+     */
+    public void process(String impalaQueryString) throws Exception {
+        ImpalaQuery lineageQuery = AtlasType.fromJson(impalaQueryString, ImpalaQuery.class);
+        process(lineageQuery);
+    }
+
+    /**
+     * Converts an Impala lineage record into Atlas notification messages and sends them.
+     * Any failure is logged and swallowed so lineage processing never breaks the caller.
+     */
+    public void process(ImpalaQuery lineageQuery) throws Exception {
+        if (StringUtils.isEmpty(lineageQuery.getQueryText())) {
+            LOG.warn("==> ImpalaLineageHook.process skips because the query text is empty <==");
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> ImpalaLineageHook.process({})", lineageQuery.getQueryText());
+        }
+
+        try {
+            ImpalaOperationType operationType = parser.getImpalaOperationType(lineageQuery.getQueryText());
+            AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(this, operationType, lineageQuery);
+            BaseImpalaEvent event = null;
+
+            switch (operationType) {
+                case CREATEVIEW:
+                    event = new CreateImpalaProcess(context);
+                    break;
+                default:
+                    if (LOG.isDebugEnabled()) {
+                        // was mislabeled "HiveHook.run" - copied over from the Hive hook
+                        LOG.debug("ImpalaLineageHook.process({}): operation ignored", lineageQuery.getQueryText());
+                    }
+                    break;
+            }
+
+            if (event != null) {
+                LOG.debug("Processing event: {}", lineageQuery.getQueryText());
+
+                final UserGroupInformation ugi = getUgiFromUserName(lineageQuery.getUser());
+
+                super.notifyEntities(event.getNotificationMessages(), ugi);
+            }
+        } catch (Throwable t) {
+            // deliberately broad catch: a lineage-processing failure must not propagate
+            LOG.error("ImpalaLineageHook.process(): failed to process query {}",
+                lineageQuery.getQueryText(), t);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== ImpalaLineageHook.process({})", lineageQuery.getQueryText());
+        }
+    }
+
+    // Builds a UGI for the query's user; appends the configured realm when the user
+    // name does not already carry one.
+    private UserGroupInformation getUgiFromUserName(String userName) throws IOException {
+        String userPrincipal = userName.contains(REALM_SEPARATOR) ? userName : userName + "@" + getRealm();
+        Subject userSubject = new Subject(false, Sets.newHashSet(
+            new KerberosPrincipal(userPrincipal)), new HashSet<Object>(), new HashSet<Object>());
+        return UserGroupInformation.getUGIFromSubject(userSubject);
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getRealm() {
+        return realm;
+    }
+
+    public boolean isConvertHdfsPathToLowerCase() {
+        return convertHdfsPathToLowerCase;
+    }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java
new file mode 100644
index 0000000..6cb726c
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/ImpalaOperationParser.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import org.apache.atlas.impala.model.ImpalaOperationType;
+
+/**
+ * Parse an Impala query text and output the impala operation type
+ */
+public class ImpalaOperationParser {
+
+    public ImpalaOperationParser() {
+    }
+
+    /**
+     * Maps an Impala query text to its operation type.
+     * Only CREATE VIEW is recognized for now; everything else maps to UNKNOWN.
+     */
+    public ImpalaOperationType getImpalaOperationType(String queryText) {
+        // TODO: more Impala commands will be handled in ATLAS-3184
+        // Tolerate leading whitespace and any whitespace between "create" and "view",
+        // which a plain startsWith("create view") check would miss.
+        if (queryText.trim().toLowerCase().matches("(?s)create\\s+view\\b.*")) {
+            return ImpalaOperationType.CREATEVIEW;
+        }
+
+        return ImpalaOperationType.UNKNOWN;
+    }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java
new file mode 100644
index 0000000..0487739
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/BaseImpalaEvent.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook.events;
+
+import static org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.model.ImpalaDataType;
+import org.apache.atlas.impala.model.ImpalaNode;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The base class for generating notification event to Atlas server
+ * Most code is copied from BaseHiveEvent to avoid depending on org.apache.atlas.hive.hook
+ */
+public abstract class BaseImpalaEvent {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseImpalaEvent.class);
+
+ // Impala should re-use the same entity type as hive. So Hive and Impala can operate on same
+ // database or table
+ public static final String HIVE_TYPE_DB = "hive_db";
+ public static final String HIVE_TYPE_TABLE = "hive_table";
+ public static final String HIVE_TYPE_COLUMN = "hive_column";
+
+ public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ public static final String ATTRIBUTE_NAME = "name";
+ public static final String ATTRIBUTE_OWNER = "owner";
+ public static final String ATTRIBUTE_CLUSTER_NAME = "clusterName";
+ public static final String ATTRIBUTE_CREATE_TIME = "createTime";
+ public static final String ATTRIBUTE_LAST_ACCESS_TIME = "lastAccessTime";
+ public static final String ATTRIBUTE_DB = "db";
+ public static final String ATTRIBUTE_COLUMNS = "columns";
+ public static final String ATTRIBUTE_TABLE = "table";
+ public static final String ATTRIBUTE_INPUTS = "inputs";
+ public static final String ATTRIBUTE_OUTPUTS = "outputs";
+ public static final String ATTRIBUTE_OPERATION_TYPE = "operationType";
+ public static final String ATTRIBUTE_START_TIME = "startTime";
+ public static final String ATTRIBUTE_USER_NAME = "userName";
+ public static final String ATTRIBUTE_QUERY_TEXT = "queryText";
+ public static final String ATTRIBUTE_QUERY_ID = "queryId";
+ public static final String ATTRIBUTE_QUERY_PLAN = "queryPlan";
+ public static final String ATTRIBUTE_END_TIME = "endTime";
+ public static final String ATTRIBUTE_RECENT_QUERIES = "recentQueries";
+ public static final String ATTRIBUTE_QUERY = "query";
+ public static final String ATTRIBUTE_DEPENDENCY_TYPE = "dependencyType";
+ public static final long MILLIS_CONVERT_FACTOR = 1000;
+
+ public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
+
+ static {
+ OWNER_TYPE_TO_ENUM_VALUE.put(1, "USER");
+ OWNER_TYPE_TO_ENUM_VALUE.put(2, "ROLE");
+ OWNER_TYPE_TO_ENUM_VALUE.put(3, "GROUP");
+ }
+
+ protected final AtlasImpalaHookContext context;
+ protected final Map<String, ImpalaNode> vertexNameMap;
+ protected final Map<Long, LineageVertex> verticesMap;
+
+ // Binds the event to its hook context; the maps cache lineage-graph nodes by
+ // name and by vertex id while the event's notification messages are built.
+ public BaseImpalaEvent(AtlasImpalaHookContext context) {
+
+ this.context = context;
+ vertexNameMap = new HashMap<>();
+ verticesMap = new HashMap<>();
+ }
+
+ // Hook context this event was created with.
+ public AtlasImpalaHookContext getContext() {
+ return context;
+ }
+
+ public abstract List<HookNotification> getNotificationMessages() throws Exception;
+
+ // Delegates to the context: user that issued the lineage query.
+ public String getUserName() { return context.getUserName(); }
+
+ // Delegates to the context: extracts the table-name prefix from a qualified column name.
+ public String getTableNameFromColumn(String columnName) {
+ return context.getTableNameFromColumn(columnName);
+ }
+
+ // Qualified name of a lineage node, computed from the node's own vertex.
+ public String getQualifiedName(ImpalaNode node) throws IllegalArgumentException {
+
+ return getQualifiedName(node.getOwnVertex());
+ }
+
+ /**
+ * Builds the Atlas qualified name for a lineage vertex based on its vertex type
+ * (database, table or column). Returns null - after logging a warning - when the
+ * vertex type or vertex id is missing, or when the type is not one this hook maps.
+ *
+ * @throws IllegalArgumentException if node is null
+ */
+ public String getQualifiedName(LineageVertex node) throws IllegalArgumentException {
+ if (node == null) {
+ throw new IllegalArgumentException("node is null");
+ }
+
+ ImpalaVertexType nodeType = node.getVertexType();
+
+ if (nodeType == null) {
+ if (node.getVertexId() != null) {
+ LOG.warn("null qualified name for type: null and name: {}", node.getVertexId());
+ }
+ return null;
+ }
+
+ if (node.getVertexId() == null) {
+ LOG.warn("null qualified name for type: {} and name: null", nodeType);
+ return null;
+ }
+
+ switch (nodeType) {
+ case DATABASE:
+ return context.getQualifiedNameForDb(node.getVertexId());
+
+ case TABLE:
+ return context.getQualifiedNameForTable(node.getVertexId());
+
+ case COLUMN:
+ return context.getQualifiedNameForColumn(node.getVertexId());
+
+ default:
+ LOG.warn("null qualified name for type: {} and name: {}", nodeType, node.getVertexId());
+ return null;
+ }
+ }
+
+    /**
+     * Orders entities by their qualified name, case-insensitively; entities with a
+     * null qualified name sort first. Two null names compare equal - the original
+     * returned -1 for both orderings, violating the Comparator antisymmetry contract
+     * (which can make TimSort throw "Comparison method violates its general contract").
+     */
+    static final class AtlasEntityComparator implements Comparator<AtlasEntity> {
+        @Override
+        public int compare(AtlasEntity entity1, AtlasEntity entity2) {
+            String name1 = (String) entity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+            String name2 = (String) entity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+
+            if (name1 == null) {
+                return (name2 == null) ? 0 : -1;
+            }
+
+            if (name2 == null) {
+                return 1;
+            }
+
+            return name1.toLowerCase().compareTo(name2.toLowerCase());
+        }
+    }
+
+    static final Comparator<AtlasEntity> entityComparator = new AtlasEntityComparator();
+
+ /**
+ * Builds the qualified name for the process entity from its inputs/outputs.
+ * For CREATEVIEW: "<table qualifiedName><processSep><createTime>" of the first
+ * hive_table output (in case-insensitive qualified-name order).
+ * NOTE(review): the 'inputs' parameter is currently unused - presumably reserved for
+ * the HDFS-path case mentioned in the TODO below; confirm before removing.
+ */
+ protected String getQualifiedName(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
+ ImpalaOperationType operation = context.getImpalaOperationType();
+
+ // TODO: add more operation type here
+ if (operation == ImpalaOperationType.CREATEVIEW) {
+ List<? extends AtlasEntity> sortedEntities = new ArrayList<>(outputs);
+
+ Collections.sort(sortedEntities, entityComparator);
+
+ for (AtlasEntity entity : sortedEntities) {
+ if (entity.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) {
+ Long createTime = (Long)entity.getAttribute(ATTRIBUTE_CREATE_TIME);
+
+ return (String)entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
+ }
+ }
+ }
+
+ // TODO: add code for name construction for HDFS path
+ return null;
+ }
+
+    // Converts a lineage node to an Atlas entity, but only for node types that may
+    // appear as process inputs/outputs (tables, partitions, HDFS dirs); otherwise null.
+    protected AtlasEntity getInputOutputEntity(ImpalaNode node, AtlasEntityExtInfo entityExtInfo) throws Exception {
+        switch (node.getNodeType()) {
+            case TABLE:
+            case PARTITION:
+            case DFS_DIR:
+                return toAtlasEntity(node, entityExtInfo);
+
+            default:
+                return null;
+        }
+    }
+
+    // Maps a lineage node to the corresponding Atlas entity; node types without a
+    // mapping yield null.
+    protected AtlasEntity toAtlasEntity(ImpalaNode node, AtlasEntityExtInfo entityExtInfo) throws Exception {
+        switch (node.getNodeType()) {
+            case DATABASE:
+                return toDbEntity(node);
+
+            case TABLE:
+            case PARTITION:
+                return toTableEntity(node, entityExtInfo);
+
+            default:
+                return null;
+        }
+    }
+
+ // Convenience overload: converts a database lineage node via its node name.
+ protected AtlasEntity toDbEntity(ImpalaNode db) throws Exception {
+ return toDbEntity(db.getNodeName());
+ }
+
+ /**
+ * Returns the hive_db entity for the given database name, creating and caching it
+ * in the context on first use.
+ */
+ protected AtlasEntity toDbEntity(String dbName) throws Exception {
+ String dbQualifiedName = context.getQualifiedNameForDb(dbName);
+ AtlasEntity ret = context.getEntity(dbQualifiedName);
+
+ if (ret == null) {
+ ret = new AtlasEntity(HIVE_TYPE_DB);
+
+ // Impala hook should not send metadata entities. set 'guid' to null - which will:
+ // - result in this entity to be not included in 'referredEntities'
+ // - cause Atlas server to resolve the entity by its qualifiedName
+ ret.setGuid(null);
+
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, dbName.toLowerCase());
+ ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getClusterName());
+
+ context.putEntity(dbQualifiedName, ret);
+ }
+
+ return ret;
+ }
+
+    // Wraps a table lineage node as an entity-with-extended-info; returns null when
+    // the node cannot be converted to a table entity.
+    protected AtlasEntityWithExtInfo toTableEntity(ImpalaNode table) throws Exception {
+        AtlasEntityWithExtInfo ret = new AtlasEntityWithExtInfo();
+        AtlasEntity entity = toTableEntity(table, ret);
+
+        if (entity == null) {
+            return null;
+        }
+
+        ret.setEntity(entity);
+
+        return ret;
+    }
+
+ // Converts a table node and, when successful, also registers the entity in the
+ // given entities collection.
+ protected AtlasEntity toTableEntity(ImpalaNode table, AtlasEntitiesWithExtInfo entities) throws Exception {
+ AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities);
+
+ if (ret != null) {
+ entities.addEntity(ret);
+ }
+
+ return ret;
+ }
+
+    /**
+     * Converts a table lineage node to an Atlas table entity, registering the owning
+     * database as a referred entity.
+     *
+     * @throws IllegalArgumentException if the table, its name, or its db qualifier is missing
+     */
+    protected AtlasEntity toTableEntity(ImpalaNode table, AtlasEntityExtInfo entityExtInfo) throws Exception {
+        if ((table == null) || (table.getNodeName() == null)) {
+            throw new IllegalArgumentException("table is null or its name is null");
+        }
+
+        String dbName = context.getDatabaseNameFromTable(table.getNodeName());
+        if (dbName == null) {
+            // String.format uses %s - the original "{}" (SLF4J-style) placeholder was
+            // ignored, so the table name never appeared in the exception message.
+            throw new IllegalArgumentException(String.format("db name is null for table: %s", table.getNodeName()));
+        }
+
+        AtlasEntity dbEntity = toDbEntity(dbName);
+
+        if (entityExtInfo != null && dbEntity != null) {
+            entityExtInfo.addReferredEntity(dbEntity);
+        }
+
+        return toTableEntity(getObjectId(dbEntity), table, entityExtInfo);
+    }
+
+ /**
+ * Builds (or returns the cached) hive_table entity for a table lineage node,
+ * attaching its db reference and column entities.
+ * NOTE(review): relies on getTableCreateTime()/getColumnEntities(), defined elsewhere
+ * in this class.
+ */
+ protected AtlasEntity toTableEntity(AtlasObjectId dbId, ImpalaNode table, AtlasEntityExtInfo entityExtInfo) throws Exception {
+ String tblQualifiedName = getQualifiedName(table);
+ AtlasEntity ret = context.getEntity(tblQualifiedName);
+
+ if (ret != null) {
+ return ret;
+ }
+
+ // a table created in Impala still uses HIVE_TYPE_TABLE to allow both Impala and Hive operate
+ // on the same table
+ ret = new AtlasEntity(HIVE_TYPE_TABLE);
+
+ // Impala hook should not send meta data entity to Atlas. set 'guid' to null - which will:
+ // - result in this entity to be not included in 'referredEntities'
+ // - cause Atlas server to resolve the entity by its qualifiedName
+ // TODO: enable this once HMS hook is in. Disable this before that.
+ ret.setGuid(null);
+
+ long createTime = getTableCreateTime(table);
+ long lastAccessTime = createTime;
+
+ ret.setAttribute(ATTRIBUTE_DB, dbId);
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, table.getNodeName().toLowerCase());
+
+ // just fake it. It should not be sent to Atlas once HMS hook is in
+ ret.setAttribute(ATTRIBUTE_OWNER, getUserName());
+
+ ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+ ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+
+ AtlasObjectId tableId = getObjectId(ret);
+ List<AtlasEntity> columns = getColumnEntities(tableId, table);
+
+ if (entityExtInfo != null) {
+ if (columns != null) {
+ for (AtlasEntity column : columns) {
+ entityExtInfo.addReferredEntity(column);
+ }
+ }
+ }
+
+ ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
+
+
+ context.putEntity(tblQualifiedName, ret);
+
+ return ret;
+ }
+
+    /**
+     * Builds an object id for the given entity, keyed by guid, type name and qualifiedName.
+     */
+    public static AtlasObjectId getObjectId(AtlasEntity entity) {
+        Object qualifiedName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+
+        return new AtlasObjectId(entity.getGuid(), entity.getTypeName(),
+            Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, (String) qualifiedName));
+    }
+
+    /**
+     * Maps the given entities to their object ids; returns an empty list for a null/empty input.
+     */
+    public static List<AtlasObjectId> getObjectIds(List<AtlasEntity> entities) {
+        if (CollectionUtils.isEmpty(entities)) {
+            return Collections.emptyList();
+        }
+
+        List<AtlasObjectId> objectIds = new ArrayList<>(entities.size());
+
+        for (AtlasEntity entity : entities) {
+            objectIds.add(getObjectId(entity));
+        }
+
+        return objectIds;
+    }
+
+    /**
+     * Convenience overload: returns the create time of the table node's own vertex, in millis.
+     */
+    public static long getTableCreateTime(ImpalaNode table) {
+        return getTableCreateTime(table.getOwnVertex());
+    }
+
+    /**
+     * Returns the vertex's create time converted from seconds to milliseconds; falls back
+     * to the current time when the vertex carries no create time.
+     */
+    public static long getTableCreateTime(LineageVertex tableVertex) {
+        Long createTime = tableVertex.getCreateTime();
+
+        return (createTime != null) ? (createTime.longValue() * MILLIS_CONVERT_FACTOR)
+                                    : System.currentTimeMillis();
+    }
+
+    /**
+     * Builds (or fetches from the context cache) one column entity per child of the table node.
+     *
+     * @param tableId object id of the owning table
+     * @param table   the table node whose children are its columns
+     * @return the column entities, in child-iteration order
+     */
+    protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, ImpalaNode table) {
+        List<AtlasEntity> columns = new ArrayList<>();
+
+        for (ImpalaNode child : table.getChildren().values()) {
+            String      colQualifiedName = getQualifiedName(child);
+            AtlasEntity column           = context.getEntity(colQualifiedName);
+
+            if (column == null) {
+                column = new AtlasEntity(HIVE_TYPE_COLUMN);
+
+                // if column's table was sent in an earlier notification, set 'guid' to null - which will:
+                // - result in this entity to be not included in 'referredEntities'
+                // - cause Atlas server to resolve the entity by its qualifiedName
+                // TODO: enable this once HMS hook is in. Disable this before that.
+                column.setGuid(null);
+
+                column.setAttribute(ATTRIBUTE_TABLE, tableId);
+                column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName);
+                column.setAttribute(ATTRIBUTE_NAME, context.getColumnNameOnly(child.getNodeName()));
+
+                // just fake it. It should not be sent to Atlas once HMS hook is in
+                column.setAttribute(ATTRIBUTE_OWNER, getUserName());
+
+                context.putEntity(colQualifiedName, column);
+            }
+
+            columns.add(column);
+        }
+
+        return columns;
+    }
+
+    /**
+     * Builds the impala_process entity describing this query, linking the given
+     * input and output entities and recording the (lower-cased, trimmed) query text.
+     */
+    protected AtlasEntity getImpalaProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
+        AtlasEntity process  = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS.getName());
+        String      queryStr = context.getQueryStr();
+
+        // normalize the query text for stable matching across notifications
+        if (queryStr != null) {
+            queryStr = queryStr.toLowerCase().trim();
+        }
+
+        process.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
+        process.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
+        process.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs));
+        process.setAttribute(ATTRIBUTE_NAME, queryStr);
+        process.setAttribute(ATTRIBUTE_OPERATION_TYPE, context.getImpalaOperationType());
+        process.setAttribute(ATTRIBUTE_START_TIME, context.getLineageQuery().getTimestamp());
+        process.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
+        process.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
+        process.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
+        process.setAttribute(ATTRIBUTE_QUERY_ID, context.getLineageQuery().getQueryId());
+        process.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
+        process.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));
+
+        return process;
+    }
+
+    /**
+     * Registers every entity cached in the context as a referred entity of the message,
+     * then compacts the message (dropping referred entries that are also top-level).
+     */
+    protected void addProcessedEntities(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
+        for (AtlasEntity processedEntity : context.getEntities()) {
+            entitiesWithExtInfo.addReferredEntity(processedEntity);
+        }
+
+        entitiesWithExtInfo.compact();
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/CreateImpalaProcess.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/CreateImpalaProcess.java
new file mode 100644
index 0000000..e4a38a3
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/hook/events/CreateImpalaProcess.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook.events;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.model.ImpalaDataType;
+import org.apache.atlas.impala.model.ImpalaDependencyType;
+import org.apache.atlas.impala.model.ImpalaNode;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageEdge;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CreateImpalaProcess extends BaseImpalaEvent {
+ private static final Logger LOG = LoggerFactory.getLogger(CreateImpalaProcess.class);
+
+ public CreateImpalaProcess(AtlasImpalaHookContext context) {
+ super(context);
+ }
+
+ public List<HookNotification> getNotificationMessages() throws Exception {
+ List<HookNotification> ret = null;
+ AtlasEntitiesWithExtInfo entities = getEntities();
+
+ if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
+ ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities));
+ }
+
+ return ret;
+ }
+
+ public AtlasEntitiesWithExtInfo getEntities() throws Exception {
+ AtlasEntitiesWithExtInfo ret = null;
+ List<ImpalaNode> inputNodes = new ArrayList<>();
+ List<ImpalaNode> outputNodes = new ArrayList<>();
+ List<AtlasEntity> inputs = new ArrayList<>();
+ List<AtlasEntity> outputs = new ArrayList<>();
+ Set<String> processedNames = new HashSet<>();
+
+ getInputOutList(context.getLineageQuery(), inputNodes, outputNodes);
+
+ if (skipProcess(inputNodes, outputNodes)) {
+ return ret;
+ }
+
+ ret = new AtlasEntitiesWithExtInfo();
+
+ if (!inputNodes.isEmpty()) {
+ for (ImpalaNode input : inputNodes) {
+ String qualifiedName = getQualifiedName(input);
+
+ if (qualifiedName == null || !processedNames.add(qualifiedName)) {
+ continue;
+ }
+
+ AtlasEntity entity = getInputOutputEntity(input, ret);
+
+ if (entity != null) {
+ inputs.add(entity);
+ }
+ }
+ }
+
+ if (outputNodes != null) {
+ for (ImpalaNode output : outputNodes) {
+ String qualifiedName = getQualifiedName(output);
+
+ if (qualifiedName == null || !processedNames.add(qualifiedName)) {
+ continue;
+ }
+
+ AtlasEntity entity = getInputOutputEntity(output, ret);
+
+ if (entity != null) {
+ outputs.add(entity);
+ }
+ }
+ }
+
+ if (!inputs.isEmpty() || !outputs.isEmpty()) {
+ AtlasEntity process = getImpalaProcessEntity(inputs, outputs);
+ if (process!= null && LOG.isDebugEnabled()) {
+ LOG.debug("get process entity with qualifiedName: {}", process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ ret.addEntity(process);
+
+ processColumnLineage(process, ret);
+
+ addProcessedEntities(ret);
+ } else {
+ ret = null;
+ }
+
+
+ return ret;
+ }
+
+ private void processColumnLineage(AtlasEntity impalaProcess, AtlasEntitiesWithExtInfo entities) {
+ List<LineageEdge> edges = context.getLineageQuery().getEdges();
+
+ if (CollectionUtils.isEmpty(edges)) {
+ return;
+ }
+
+ final List<AtlasEntity> columnLineages = new ArrayList<>();
+ final Set<String> processedOutputCols = new HashSet<>();
+
+ for (LineageEdge edge : edges) {
+
+ if (!edge.getEdgeType().equals(ImpalaDependencyType.PROJECTION)) {
+ // Impala dependency type can only be predicate or projection.
+ // Impala predicate dependency: This is a dependency between a set of target
+ // columns (or exprs) and a set of source columns (base table columns). It
+ // indicates that the source columns restrict the values of their targets (e.g.
+ // by participating in WHERE clause predicates). It should not be part of lineage
+ continue;
+ }
+
+ List<AtlasEntity> outputColumns = new ArrayList<>();
+ for (Long targetId : edge.getTargets()) {
+ LineageVertex columnVertex = verticesMap.get(targetId);
+ String outputColName = getQualifiedName(columnVertex);
+ AtlasEntity outputColumn = context.getEntity(outputColName);
+
+ LOG.debug("processColumnLineage(): target id = {}, target column name = {}",
+ targetId, outputColName);
+
+ if (outputColumn == null) {
+ LOG.warn("column-lineage: non-existing output-column {}", outputColName);
+ continue;
+ }
+
+ if (processedOutputCols.contains(outputColName)) {
+ LOG.warn("column-lineage: duplicate for output-column {}", outputColName);
+ continue;
+ } else {
+ processedOutputCols.add(outputColName);
+ }
+
+ outputColumns.add(outputColumn);
+ }
+
+ List<AtlasEntity> inputColumns = new ArrayList<>();
+
+ for (Long sourceId : edge.getSources()) {
+ LineageVertex columnVertex = verticesMap.get(sourceId);
+ String inputColName = getQualifiedName(columnVertex);
+ AtlasEntity inputColumn = context.getEntity(inputColName);
+
+ if (inputColumn == null) {
+ LOG.warn("column-lineage: non-existing input-column {} with id ={}", inputColName, sourceId);
+ continue;
+ }
+
+ inputColumns.add(inputColumn);
+ }
+
+ if (inputColumns.isEmpty()) {
+ continue;
+ }
+
+ AtlasEntity columnLineageProcess = new AtlasEntity(ImpalaDataType.IMPALA_COLUMN_LINEAGE.getName());
+
+ // TODO: when there are multiple target IDs, should we use first column name or all of their name?
+ String columnQualifiedName = (String)impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) +
+ AtlasImpalaHookContext.QNAME_SEP_PROCESS + outputColumns.get(0).getAttribute(ATTRIBUTE_NAME);
+ columnLineageProcess.setAttribute(ATTRIBUTE_NAME, columnQualifiedName);
+ columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName);
+ columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns));
+ columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputColumns));
+ columnLineageProcess.setAttribute(ATTRIBUTE_QUERY, getObjectId(impalaProcess));
+
+ // based on https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java#L267
+ // There are two types of dependencies that are represented as edges in the column
+ // lineage graph:
+ // a) Projection dependency: This is a dependency between a set of source
+ // columns (base table columns) and a single target (result expr or table column).
+ // This dependency indicates that values of the target depend on the values of the source
+ // columns.
+ // b) Predicate dependency: This is a dependency between a set of target
+ // columns (or exprs) and a set of source columns (base table columns). It indicates that
+ // the source columns restrict the values of their targets (e.g. by participating in
+ // WHERE clause predicates).
+ columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, ImpalaDependencyType.PROJECTION.getName());
+
+ columnLineages.add(columnLineageProcess);
+ }
+
+ for (AtlasEntity columnLineage : columnLineages) {
+ String columnQualifiedName = (String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+ LOG.debug("get column lineage entity with qualifiedName: {}", columnQualifiedName);
+
+ entities.addEntity(columnLineage);
+ }
+ }
+
+ // Process the impala query, classify the vertices as input or output based on LineageEdge
+ // Then organize the vertices into hierarchical structure: put all column vertices of a table
+ // as children of a ImpalaNode representing that table.
+ private void getInputOutList(ImpalaQuery lineageQuery, List<ImpalaNode> inputNodes,
+ List<ImpalaNode> outputNodes) {
+ // get vertex map with key being its id and
+ // ImpalaNode map with its own vertex's vertexId as its key
+ for (LineageVertex vertex : lineageQuery.getVertices()) {
+ verticesMap.put(vertex.getId(), vertex);
+ vertexNameMap.put(vertex.getVertexId(), new ImpalaNode(vertex));
+ }
+
+ // get set of source ID and set of target Id
+ Set<Long> sourceIds = new HashSet<>();
+ Set<Long> targetIds = new HashSet<>();
+ for (LineageEdge edge : lineageQuery.getEdges()) {
+ if (ImpalaDependencyType.PROJECTION.equals(edge.getEdgeType())) {
+ sourceIds.addAll(edge.getSources());
+ targetIds.addAll(edge.getTargets());
+ }
+ }
+
+ Map<String, ImpalaNode> inputMap = buildInputOutputList(sourceIds, verticesMap, vertexNameMap);
+ Map<String, ImpalaNode> outputMap = buildInputOutputList(targetIds, verticesMap, vertexNameMap);
+
+ inputNodes.addAll(inputMap.values());
+ outputNodes.addAll(outputMap.values());
+ }
+
+ /**
+ * From the list of Ids and Id to Vertices map, generate the Table name to ImpalaNode map.
+ * @param idSet the list of Ids. They are from lineage edges
+ * @param vertexMap the Id to Vertex map
+ * @param vertexNameMap the vertexId to ImpalaNode map.
+ * @return the table name to ImpalaNode map, whose table node contains its columns
+ */
+ private Map<String, ImpalaNode> buildInputOutputList(Set<Long> idSet, Map<Long, LineageVertex> vertexMap,
+ Map<String, ImpalaNode> vertexNameMap) {
+ Map<String, ImpalaNode> returnTableMap = new HashMap<>();
+
+ for (Long id : idSet) {
+ LineageVertex vertex = vertexMap.get(id);
+ if (vertex == null) {
+ LOG.warn("cannot find vertex with id: {}", id);
+ continue;
+ }
+
+ if (ImpalaVertexType.COLUMN.equals(vertex.getVertexType())) {
+ // add column to its table node
+ String tableName = getTableNameFromColumn(vertex.getVertexId());
+ if (tableName == null) {
+ LOG.warn("cannot find tableName for vertex with id: {}, column name : {}",
+ id, vertex.getVertexId() == null? "null" : vertex.getVertexId());
+ continue;
+ }
+
+ ImpalaNode tableNode = returnTableMap.get(tableName);
+
+ if (tableNode == null) {
+ tableNode = vertexNameMap.get(tableName);
+
+ if (tableNode == null) {
+ LOG.warn("cannot find table node for vertex with id: {}, column name : {}",
+ id, vertex.getVertexId());
+ continue;
+ }
+
+ returnTableMap.put(tableName, tableNode);
+ }
+
+ tableNode.addChild(vertex);
+ }
+ }
+
+ return returnTableMap;
+ }
+
+ private boolean skipProcess(List<ImpalaNode> inputNodes, List<ImpalaNode> ouputNodes) {
+ if (inputNodes.isEmpty() || ouputNodes.isEmpty()) {
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/IImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/IImpalaLineageHook.java
new file mode 100644
index 0000000..7c1103a
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/IImpalaLineageHook.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+/**
+ * Define the interface to process Impala lineage record
+ */
+public interface IImpalaLineageHook {
+
+ /**
+  * Processes a single Impala lineage record.
+  *
+  * @param impalaQueryString a serialized string of an Impala lineage record, as written
+  *                          to Impala's lineage log (presumably JSON, matching the Jackson
+  *                          bindings of ImpalaQuery - verify against the caller)
+  * @throws Exception if the record cannot be parsed or processed
+  */
+ void process(String impalaQueryString) throws Exception;
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDataType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDataType.java
new file mode 100644
index 0000000..10ce448
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDataType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.impala.model;
+
+/**
+ * Data types used for Impala bridge
+ */
+public enum ImpalaDataType {
+
+    IMPALA_PROCESS,
+    IMPALA_PROCESS_EXECUTION,
+    IMPALA_COLUMN_LINEAGE;
+
+    /**
+     * Returns the Atlas type name for this data type: the enum constant name in lower case.
+     * Uses Locale.ROOT so the result is stable regardless of the JVM's default locale
+     * (e.g. under the Turkish locale, the default toLowerCase() maps 'I' to dotless 'ı',
+     * which would produce a type name Atlas does not know).
+     */
+    public String getName() {
+        return name().toLowerCase(java.util.Locale.ROOT);
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDependencyType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDependencyType.java
new file mode 100644
index 0000000..892ee9b
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaDependencyType.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+/**
+ * The kinds of dependency an Impala lineage edge can represent: PROJECTION (target values
+ * are computed from the source columns) or PREDICATE (source columns merely restrict the
+ * target values, e.g. via WHERE clauses).
+ */
+public enum ImpalaDependencyType {
+    PROJECTION("PROJECTION"),
+    PREDICATE("PREDICATE");
+
+    private final String label;
+
+    ImpalaDependencyType(String label) {
+        this.label = label;
+    }
+
+    /** Returns the dependency type name in upper case, as it appears in the lineage log. */
+    public String getName() {
+        return label.toUpperCase();
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaNode.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaNode.java
new file mode 100644
index 0000000..a3ddf53
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaNode.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Contain vertex info of this node and its children. It is used only internally
+ */
+/**
+ * Contain vertex info of this node and its children (e.g. a table vertex with its column
+ * vertices as children). It is used only internally.
+ */
+public class ImpalaNode {
+    LineageVertex ownVertex;
+    Map<Long, ImpalaNode> children;
+
+    public ImpalaNode(LineageVertex ownVertex) {
+        this.ownVertex = ownVertex;
+        children = new HashMap<>();
+    }
+
+    public String getNodeName() { return ownVertex.getVertexId(); }
+    public ImpalaVertexType getNodeType() { return ownVertex.getVertexType(); }
+    public LineageVertex getOwnVertex() { return ownVertex; }
+    public Map<Long, ImpalaNode> getChildren() { return children; }
+
+    /**
+     * Add child to this node, keyed by the child vertex's id; adding the same vertex twice
+     * is a no-op.
+     * @param child the child vertex to add
+     * @return the node corresponding to the input child vertex
+     */
+    public ImpalaNode addChild(LineageVertex child) {
+        ImpalaNode existingChild = children.get(child.getId());
+        if (existingChild != null) {
+            return existingChild;
+        }
+
+        ImpalaNode newChild = new ImpalaNode(child);
+
+        // fix: Map.put() returns the PREVIOUS mapping (always null here), so the original
+        // 'return children.put(...)' returned null instead of the newly added node,
+        // contradicting this method's contract
+        children.put(child.getId(), newChild);
+
+        return newChild;
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java
new file mode 100644
index 0000000..8b0be16
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaOperationType.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.impala.model;
+
+/**
+ * The Impala operation types recognized by the hook; operations that are not handled
+ * map to UNKNOWN.
+ */
+public enum ImpalaOperationType {
+    CREATEVIEW ("CREATEVIEW"),
+    UNKNOWN ("UNKNOWN");
+
+    private final String name;
+
+    ImpalaOperationType(String s) {
+        name = s;
+    }
+
+    /** Returns true when the given string exactly matches this operation type's name. */
+    public boolean equalsName(String otherName) {
+        return name.equals(otherName);
+    }
+
+    // fix: annotate with @Override - this overrides Enum.toString() and the annotation
+    // lets the compiler catch signature drift
+    @Override
+    public String toString() {
+        return this.name;
+    }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaQuery.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaQuery.java
new file mode 100644
index 0000000..27bdc72
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaQuery.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.List;
+
+/**
+ * Represent an Impala lineage record in lineage log.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+public class ImpalaQuery {
+    private String queryText;
+    private String queryId;
+    private String hash;
+    private String user;
+
+    // Unix epoch time in SECONDS (not millis): seconds elapsed since January 1, 1970
+    // (midnight UTC/GMT), not counting leap seconds
+    private Long timestamp;
+    private Long endTime;
+    private List<LineageEdge> edges;
+    private List<LineageVertex> vertices;
+
+    // accessors are kept in the original declaration order so the Jackson-serialized
+    // property order is unchanged
+    public List<LineageEdge> getEdges() { return edges; }
+
+    public List<LineageVertex> getVertices() { return vertices; }
+
+    public Long getEndTime() { return endTime; }
+
+    public String getHash() { return hash; }
+
+    public String getQueryId() { return queryId; }
+
+    public String getQueryText() { return queryText; }
+
+    public Long getTimestamp() { return timestamp; }
+
+    public String getUser() { return user; }
+
+    public void setEdges(List<LineageEdge> edges) { this.edges = edges; }
+
+    public void setEndTime(Long endTime) { this.endTime = endTime; }
+
+    public void setHash(String hash) { this.hash = hash; }
+
+    public void setQueryId(String queryId) { this.queryId = queryId; }
+
+    public void setQueryText(String queryText) { this.queryText = queryText; }
+
+    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
+
+    public void setUser(String user) { this.user = user; }
+
+    public void setVertices(List<LineageVertex> vertices) { this.vertices = vertices; }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaVertexType.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaVertexType.java
new file mode 100644
index 0000000..8ec3f85
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/ImpalaVertexType.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+/** The kinds of entity an Impala lineage vertex can represent. */
+public enum ImpalaVertexType {
+    DFS_DIR("DFS_DIR"),
+    PARTITION("PARTITION"),
+    COLUMN("COLUMN"),
+    TABLE("TABLE"),
+    DATABASE("DATABASE");
+
+    private final String label;
+
+    ImpalaVertexType(String label) {
+        this.label = label;
+    }
+
+    /** Returns the vertex type name in upper case, as it appears in the lineage log. */
+    public String getName() {
+        return label.toUpperCase();
+    }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageEdge.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageEdge.java
new file mode 100644
index 0000000..251507e
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageEdge.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.List;
+
+/**
+ * This represents an edge in Impala's lineage record that connects two entities
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+public class LineageEdge {
+    // ids of the source (upstream) vertices of this edge
+    private List<Long> sources;
+    // ids of the target (downstream) vertices of this edge
+    private List<Long> targets;
+    // PROJECTION or PREDICATE - see ImpalaDependencyType
+    private ImpalaDependencyType edgeType;
+
+    public List<Long> getSources() { return sources; }
+
+    public List<Long> getTargets() { return targets; }
+
+    public ImpalaDependencyType getEdgeType() { return edgeType; }
+
+    public void setSources(List<Long> sources) { this.sources = sources; }
+
+    public void setTargets(List<Long> targets) { this.targets = targets; }
+
+    public void setEdgeType(ImpalaDependencyType edgeType) { this.edgeType = edgeType; }
+}
diff --git a/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageVertex.java b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageVertex.java
new file mode 100644
index 0000000..82672c9
--- /dev/null
+++ b/addons/impala-bridge/src/main/java/org.apache.atlas.impala/model/LineageVertex.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.model;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/**
+ * This represents an entity in Impala's lineage record.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+public class LineageVertex {
+    // id is used to reference this entity. It is used in LineageEdge to specify source and target
+    // https://github.com/apache/impala/blob/master/be/src/util/lineage-util.h#L40
+    // Impala id is int64. Therefore, define this field as Long
+    private Long id;
+
+    // the type of the entity, e.g. "TABLE", "COLUMN"
+    private ImpalaVertexType vertexType;
+
+    // the name of the entity
+    private String vertexId;
+
+    // optional, may be null; only set when vertexType is "TABLE"
+    private Long createTime;
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public ImpalaVertexType getVertexType() {
+        return vertexType;
+    }
+
+    public void setVertexType(ImpalaVertexType vertexType) {
+        this.vertexType = vertexType;
+    }
+
+    public String getVertexId() {
+        return vertexId;
+    }
+
+    public void setVertexId(String vertexId) {
+        this.vertexId = vertexId;
+    }
+
+    public Long getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Long createTime) {
+        this.createTime = createTime;
+    }
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/resources/atlas-log4j.xml b/addons/impala-bridge/src/main/resources/atlas-log4j.xml
new file mode 100644
index 0000000..97317a8
--- /dev/null
+++ b/addons/impala-bridge/src/main/resources/atlas-log4j.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="/var/log/hive/impala-bridge.log"/>
+ <param name="Append" value="true"/>
+ <param name="maxFileSize" value="100MB" />
+ <param name="maxBackupIndex" value="20" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+ </layout>
+ </appender>
+
+ <logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <root>
+ <priority value="warn"/>
+ <appender-ref ref="FILE"/>
+ </root>
+</log4j:configuration>
\ No newline at end of file
diff --git a/addons/impala-bridge/src/main/resources/import-impala.sh b/addons/impala-bridge/src/main/resources/import-impala.sh
new file mode 100644
index 0000000..5ca8984
--- /dev/null
+++ b/addons/impala-bridge/src/main/resources/import-impala.sh
@@ -0,0 +1,109 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+# resolve links - $0 may be a softlink
+PRG="${0}"
+
+[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true
+
+while [ -h "${PRG}" ]; do
+  ls=`ls -ld "${PRG}"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "${PRG}"`/"$link"
+  fi
+done
+
+BASEDIR=`dirname ${PRG}`
+
+# Locate the java/jar binaries, preferring JAVA_HOME when set.
+if test -z "${JAVA_HOME}"
+then
+  JAVA_BIN=`which java`
+  JAR_BIN=`which jar`
+else
+  JAVA_BIN="${JAVA_HOME}/bin/java"
+  JAR_BIN="${JAVA_HOME}/bin/jar"
+fi
+export JAVA_BIN
+
+if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
+  echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available."
+  exit 1
+fi
+
+# Construct ATLAS_CONF where atlas-properties reside
+# assume the hive-server2 is installed and contains Atlas configuration
+# Otherwise, need to setup Atlas required properties and libraries before running this tool
+if [ ! -z "$HIVE_CONF_DIR" ]; then
+  HIVE_CONF=$HIVE_CONF_DIR
+elif [ ! -z "$HIVE_HOME" ]; then
+  HIVE_CONF="$HIVE_HOME/conf"
+elif [ -e /etc/hive/conf ]; then
+  HIVE_CONF="/etc/hive/conf"
+else
+  echo "Could not find a valid HIVE configuration for ATLAS"
+  exit 1
+fi
+if [ -z "$ATLAS_CONF" ]; then
+  export ATLAS_CONF=$HIVE_CONF
+fi
+
+# log dir for applications
+ATLAS_LOG_DIR="/var/log/atlas"
+ATLAS_LOG_FILE="impala-bridge.log"
+LOG_CONFIG="${BASEDIR}/atlas-log4j.xml"
+
+# Construct Atlas classpath: every jar under the Atlas webapp lib directory
+# plus any jar placed next to this script.
+DIR=$PWD
+PARENT="$(dirname "$DIR")"
+GRANDPARENT="$(dirname "$PARENT")"
+LIB_PATH="$GRANDPARENT/server/webapp/atlas/WEB-INF/lib"
+echo "$LIB_PATH"
+for i in "$LIB_PATH/"*.jar; do
+  ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+for i in "${BASEDIR}/"*.jar; do
+  ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+echo "Logging: ${ATLAS_LOG_DIR}/${ATLAS_LOG_FILE}"
+echo "Log config: ${LOG_CONFIG}"
+
+# Timestamp of this run (yyyyMMddHHmmss). Fixed from `date %Y%m%d%H%M%s`,
+# which fails at runtime: date(1) requires a leading '+' before the format
+# string, and '%s' means seconds-since-epoch rather than seconds-of-minute.
+# NOTE(review): TIME is currently unused; kept for future log naming.
+TIME=`date +%Y%m%d%H%M%S`
+CP="${ATLASCPPATH}:${ATLAS_CONF}"
+
+# If running in cygwin, convert pathnames and classpath to Windows format.
+if [ "${CYGWIN}" == "true" ]
+then
+  ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}`
+  ATLAS_LOG_FILE=`cygpath -w ${ATLAS_LOG_FILE}`
+  CP=`cygpath -w -p ${CP}`
+fi
+
+JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=$ATLAS_LOG_FILE -Dlog4j.configuration=file://$LOG_CONFIG"
+
+IMPORT_ARGS=$@
+JVM_ARGS=
+
+JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.impala.ImpalaLineageTool $IMPORT_ARGS
+
+RETVAL=$?
+[ $RETVAL -eq 0 ] && echo Done!
+[ $RETVAL -ne 0 ] && echo Failed!
+exit $RETVAL
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
new file mode 100644
index 0000000..cc62955
--- /dev/null
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala;
+
+import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME;
+import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_RECENT_QUERIES;
+import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_DB;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.apache.atlas.impala.model.ImpalaDataType;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.atlas.utils.ParamChecker;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.testng.annotations.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+public class ImpalaLineageITBase {
+    private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageITBase.class);
+
+    public static final String DEFAULT_DB = "default";
+    public static final String SEP = ":".intern();
+    public static final String IO_SEP = "->".intern();
+    protected static final String DGI_URL = "http://localhost:21000/";
+    protected static final String CLUSTER_NAME = "primary";
+    protected static final String PART_FILE = "2015-01-01";
+    protected static final String INPUTS = "inputs";
+    protected static final String OUTPUTS = "outputs";
+    protected static AtlasClientV2 atlasClientV2;
+
+    private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
+    private static final String ATTR_NAME = "name";
+
+    // to push entity creation/update to HMS, so HMS hook can push the metadata notification
+    // to Atlas, then the lineage notification from this tool can be created at Atlas
+    protected static Driver driverWithoutContext;
+    protected static SessionState ss;
+    protected static HiveConf conf;
+
+    /**
+     * Starts a local Hive session (used by subclasses to create databases/tables so the
+     * HMS hook registers them in Atlas first) and initializes the Atlas client used by
+     * the assertion helpers below.
+     */
+    @BeforeClass
+    public void setUp() throws Exception {
+        // Set-up hive session.
+        // Assign the static fields directly: the previous version re-declared
+        // 'conf' and 'ss' as locals, shadowing the static fields, so 'ss' was
+        // never assigned and the Driver used a second HiveConf that lacked the
+        // context classloader set on the static one.
+        conf = new HiveConf();
+        conf.setClassLoader(Thread.currentThread().getContextClassLoader());
+        ss = new SessionState(conf);
+        ss = SessionState.start(ss);
+        SessionState.setCurrentSessionState(ss);
+        driverWithoutContext = new Driver(conf);
+
+        Configuration configuration = ApplicationProperties.get();
+
+        String[] atlasEndPoint = configuration.getStringArray(ImpalaLineageHook.ATLAS_ENDPOINT);
+        if (atlasEndPoint == null || atlasEndPoint.length == 0) {
+            atlasEndPoint = new String[]{DGI_URL};
+        }
+
+        if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+            atlasClientV2 = new AtlasClientV2(atlasEndPoint, new String[]{"admin", "admin"});
+        } else {
+            atlasClientV2 = new AtlasClientV2(atlasEndPoint);
+        }
+    }
+
+    /**
+     * Waits (up to 80s) until an entity of the given type with the given unique attribute
+     * value exists in Atlas and satisfies the optional predicate.
+     *
+     * @return the guid of the registered entity
+     */
+    protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
+                                              final AssertPredicate assertPredicate) throws Exception {
+        waitFor(80000, new Predicate() {
+            @Override
+            public void evaluate() throws Exception {
+                AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
+                AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
+                assertNotNull(entity);
+                if (assertPredicate != null) {
+                    assertPredicate.assertOnEntity(entity);
+                }
+            }
+        });
+
+        // fetch once more after the wait succeeded to return the guid
+        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
+        AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
+        return entity.getGuid();
+    }
+
+    /**
+     * Asserts that an impala_process entity with the given qualified name is registered in
+     * Atlas and that its most recent query equals the given query string (lower-cased/trimmed).
+     *
+     * @return the guid of the registered process entity
+     */
+    protected String assertProcessIsRegistered(String processQFName, String queryString) throws Exception {
+        try {
+            LOG.debug("Searching for process with query {}", processQFName);
+
+            return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(final AtlasEntity entity) throws Exception {
+                    List<String> recentQueries = (List<String>) entity.getAttribute(ATTRIBUTE_RECENT_QUERIES);
+
+                    Assert.assertEquals(recentQueries.get(0), lower(queryString));
+                }
+            });
+        } catch (Exception e) {
+            LOG.error("Exception : ", e);
+            throw e;
+        }
+    }
+
+    protected String assertDatabaseIsRegistered(String dbName) throws Exception {
+        return assertDatabaseIsRegistered(dbName, null);
+    }
+
+    /**
+     * Asserts that a hive_db entity with qualified name "dbName@CLUSTER_NAME" (lower-cased)
+     * is registered in Atlas.
+     *
+     * @return the guid of the registered database entity
+     */
+    protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
+        LOG.debug("Searching for database: {}", dbName);
+
+        String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            CLUSTER_NAME;
+
+        dbQualifiedName = dbQualifiedName.toLowerCase();
+
+        return assertEntityIsRegistered(HIVE_TYPE_DB, REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName, assertPredicate);
+    }
+
+    /** Creates a database with a random name via the local Hive driver; returns the name. */
+    protected String createDatabase() throws Exception {
+        String dbName = dbName();
+
+        return createDatabase(dbName);
+    }
+
+    protected String createDatabase(String dbName) throws Exception {
+        runCommand("CREATE DATABASE IF NOT EXISTS " + dbName);
+
+        return dbName;
+    }
+
+    protected String createTable(String dbName, String columnsString) throws Exception {
+        return createTable(dbName, columnsString, false);
+    }
+
+    protected String createTable(String dbName, String columnsString, boolean isPartitioned) throws Exception {
+        String tableName = tableName();
+        return createTable(dbName, tableName, columnsString, isPartitioned);
+    }
+
+    /**
+     * Creates a table (optionally partitioned by dt) via the local Hive driver.
+     *
+     * @return the fully qualified "dbName.tableName"
+     */
+    protected String createTable(String dbName, String tableName, String columnsString, boolean isPartitioned) throws Exception {
+        runCommand("CREATE TABLE IF NOT EXISTS " + dbName + "." + tableName + " " + columnsString + " comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : ""));
+
+        return dbName + "." + tableName;
+    }
+
+    /** Assertion callback applied to an entity once it has been found in Atlas. */
+    public interface AssertPredicate {
+        void assertOnEntity(AtlasEntity entity) throws Exception;
+    }
+
+    public interface Predicate {
+        /**
+         * Perform a predicate evaluation.
+         *
+         * @throws Exception thrown if the predicate evaluation could not evaluate.
+         */
+        void evaluate() throws Exception;
+    }
+
+    /**
+     * Wait for a condition, expressed via a {@link Predicate} to become true.
+     *
+     * @param timeout maximum time in milliseconds to wait for the predicate to become true.
+     * @param predicate predicate waiting on.
+     */
+    protected void waitFor(int timeout, Predicate predicate) throws Exception {
+        ParamChecker.notNull(predicate, "predicate");
+        long mustEnd = System.currentTimeMillis() + timeout;
+
+        while (true) {
+            try {
+                predicate.evaluate();
+                return;
+            } catch (Error | Exception e) {
+                if (System.currentTimeMillis() >= mustEnd) {
+                    fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
+                }
+                LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
+                Thread.sleep(5000);
+            }
+        }
+    }
+
+    /** Returns the lower-cased, trimmed string; returns null when the input is null or empty. */
+    public static String lower(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+        return str.toLowerCase().trim();
+    }
+
+    protected void runCommand(String cmd) throws Exception {
+        runCommandWithDelay(cmd, 0);
+    }
+
+    protected void runCommandWithDelay(String cmd, int sleepMs) throws Exception {
+        runCommandWithDelay(driverWithoutContext, cmd, sleepMs);
+    }
+
+    /** Runs a Hive command, asserts it succeeded, then optionally sleeps sleepMs milliseconds. */
+    protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) throws Exception {
+        LOG.debug("Running command '{}'", cmd);
+        CommandProcessorResponse response = driver.run(cmd);
+        assertEquals(response.getResponseCode(), 0);
+        if (sleepMs != 0) {
+            Thread.sleep(sleepMs);
+        }
+    }
+
+    protected String random() {
+        return RandomStringUtils.randomAlphanumeric(10);
+    }
+
+    protected String tableName() {
+        return "table_" + random();
+    }
+
+    protected String dbName() {
+        return "db_" + random();
+    }
+}
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
new file mode 100644
index 0000000..05190b6
--- /dev/null
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.impala;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.testng.annotations.Test;
+
+public class ImpalaLineageToolIT extends ImpalaLineageITBase {
+ private static String dir = System.getProperty("user.dir") + "/src/test/resources/";
+ private static String IMPALA = dir + "impala3.json";
+ private static String IMPALA_WAL = dir + "WALimpala.wal";
+
+ /**
+ * This tests
+ * 1) ImpalaLineageTool can parse one lineage file that contains "create view" command lineage
+ * 2) Lineage is sent to Atlas
+ * 3) Atlas can get this lineage from Atlas
+ */
+ @Test
+ public void testCreateViewFromFile() {
+ List<ImpalaQuery> lineageList = new ArrayList<>();
+ ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+ try {
+ // create database and tables to simulate Impala behavior that Impala updates metadata
+ // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
+ // Atlas can handle lineage notification
+ String dbName = "db_1";
+ createDatabase(dbName);
+
+ String sourceTableName = "table_1";
+ createTable(dbName, sourceTableName,"(id string, count int)", false);
+
+ String targetTableName = "view_1";
+ createTable(dbName, targetTableName,"(count int, id string)", false);
+
+ // process lineage record, and send corresponding notification to Atlas
+ String[] args = new String[]{"-d", "./", "-p", "impala"};
+ ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+ toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+ // verify the process is saved in Atlas
+ // the value is from info in IMPALA_3
+ String createTime = new Long((long)(1554750072)*1000).toString();
+ String processQFName =
+ "db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
+
+ processQFName = processQFName.toLowerCase();
+
+ assertProcessIsRegistered(processQFName,
+ "create view db_1.view_1 as select count, id from db_1.table_1");
+
+ } catch (Exception e) {
+ System.out.print("Appending file error");
+ }
+ }
+}
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
new file mode 100644
index 0000000..86801e3
--- /dev/null
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.impala.hook;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.atlas.impala.ImpalaLineageITBase;
+import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
+import org.apache.atlas.impala.model.ImpalaDependencyType;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageEdge;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.testng.Assert.assertFalse;
+
+public class ImpalaLineageHookIT extends ImpalaLineageITBase {
+    private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHookIT.class);
+    private static ImpalaLineageHook impalaHook;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        super.setUp();
+        impalaHook = new ImpalaLineageHook();
+    }
+
+    @AfterClass
+    public void testClean() {
+        impalaHook = null;
+    }
+
+    /**
+     * Builds an Impala "create view" lineage record by hand (two column-level PROJECTION
+     * edges plus the column and table vertices), sends it through ImpalaLineageHook and
+     * verifies the resulting impala_process entity is registered in Atlas.
+     */
+    @Test
+    public void testCreateView() throws Exception {
+        // first trigger HMS hook to create related entities
+        String dbName = createDatabase();
+        assertDatabaseIsRegistered(dbName);
+
+        String tableName = createTable(dbName, "(id string, count int)");
+        String viewName = createTable(dbName, "(count int, id string)");
+
+        // then process lineage record to push lineage to Atlas
+        ImpalaQuery queryObj = new ImpalaQuery();
+        List<LineageEdge> edges = new ArrayList<>();
+        List<LineageVertex> vertices = new ArrayList<>();
+
+        queryObj.setQueryText("create view " + viewName + " as select count, id from " + tableName);
+        queryObj.setQueryId("3a441d0c130962f8:7f634aec00000000");
+        queryObj.setHash("64ff0425ccdfaada53e3f2fd76f566f7");
+        queryObj.setUser("admin");
+        queryObj.setTimestamp((long)1554750072);
+        queryObj.setEndTime((long)1554750554);
+
+        // edge: view.count (vertex 0) is a projection of table.count (vertex 1)
+        LineageEdge edge1 = new LineageEdge();
+        edge1.setSources(Arrays.asList((long)1));
+        edge1.setTargets(Arrays.asList((long)0));
+        edge1.setEdgeType(ImpalaDependencyType.PROJECTION);
+        edges.add(edge1);
+
+        // edge: view.id (vertex 2) is a projection of table.id (vertex 3)
+        LineageEdge edge2 = new LineageEdge();
+        edge2.setSources(Arrays.asList((long)3));
+        edge2.setTargets(Arrays.asList((long)2));
+        edge2.setEdgeType(ImpalaDependencyType.PROJECTION);
+        edges.add(edge2);
+
+        queryObj.setEdges(edges);
+
+        LineageVertex vertex1 = new LineageVertex();
+        vertex1.setId((long)0);
+        vertex1.setVertexType(ImpalaVertexType.COLUMN);
+        vertex1.setVertexId(viewName + ".count");
+        vertices.add(vertex1);
+
+        LineageVertex vertex2 = new LineageVertex();
+        vertex2.setId((long)1);
+        vertex2.setVertexType(ImpalaVertexType.COLUMN);
+        vertex2.setVertexId(tableName + ".count");
+        vertices.add(vertex2);
+
+        LineageVertex vertex3 = new LineageVertex();
+        vertex3.setId((long)2);
+        vertex3.setVertexType(ImpalaVertexType.COLUMN);
+        vertex3.setVertexId(viewName + ".id");
+        vertices.add(vertex3);
+
+        LineageVertex vertex4 = new LineageVertex();
+        vertex4.setId((long)3);
+        vertex4.setVertexType(ImpalaVertexType.COLUMN);
+        vertex4.setVertexId(tableName + ".id");
+        vertices.add(vertex4);
+
+        // table vertices carry a create time (seconds) used in the process qualified name
+        LineageVertex vertex5 = new LineageVertex();
+        vertex5.setId((long)4);
+        vertex5.setVertexType(ImpalaVertexType.TABLE);
+        vertex5.setVertexId(viewName);
+        vertex5.setCreateTime(System.currentTimeMillis() / 1000);
+        vertices.add(vertex5);
+
+        LineageVertex vertex6 = new LineageVertex();
+        vertex6.setId((long)5);
+        vertex6.setVertexType(ImpalaVertexType.TABLE);
+        vertex6.setVertexId(tableName);
+        vertex6.setCreateTime(System.currentTimeMillis() / 1000);
+        vertices.add(vertex6);
+
+        queryObj.setVertices(vertices);
+
+        try {
+            impalaHook.process(queryObj);
+
+            String createTime = String.valueOf(BaseImpalaEvent.getTableCreateTime(vertex5));
+            String processQFName =
+                vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
+
+            processQFName = processQFName.toLowerCase();
+
+            assertProcessIsRegistered(processQFName, queryObj.getQueryText());
+        } catch (Exception ex) {
+            LOG.error("process create_view failed: ", ex);
+            // rethrow so the test fails with the original cause instead of the
+            // uninformative assertFalse(true)
+            throw ex;
+        }
+    }
+}
diff --git a/addons/impala-bridge/src/test/resources/atlas-application.properties b/addons/impala-bridge/src/test/resources/atlas-application.properties
new file mode 100644
index 0000000..898b69c
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/atlas-application.properties
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######### Atlas Server Configs #########
+atlas.rest.address=http://localhost:31000
+
+######### Graph Database Configs #########
+
+
+# Graph database implementation. Value inserted by maven.
+atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase
+
+# Graph Storage
+atlas.graph.storage.backend=berkeleyje
+
+# Entity repository implementation
+atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.InMemoryEntityAuditRepository
+
+# Graph Search Index Backend
+atlas.graph.index.search.backend=solr
+
+#Berkeley storage directory
+atlas.graph.storage.directory=${sys:atlas.data}/berkley
+
+#hbase
+#For standalone mode , specify localhost
+#for distributed mode, specify zookeeper quorum here
+
+atlas.graph.storage.hostname=${graph.storage.hostname}
+atlas.graph.storage.hbase.regions-per-server=1
+atlas.graph.storage.lock.wait-time=10000
+
+#ElasticSearch
+atlas.graph.index.search.directory=${sys:atlas.data}/es
+atlas.graph.index.search.elasticsearch.client-only=false
+atlas.graph.index.search.elasticsearch.local-mode=true
+atlas.graph.index.search.elasticsearch.create.sleep=2000
+
+# Solr cloud mode properties
+atlas.graph.index.search.solr.mode=cloud
+atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
+atlas.graph.index.search.solr.embedded=true
+atlas.graph.index.search.max-result-set-size=150
+
+
+######### Notification Configs #########
+atlas.notification.embedded=true
+
+atlas.kafka.zookeeper.connect=localhost:19026
+atlas.kafka.bootstrap.servers=localhost:19027
+atlas.kafka.data=${sys:atlas.data}/kafka
+atlas.kafka.zookeeper.session.timeout.ms=4000
+atlas.kafka.zookeeper.sync.time.ms=20
+atlas.kafka.consumer.timeout.ms=4000
+atlas.kafka.auto.commit.interval.ms=100
+atlas.kafka.hook.group.id=atlas
+atlas.kafka.entities.group.id=atlas_entities
+#atlas.kafka.auto.commit.enable=false
+
+atlas.kafka.enable.auto.commit=false
+atlas.kafka.auto.offset.reset=earliest
+atlas.kafka.session.timeout.ms=30000
+atlas.kafka.offsets.topic.replication.factor=1
+
+
+
+######### Entity Audit Configs #########
+atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
+atlas.audit.zookeeper.session.timeout.ms=1000
+atlas.audit.hbase.zookeeper.quorum=localhost
+atlas.audit.hbase.zookeeper.property.clientPort=19026
+
+######### Security Properties #########
+
+# SSL config
+atlas.enableTLS=false
+atlas.server.https.port=31443
+
+######### Security Properties #########
+
+hbase.security.authentication=simple
+
+atlas.hook.falcon.synchronous=true
+
+######### JAAS Configuration ########
+
+atlas.jaas.KafkaClient.loginModuleName = com.sun.security.auth.module.Krb5LoginModule
+atlas.jaas.KafkaClient.loginModuleControlFlag = required
+atlas.jaas.KafkaClient.option.useKeyTab = true
+atlas.jaas.KafkaClient.option.storeKey = true
+atlas.jaas.KafkaClient.option.serviceName = kafka
+atlas.jaas.KafkaClient.option.keyTab = /etc/security/keytabs/atlas.service.keytab
+atlas.jaas.KafkaClient.option.principal = atlas/_HOST@EXAMPLE.COM
+
+######### High Availability Configuration ########
+atlas.server.ha.enabled=false
+#atlas.server.ids=id1
+#atlas.server.address.id1=localhost:21000
+
+######### Atlas Authorization #########
+atlas.authorizer.impl=none
+# atlas.authorizer.impl=simple
+# atlas.authorizer.simple.authz.policy.file=atlas-simple-authz-policy.json
+
+######### Atlas Authentication #########
+atlas.authentication.method.file=true
+atlas.authentication.method.ldap.type=none
+atlas.authentication.method.kerberos=false
+# atlas.authentication.method.file.filename=users-credentials.properties
diff --git a/addons/impala-bridge/src/test/resources/atlas-log4j.xml b/addons/impala-bridge/src/test/resources/atlas-log4j.xml
new file mode 100644
index 0000000..de7f55c
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/atlas-log4j.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+ </layout>
+ </appender>
+
+ <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
+ <param name="Append" value="true"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+ <param name="maxFileSize" value="100MB" />
+ <param name="maxBackupIndex" value="20" />
+ </layout>
+ </appender>
+
+ <appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${atlas.log.dir}/audit.log"/>
+ <param name="Append" value="true"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %x %m%n"/>
+ <param name="maxFileSize" value="100MB" />
+ <param name="maxBackupIndex" value="20" />
+ </layout>
+ </appender>
+
+ <appender name="METRICS" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${atlas.log.dir}/metric.log"/>
+ <param name="Append" value="true"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %x %m%n"/>
+ <param name="maxFileSize" value="100MB" />
+ </layout>
+ </appender>
+
+ <appender name="FAILED" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${atlas.log.dir}/failed.log"/>
+ <param name="Append" value="true"/>
+ <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+ <param name="maxFileSize" value="100MB" />
+ <param name="maxBackupIndex" value="20" />
+ </layout>
+ </appender>
+
+ <logger name="org.apache.atlas" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false">
+ <level value="debug"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook" additivity="false">
+ <level value="debug"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="org.janusgraph" additivity="false">
+ <level value="warn"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="org.springframework" additivity="false">
+ <level value="warn"/>
+ <appender-ref ref="console"/>
+ </logger>
+
+ <logger name="org.eclipse" additivity="false">
+ <level value="warn"/>
+ <appender-ref ref="console"/>
+ </logger>
+
+ <logger name="com.sun.jersey" additivity="false">
+ <level value="warn"/>
+ <appender-ref ref="console"/>
+ </logger>
+
+ <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
+ <logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false">
+ <level value="error"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="AUDIT" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="AUDIT"/>
+ </logger>
+
+ <logger name="METRICS" additivity="false">
+ <level value="debug"/>
+ <appender-ref ref="METRICS"/>
+ </logger>
+
+ <logger name="FAILED" additivity="false">
+ <level value="info"/>
+        <appender-ref ref="FAILED"/>
+ </logger>
+
+ <root>
+ <priority value="warn"/>
+ <appender-ref ref="FILE"/>
+ </root>
+
+</log4j:configuration>
diff --git a/addons/impala-bridge/src/test/resources/hive-site.xml b/addons/impala-bridge/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..edd0c54
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/hive-site.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+ <property>
+ <name>hive.exec.submit.local.task.via.child</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>mapreduce.framework.name</name>
+ <value>local</value>
+ </property>
+
+ <property>
+ <name>fs.default.name</name>
+ <value>file:///</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.event.listeners</name>
+ <value>org.apache.atlas.hive.hook.HiveMetastoreHookImpl</value>
+ </property>
+
+ <property>
+ <name>hive.support.concurrency</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.warehouse.dir</name>
+ <value>${project.basedir}/target/metastore</value>
+ </property>
+
+ <property>
+ <name>javax.jdo.option.ConnectionURL</name>
+ <value>jdbc:derby:;databaseName=${project.basedir}/target/metastore_db;create=true</value>
+ </property>
+
+ <property>
+ <name>atlas.hook.hive.synchronous</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.pfile.impl</name>
+ <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ </property>
+
+ <property>
+ <name>hive.in.test</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hive.zookeeper.quorum</name>
+ <value>localhost:19026</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.schema.verification</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.disallow.incompatible.col.type.changes</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>datanucleus.schema.autoCreateAll</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hive.exec.scratchdir</name>
+ <value>${project.basedir}/target/scratchdir</value>
+ </property>
+
+</configuration>
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala1.json b/addons/impala-bridge/src/test/resources/impala1.json
new file mode 100644
index 0000000..8f747f6
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala1.json
@@ -0,0 +1,84 @@
+{
+ "queryText": "INSERT INTO TABLE db_wlwspnwgfp.tbl_wlwspnwgfp_7 VALUES (1, 'foo', 'foo', 'foo', 'foo', 1), (2, 'foo', 'foo', 'foo', 'foo', 2)",
+ "queryId": "4c4cb5dc22194e20:c440f5fe00000000",
+ "hash": "d936d531989bd0f46d636bd05fc2540c",
+ "user": "impala@GCE.ABCDEFGH.COM",
+ "timestamp": 1553528501,
+ "endTime": 1553528505,
+ "edges": [
+ {
+ "sources": [],
+ "targets": [
+ 0
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [],
+ "targets": [
+ 1
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [],
+ "targets": [
+ 2
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [],
+ "targets": [
+ 3
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [],
+ "targets": [
+ 4
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [],
+ "targets": [
+ 5
+ ],
+ "edgeType": "PROJECTION"
+ }
+ ],
+ "vertices": [
+ {
+ "id": 0,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_id"
+ },
+ {
+ "id": 1,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_name"
+ },
+ {
+ "id": 2,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_street"
+ },
+ {
+ "id": 3,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_city"
+ },
+ {
+ "id": 4,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_state"
+ },
+ {
+ "id": 5,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_zipcode"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala2.json b/addons/impala-bridge/src/test/resources/impala2.json
new file mode 100644
index 0000000..239797b
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala2.json
@@ -0,0 +1,306 @@
+{
+ "queryText": "with a AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_1), b AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_2), c AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_3), d AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_4), e AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_5), f AS (SELECT * FROM db_wlwspnwgfp.tbl_wlwspnwgfp_6) INSERT INTO table db_wlwspnwgfp.tbl_wlwspnwgfp_7 SELECT * FROM a UNION SELECT * FROM b UNION SELECT * FROM c UNION SELECT * FROM d UNION SELECT * FROM e U [...]
+ "queryId": "b9423a1de88a33c3:997879c000000000",
+ "hash": "a6ff7959c66c23499346eef791c66439",
+ "user": "impala@GCE.ABCDEFGH.COM",
+ "timestamp": 1553528521,
+ "endTime": 1553528525,
+ "edges": [
+ {
+ "sources": [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5,
+ 6
+ ],
+ "targets": [
+ 0
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [
+ 8,
+ 9,
+ 10,
+ 11,
+ 12,
+ 13
+ ],
+ "targets": [
+ 7
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [
+ 15,
+ 16,
+ 17,
+ 18,
+ 19,
+ 20
+ ],
+ "targets": [
+ 14
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [
+ 22,
+ 23,
+ 24,
+ 25,
+ 26,
+ 27
+ ],
+ "targets": [
+ 21
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [
+ 29,
+ 30,
+ 31,
+ 32,
+ 33,
+ 34
+ ],
+ "targets": [
+ 28
+ ],
+ "edgeType": "PROJECTION"
+ },
+ {
+ "sources": [
+ 36,
+ 37,
+ 38,
+ 39,
+ 40,
+ 41
+ ],
+ "targets": [
+ 35
+ ],
+ "edgeType": "PROJECTION"
+ }
+ ],
+ "vertices": [
+ {
+ "id": 0,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_id"
+ },
+ {
+ "id": 1,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_id"
+ },
+ {
+ "id": 2,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_id"
+ },
+ {
+ "id": 3,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_id"
+ },
+ {
+ "id": 4,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_id"
+ },
+ {
+ "id": 5,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_id"
+ },
+ {
+ "id": 6,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_id"
+ },
+ {
+ "id": 7,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_name"
+ },
+ {
+ "id": 8,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_name"
+ },
+ {
+ "id": 9,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_name"
+ },
+ {
+ "id": 10,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_name"
+ },
+ {
+ "id": 11,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_name"
+ },
+ {
+ "id": 12,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_name"
+ },
+ {
+ "id": 13,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_name"
+ },
+ {
+ "id": 14,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_street"
+ },
+ {
+ "id": 15,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_street"
+ },
+ {
+ "id": 16,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_street"
+ },
+ {
+ "id": 17,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_street"
+ },
+ {
+ "id": 18,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_street"
+ },
+ {
+ "id": 19,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_street"
+ },
+ {
+ "id": 20,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_street"
+ },
+ {
+ "id": 21,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_city"
+ },
+ {
+ "id": 22,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_city"
+ },
+ {
+ "id": 23,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_city"
+ },
+ {
+ "id": 24,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_city"
+ },
+ {
+ "id": 25,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_city"
+ },
+ {
+ "id": 26,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_city"
+ },
+ {
+ "id": 27,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_city"
+ },
+ {
+ "id": 28,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_state"
+ },
+ {
+ "id": 29,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_state"
+ },
+ {
+ "id": 30,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_state"
+ },
+ {
+ "id": 31,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_state"
+ },
+ {
+ "id": 32,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_state"
+ },
+ {
+ "id": 33,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_state"
+ },
+ {
+ "id": 34,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_state"
+ },
+ {
+ "id": 35,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_7.col_tbl_wlwspnwgfp_7_zipcode"
+ },
+ {
+ "id": 36,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_1.col_tbl_wlwspnwgfp_1_zipcode"
+ },
+ {
+ "id": 37,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_2.col_tbl_wlwspnwgfp_2_zipcode"
+ },
+ {
+ "id": 38,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_3.col_tbl_wlwspnwgfp_3_zipcode"
+ },
+ {
+ "id": 39,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_4.col_tbl_wlwspnwgfp_4_zipcode"
+ },
+ {
+ "id": 40,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_5.col_tbl_wlwspnwgfp_5_zipcode"
+ },
+ {
+ "id": 41,
+ "vertexType": "COLUMN",
+ "vertexId": "db_wlwspnwgfp.tbl_wlwspnwgfp_6.col_tbl_wlwspnwgfp_6_zipcode"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/impala3.json b/addons/impala-bridge/src/test/resources/impala3.json
new file mode 100644
index 0000000..6a7d171
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/impala3.json
@@ -0,0 +1,62 @@
+{
+ "queryText":"create view db_1.view_1 as select count, id from db_1.table_1",
+ "queryId":"3a441d0c130962f8:7f634aec00000000",
+ "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+ "user":"admin",
+ "timestamp":1554750072,
+ "endTime":1554750554,
+ "edges":[
+ {
+ "sources":[
+ 1
+ ],
+ "targets":[
+ 0
+ ],
+ "edgeType":"PROJECTION"
+ },
+ {
+ "sources":[
+ 3
+ ],
+ "targets":[
+ 2
+ ],
+ "edgeType":"PROJECTION"
+ }
+ ],
+ "vertices":[
+ {
+ "id":0,
+ "vertexType":"COLUMN",
+ "vertexId":"db_1.view_1.count"
+ },
+ {
+ "id":1,
+ "vertexType":"COLUMN",
+ "vertexId":"db_1.table_1.count"
+ },
+ {
+ "id":2,
+ "vertexType":"COLUMN",
+ "vertexId":"db_1.view_1.id"
+ },
+ {
+ "id":3,
+ "vertexType":"COLUMN",
+ "vertexId":"db_1.table_1.id"
+ },
+ {
+ "id":4,
+ "vertexType":"TABLE",
+ "vertexId":"db_1.table_1",
+ "createTime":1554750070
+ },
+ {
+ "id":5,
+ "vertexType":"TABLE",
+ "vertexId":"db_1.view_1",
+ "createTime":1554750072
+ }
+ ]
+}
\ No newline at end of file
diff --git a/addons/impala-bridge/src/test/resources/users-credentials.properties b/addons/impala-bridge/src/test/resources/users-credentials.properties
new file mode 100644
index 0000000..3fc3bb1
--- /dev/null
+++ b/addons/impala-bridge/src/test/resources/users-credentials.properties
@@ -0,0 +1,3 @@
+#username=group::sha256-password
+admin=ADMIN::8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
+rangertagsync=RANGER_TAG_SYNC::e3f67240f5117d1753c940dae9eea772d36ed5fe9bd9c94a300e40413f1afb9d
diff --git a/addons/models/1000-Hadoop/1090-impala_model.json b/addons/models/1000-Hadoop/1090-impala_model.json
new file mode 100644
index 0000000..dad50ff
--- /dev/null
+++ b/addons/models/1000-Hadoop/1090-impala_model.json
@@ -0,0 +1,229 @@
+{
+ "enumDefs": [],
+ "structDefs": [],
+ "classificationDefs": [],
+ "entityDefs": [
+ {
+ "name": "impala_process",
+ "superTypes": [
+ "Process"
+ ],
+ "serviceType": "impala",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "startTime",
+ "typeName": "date",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "endTime",
+ "typeName": "date",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "userName",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": true,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "operationType",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": true,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "queryText",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "queryPlan",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "queryId",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "recentQueries",
+ "typeName": "array<string>",
+ "cardinality": "LIST",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ },
+ {
+ "name": "clusterName",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "includeInNotification": true,
+ "isUnique": false
+ },
+ {
+ "name": "queryGraph",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ }
+ ]
+ },
+ {
+ "name" : "impala_column_lineage",
+ "superTypes" : [
+ "Process"
+ ],
+ "serviceType": "impala",
+ "typeVersion" : "1.0",
+ "attributeDefs" : [
+ {
+ "name": "dependencyType",
+ "typeName": "string",
+ "cardinality" : "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ }
+ ]
+ },
+ {
+ "name" : "impala_process_execution",
+ "superTypes" : [
+ "ProcessExecution"
+ ],
+ "serviceType": "impala",
+ "typeVersion" : "1.0",
+ "attributeDefs" : [
+ {
+ "name": "startTime",
+ "typeName": "date",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "endTime",
+ "typeName": "date",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "userName",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": true,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "queryText",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "queryGraph",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ },
+ {
+ "name": "queryId",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "queryPlan",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "hostName",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": true,
+ "isOptional": false,
+ "isUnique": false
+ }
+ ]
+ }
+ ],
+ "relationshipDefs": [
+ {
+ "name": "impala_process_column_lineage",
+ "serviceType": "impala",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "endDef1": {
+ "type": "impala_column_lineage",
+ "name": "query",
+ "isContainer": false,
+ "cardinality": "SINGLE"
+ },
+ "endDef2": {
+ "type": "impala_process",
+ "name": "columnLineages",
+ "isContainer": true,
+ "cardinality": "SET"
+ },
+ "propagateTags": "NONE"
+ },
+ {
+ "name": "impala_process_process_executions",
+ "serviceType": "impala",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "endDef1": {
+ "type": "impala_process",
+ "name": "processExecutions",
+ "cardinality": "SET",
+ "isContainer": true
+ },
+ "endDef2": {
+ "type": "impala_process_execution",
+ "name": "process",
+ "cardinality": "SINGLE"
+ },
+ "propagateTags": "NONE"
+ }
+ ]
+}
diff --git a/pom.xml b/pom.xml
index ae4dfdc..7ad24ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -784,6 +784,7 @@
<module>addons/hbase-testing-util</module>
<module>addons/kafka-bridge</module>
<module>tools/classification-updater</module>
+ <module>addons/impala-bridge</module>
<module>distro</module>
</modules>