You are viewing a plain-text version of this content. The canonical version was linked "here", but that hyperlink was not preserved in this plain-text copy.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/03/19 21:08:53 UTC
[6/6] atlas git commit: ATLAS-2491: Hive hook should use v2 notifications
ATLAS-2491: Hive hook should use v2 notifications
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6e02ec5b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6e02ec5b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6e02ec5b
Branch: refs/heads/master
Commit: 6e02ec5b3a97ee2bfaf16ef5e875c14e383d5823
Parents: dee8a2d
Author: rmani <rm...@hortonworks.com>
Authored: Tue Mar 6 08:50:18 2018 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Mon Mar 19 12:20:10 2018 -0700
----------------------------------------------------------------------
.../atlas/falcon/bridge/FalconBridge.java | 3 +-
addons/hive-bridge/pom.xml | 6 +
.../atlas/hive/bridge/ColumnLineageUtils.java | 158 --
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 1016 +++++++------
.../atlas/hive/hook/AtlasHiveHookContext.java | 91 ++
.../org/apache/atlas/hive/hook/HiveHook.java | 1201 ++-------------
.../atlas/hive/hook/events/AlterDatabase.java | 41 +
.../atlas/hive/hook/events/AlterTable.java | 42 +
.../hive/hook/events/AlterTableRename.java | 173 +++
.../hive/hook/events/AlterTableRenameCol.java | 99 ++
.../atlas/hive/hook/events/BaseHiveEvent.java | 831 ++++++++++
.../atlas/hive/hook/events/CreateDatabase.java | 75 +
.../hive/hook/events/CreateHiveProcess.java | 195 +++
.../atlas/hive/hook/events/CreateTable.java | 93 ++
.../atlas/hive/hook/events/DropDatabase.java | 68 +
.../atlas/hive/hook/events/DropTable.java | 61 +
.../apache/atlas/hive/rewrite/ASTRewriter.java | 26 -
.../atlas/hive/rewrite/HiveASTRewriter.java | 95 --
.../atlas/hive/rewrite/LiteralRewriter.java | 76 -
.../atlas/hive/rewrite/RewriteContext.java | 48 -
.../atlas/hive/rewrite/RewriteException.java | 24 -
.../java/org/apache/atlas/hive/HiveITBase.java | 541 ++++++-
.../atlas/hive/bridge/ColumnLineageUtils.java | 161 ++
.../hive/bridge/HiveLiteralRewriterTest.java | 68 -
.../hive/bridge/HiveMetaStoreBridgeTest.java | 184 ++-
.../hive/bridge/HiveMetastoreBridgeIT.java | 53 +-
.../org/apache/atlas/hive/hook/HiveHookIT.java | 1427 ++++++++++--------
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 3 +-
.../apache/atlas/storm/hook/StormAtlasHook.java | 3 +-
29 files changed, 4076 insertions(+), 2786 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
index 11d13b3..cbf002f 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
@@ -68,6 +68,7 @@ public class FalconBridge {
public static final String RUNSON = "runs-on";
public static final String STOREDIN = "stored-in";
public static final String FREQUENCY = "frequency";
+ public static final String ATTRIBUTE_DB = "db";
/**
* Creates cluster entity
@@ -357,7 +358,7 @@ public class FalconBridge {
tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
- tableRef.set(HiveMetaStoreBridge.DB, dbRef);
+ tableRef.set(ATTRIBUTE_DB, dbRef);
entities.add(tableRef);
return entities;
http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 14e4295..b3ebe8e 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -106,6 +106,12 @@
<dependency>
<groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client-v2</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
deleted file mode 100644
index ba10008..0000000
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.hive.bridge;
-
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.hadoop.hive.ql.hooks.LineageInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ColumnLineageUtils {
- public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class);
- public static class HiveColumnLineageInfo {
- public final String depenendencyType;
- public final String expr;
- public final String inputColumn;
-
- HiveColumnLineageInfo(LineageInfo.Dependency d, String inputCol) {
- depenendencyType = d.getType().name();
- expr = d.getExpr();
- inputColumn = inputCol;
- }
-
- @Override
- public String toString(){
- return inputColumn;
- }
- }
-
- public static String getQualifiedName(LineageInfo.DependencyKey key){
- String db = key.getDataContainer().getTable().getDbName();
- String table = key.getDataContainer().getTable().getTableName();
- String col = key.getFieldSchema().getName();
- return db + "." + table + "." + col;
- }
-
- public static Map<String, List<HiveColumnLineageInfo>> buildLineageMap(LineageInfo lInfo) {
- Map<String, List<HiveColumnLineageInfo>> m = new HashMap<>();
-
- for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) {
- List<HiveColumnLineageInfo> l = new ArrayList<>();
- String k = getQualifiedName(e.getKey());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("buildLineageMap(): key={}; value={}", e.getKey(), e.getValue());
- }
-
- Collection<LineageInfo.BaseColumnInfo> baseCols = getBaseCols(e.getValue());
-
- if (baseCols != null) {
- for (LineageInfo.BaseColumnInfo iCol : baseCols) {
- String db = iCol.getTabAlias().getTable().getDbName();
- String table = iCol.getTabAlias().getTable().getTableName();
- String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName();
- l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName));
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k);
- }
- m.put(k, l);
- }
- }
- return m;
- }
-
- static Collection<LineageInfo.BaseColumnInfo> getBaseCols(LineageInfo.Dependency lInfoDep) {
- Collection<LineageInfo.BaseColumnInfo> ret = null;
-
- if (lInfoDep != null) {
- try {
- Method getBaseColsMethod = lInfoDep.getClass().getMethod("getBaseCols");
-
- Object retGetBaseCols = getBaseColsMethod.invoke(lInfoDep);
-
- if (retGetBaseCols != null) {
- if (retGetBaseCols instanceof Collection) {
- ret = (Collection) retGetBaseCols;
- } else {
- LOG.warn("{}: unexpected return type from LineageInfo.Dependency.getBaseCols(), expected type {}",
- retGetBaseCols.getClass().getName(), "Collection");
- }
- }
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
- LOG.warn("getBaseCols()", ex);
- }
- }
-
- return ret;
- }
-
- static String[] extractComponents(String qualifiedName) {
- String[] comps = qualifiedName.split("\\.");
- int lastIdx = comps.length - 1;
- int atLoc = comps[lastIdx].indexOf('@');
- if (atLoc > 0) {
- comps[lastIdx] = comps[lastIdx].substring(0, atLoc);
- }
- return comps;
- }
-
- static void populateColumnReferenceableMap(Map<String, Referenceable> m,
- Referenceable r) {
- if (r.getTypeName().equals(HiveDataTypes.HIVE_TABLE.getName())) {
- String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
- String[] qNameComps = extractComponents(qName);
- for (Referenceable col : (List<Referenceable>) r.get(HiveMetaStoreBridge.COLUMNS)) {
- String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
- String[] colQNameComps = extractComponents(cName);
- String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2];
- m.put(colQName, col);
- }
- String tableQName = qNameComps[0] + "." + qNameComps[1];
- m.put(tableQName, r);
- }
- }
-
-
- public static Map<String, Referenceable> buildColumnReferenceableMap(List<Referenceable> inputs,
- List<Referenceable> outputs) {
- Map<String, Referenceable> m = new HashMap<>();
-
- for (Referenceable r : inputs) {
- populateColumnReferenceableMap(m, r);
- }
-
- for (Referenceable r : outputs) {
- populateColumnReferenceableMap(m, r);
- }
-
- return m;
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 4047c16..51df8d2 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -21,22 +21,28 @@ package org.apache.atlas.hive.bridge;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.hive.hook.HiveHook;
+import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHookException;
-import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
-import org.apache.atlas.v1.model.instance.Id;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.commons.collections.CollectionUtils;
+
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
@@ -57,119 +63,221 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Date;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
-import static org.apache.atlas.hive.hook.HiveHook.CONF_PREFIX;
+import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
/**
* A Bridge Utility that imports metadata from the Hive Meta Store
* and registers them in Atlas.
*/
-public class HiveMetaStoreBridge {
- private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
- public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name";
- public static final String DEFAULT_CLUSTER_NAME = "primary";
- public static final String DESCRIPTION_ATTR = "description";
-
- public static final String TEMP_TABLE_PREFIX = "_temp-";
-
- private final String clusterName;
- public static final long MILLIS_CONVERT_FACTOR = 1000;
-
- public static final String ATLAS_ENDPOINT = "atlas.rest.address";
-
- public static final String COMMENT = "comment";
- public static final String PARAMETERS = "parameters";
- public static final String COLUMNS = "columns";
- public static final String POSITION = "position";
- public static final String PART_COLS = "partitionKeys";
- public static final String TABLE_ALIAS_LIST = "aliases";
- public static final String STORAGE_NUM_BUCKETS = "numBuckets";
- public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories";
- public static final String TABLE = "table";
- public static final String DB = "db";
- public static final String STORAGE_DESC = "sd";
- public static final String STORAGE_DESC_INPUT_FMT = "inputFormat";
- public static final String STORAGE_DESC_OUTPUT_FMT = "outputFormat";
- public static final String LOCATION = "location";
- public static final String TABLE_TYPE_ATTR = "tableType";
- public static final String CREATE_TIME = "createTime";
- public static final String LAST_ACCESS_TIME = "lastAccessTime";
- public static final String HDFS_PATH = "hdfs_path";
- public static final String SEP = ":".intern();
+public class HiveMetaStoreBridge {
private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class);
- public final Hive hiveClient;
- private final AtlasClient atlasClient;
- private final boolean convertHdfsPathToLowerCase;
+ public static final String CONF_PREFIX = "atlas.hook.hive.";
+ public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name";
+ public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+ public static final String DEFAULT_CLUSTER_NAME = "primary";
+ public static final String TEMP_TABLE_PREFIX = "_temp-";
+ public static final String ATLAS_ENDPOINT = "atlas.rest.address";
+ public static final String SEP = ":".intern();
+ public static final String HDFS_PATH = "hdfs_path";
+
+ private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
+ private final String clusterName;
+ private final Hive hiveClient;
+ private final AtlasClientV2 atlasClientV2;
+ private final boolean convertHdfsPathToLowerCase;
- HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
- this(clusterName, hiveClient, atlasClient, true);
- }
- HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient, boolean convertHdfsPathToLowerCase) {
- this.clusterName = clusterName;
- this.hiveClient = hiveClient;
- this.atlasClient = atlasClient;
- this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase;
- }
+ public static void main(String[] args) throws AtlasHookException {
+ try {
+ Configuration atlasConf = ApplicationProperties.get();
+ String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
- public String getClusterName() {
- return clusterName;
+ if (atlasEndpoint == null || atlasEndpoint.length == 0){
+ atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
+ }
+
+ AtlasClientV2 atlasClientV2;
+
+ if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+ String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
+
+ atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
+ } else {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
+ }
+
+ Options options = new Options();
+ CommandLineParser parser = new BasicParser();
+ CommandLine cmd = parser.parse(options, args);
+ boolean failOnError = cmd.hasOption("failOnError");
+
+ HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
+
+ hiveMetaStoreBridge.importHiveMetadata(failOnError);
+ } catch(Exception e) {
+ throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e);
+ }
}
/**
* Construct a HiveMetaStoreBridge.
* @param hiveConf {@link HiveConf} for Hive component in the cluster
*/
- public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf) throws Exception {
- this(atlasProperties, hiveConf, null);
+ public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
+ this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2, atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, true));
}
/**
* Construct a HiveMetaStoreBridge.
* @param hiveConf {@link HiveConf} for Hive component in the cluster
*/
- public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClient atlasClient) throws Exception {
- this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClient, atlasProperties.getBoolean(CONF_PREFIX + "hdfs_path.convert_to_lowercase", true));
+ public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf) throws Exception {
+ this(atlasProperties, hiveConf, null);
+ }
+
+ HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2) {
+ this(clusterName, hiveClient, atlasClientV2, true);
}
- AtlasClient getAtlasClient() {
- return atlasClient;
+ HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) {
+ this.clusterName = clusterName;
+ this.hiveClient = hiveClient;
+ this.atlasClientV2 = atlasClientV2;
+ this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public Hive getHiveClient() {
+ return hiveClient;
+ }
+
+ public AtlasClientV2 getAtlasClient() {
+ return atlasClientV2;
}
public boolean isConvertHdfsPathToLowerCase() {
return convertHdfsPathToLowerCase;
}
- void importHiveMetadata(boolean failOnError) throws Exception {
- LOG.info("Importing hive metadata");
+
+ @VisibleForTesting
+ public void importHiveMetadata(boolean failOnError) throws Exception {
+ LOG.info("Importing Hive metadata");
+
importDatabases(failOnError);
}
private void importDatabases(boolean failOnError) throws Exception {
List<String> databases = hiveClient.getAllDatabases();
+
+ LOG.info("Found {} databases", databases.size());
+
for (String databaseName : databases) {
- Referenceable dbReference = registerDatabase(databaseName);
+ AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);
- if (dbReference != null) {
- importTables(dbReference, databaseName, failOnError);
+ if (dbEntity != null) {
+ importTables(dbEntity.getEntity(), databaseName, failOnError);
}
}
}
/**
- * Create a Hive Database entity
- * @param hiveDB The Hive {@link Database} object from which to map properties
- * @return new Hive Database entity
- * @throws HiveException
+ * Imports all tables for the given db
+ * @param dbEntity
+ * @param databaseName
+ * @param failOnError
+ * @throws Exception
*/
- public Referenceable createDBInstance(Database hiveDB) throws HiveException {
- return createOrUpdateDBInstance(hiveDB, null);
+ private int importTables(AtlasEntity dbEntity, String databaseName, final boolean failOnError) throws Exception {
+ List<String> hiveTables = hiveClient.getAllTables(databaseName);
+
+ LOG.info("Found {} tables in database {}", hiveTables.size(), databaseName);
+
+ int tablesImported = 0;
+
+ try {
+ for (String tableName : hiveTables) {
+ int imported = importTable(dbEntity, databaseName, tableName, failOnError);
+
+ tablesImported += imported;
+ }
+ } finally {
+ if (tablesImported == hiveTables.size()) {
+ LOG.info("Successfully imported all {} tables from database {}", tablesImported, databaseName);
+ } else {
+ LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, hiveTables.size(), databaseName);
+ }
+ }
+
+ return tablesImported;
+ }
+
+ @VisibleForTesting
+ public int importTable(AtlasEntity dbEntity, String databaseName, String tableName, final boolean failOnError) throws Exception {
+ try {
+ Table table = hiveClient.getTable(databaseName, tableName);
+ AtlasEntityWithExtInfo tableEntity = registerTable(dbEntity, table);
+
+ if (table.getTableType() == TableType.EXTERNAL_TABLE) {
+ String processQualifiedName = getTableProcessQualifiedName(clusterName, table);
+ AtlasEntityWithExtInfo processEntity = findProcessEntity(processQualifiedName);
+
+ if (processEntity == null) {
+ String tableLocation = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
+ String query = getCreateTableString(table, tableLocation);
+ AtlasEntity pathInst = toHdfsPathEntity(tableLocation);
+ AtlasEntity tableInst = tableEntity.getEntity();
+ AtlasEntity processInst = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName());
+ long now = System.currentTimeMillis();
+
+ processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
+ processInst.setAttribute(ATTRIBUTE_NAME, query);
+ processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+ processInst.setAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(pathInst)));
+ processInst.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(tableInst)));
+ processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner());
+ processInst.setAttribute(ATTRIBUTE_START_TIME, now);
+ processInst.setAttribute(ATTRIBUTE_END_TIME, now);
+ processInst.setAttribute(ATTRIBUTE_OPERATION_TYPE, "CREATETABLE");
+ processInst.setAttribute(ATTRIBUTE_QUERY_TEXT, query);
+ processInst.setAttribute(ATTRIBUTE_QUERY_ID, query);
+ processInst.setAttribute(ATTRIBUTE_QUERY_PLAN, "{}");
+ processInst.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(query));
+
+ AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo();
+
+ createTableProcess.addEntity(processInst);
+ createTableProcess.addEntity(pathInst);
+
+ registerInstances(createTableProcess);
+ } else {
+ LOG.info("Process {} is already registered", processQualifiedName);
+ }
+ }
+
+ return 1;
+ } catch (Exception e) {
+ LOG.error("Import failed for hive_table {}", tableName, e);
+
+ if (failOnError) {
+ throw e;
+ }
+
+ return 0;
+ }
}
/**
@@ -178,518 +286,506 @@ public class HiveMetaStoreBridge {
* @return
* @throws Exception
*/
- private Referenceable registerDatabase(String databaseName) throws Exception {
- Referenceable dbRef = getDatabaseReference(clusterName, databaseName);
- Database db = hiveClient.getDatabase(databaseName);
+ private AtlasEntityWithExtInfo registerDatabase(String databaseName) throws Exception {
+ AtlasEntityWithExtInfo ret = null;
+ Database db = hiveClient.getDatabase(databaseName);
if (db != null) {
- if (dbRef == null) {
- dbRef = createDBInstance(db);
- dbRef = registerInstance(dbRef);
+ ret = findDatabase(clusterName, databaseName);
+
+ if (ret == null) {
+ ret = registerInstance(new AtlasEntityWithExtInfo(toDbEntity(db)));
} else {
- LOG.info("Database {} is already registered with id {}. Updating it.", databaseName, dbRef.getId().getId());
- dbRef = createOrUpdateDBInstance(db, dbRef);
- updateInstance(dbRef);
+ LOG.info("Database {} is already registered - id={}. Updating it.", databaseName, ret.getEntity().getGuid());
+
+ ret.setEntity(toDbEntity(db, ret.getEntity()));
+
+ updateInstance(ret);
}
}
- return dbRef;
+
+ return ret;
}
- private Referenceable createOrUpdateDBInstance(Database hiveDB, Referenceable dbRef) {
- LOG.info("Importing objects from databaseName : {}", hiveDB.getName());
+ private AtlasEntityWithExtInfo registerTable(AtlasEntity dbEntity, Table table) throws AtlasHookException {
+ try {
+ AtlasEntityWithExtInfo ret;
+ AtlasEntityWithExtInfo tableEntity = findTableEntity(table);
- if (dbRef == null) {
- dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
- }
- String dbName = hiveDB.getName().toLowerCase();
- dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
- dbRef.set(AtlasClient.NAME, dbName);
- dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
- dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
-
- dbRef.set(LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
- dbRef.set(PARAMETERS, hiveDB.getParameters());
- dbRef.set(AtlasClient.OWNER, hiveDB.getOwnerName());
- if (hiveDB.getOwnerType() != null) {
- dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
+ if (tableEntity == null) {
+ tableEntity = toTableEntity(dbEntity, table);
+
+ ret = registerInstance(tableEntity);
+ } else {
+ LOG.info("Table {}.{} is already registered with id {}. Updating entity.", table.getDbName(), table.getTableName(), tableEntity.getEntity().getGuid());
+
+ ret = toTableEntity(dbEntity, table, tableEntity);
+
+ updateInstance(ret);
+ }
+
+ return ret;
+ } catch (Exception e) {
+ throw new AtlasHookException("HiveMetaStoreBridge.registerTable() failed.", e);
}
- return dbRef;
}
/**
* Registers an entity in atlas
- * @param referenceable
+ * @param entity
* @return
* @throws Exception
*/
- private Referenceable registerInstance(Referenceable referenceable) throws Exception {
- String typeName = referenceable.getTypeName();
- LOG.debug("creating instance of type {}", typeName);
+ private AtlasEntityWithExtInfo registerInstance(AtlasEntityWithExtInfo entity) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), entity);
+ }
- String entityJSON = AtlasType.toV1Json(referenceable);
- LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON);
- List<String> guids = getAtlasClient().createEntity(entityJSON);
- LOG.debug("created instance for type {}, guid: {}", typeName, guids);
+ AtlasEntityWithExtInfo ret = null;
+ EntityMutationResponse response = atlasClientV2.createEntity(entity);
+ List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
- return new Referenceable(guids.get(guids.size() - 1), referenceable.getTypeName(), null);
- }
+ if (CollectionUtils.isNotEmpty(createdEntities)) {
+ for (AtlasEntityHeader createdEntity : createdEntities) {
+ if (ret == null) {
+ ret = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
- /**
- * Gets reference to the atlas entity for the database
- * @param databaseName database Name
- * @param clusterName cluster name
- * @return Reference for database if exists, else null
- * @throws Exception
- */
- private Referenceable getDatabaseReference(String clusterName, String databaseName) throws Exception {
- LOG.debug("Getting reference for database {}", databaseName);
- String typeName = HiveDataTypes.HIVE_DB.getName();
+ LOG.info("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid());
+ } else if (ret.getEntity(createdEntity.getGuid()) == null) {
+ AtlasEntityWithExtInfo newEntity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
- return getEntityReference(typeName, getDBQualifiedName(clusterName, databaseName));
- }
+ ret.addReferredEntity(newEntity.getEntity());
- /**
- * Construct the qualified name used to uniquely identify a Database instance in Atlas.
- * @param clusterName Name of the cluster to which the Hive component belongs
- * @param dbName Name of the Hive database
- * @return Unique qualified name to identify the Database instance in Atlas.
- */
- public static String getDBQualifiedName(String clusterName, String dbName) {
- return String.format("%s@%s", dbName.toLowerCase(), clusterName);
- }
+ if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) {
+ for (Map.Entry<String, AtlasEntity> entry : newEntity.getReferredEntities().entrySet()) {
+ ret.addReferredEntity(entry.getKey(), entry.getValue());
+ }
+ }
- private String getCreateTableString(Table table, String location){
- String colString = "";
- List<FieldSchema> colList = table.getAllCols();
- if ( colList != null) {
- for (FieldSchema col : colList) {
- colString += col.getName() + " " + col.getType() + ",";
- }
- if (colList.size() > 0) {
- colString = colString.substring(0, colString.length() - 1);
- colString = "(" + colString + ")";
+ LOG.info("Created {} entity: name={}, guid={}", newEntity.getEntity().getTypeName(), newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), newEntity.getEntity().getGuid());
+ }
}
}
- String query = "create external table " + table.getTableName() + colString +
- " location '" + location + "'";
- return query;
+
+ return ret;
}
/**
- * Imports all tables for the given db
- * @param databaseReferenceable
- * @param databaseName
- * @param failOnError
+ * Creates the given entities in Atlas, then re-reads each newly created entity by guid.
+ * @param entities entities (with referred entities) to create
+ * @return the created entities as read back from Atlas, together with their referred entities;
+ * null when the create response reports no CREATE operations
* @throws Exception
*/
- private int importTables(Referenceable databaseReferenceable, String databaseName, final boolean failOnError) throws Exception {
- int tablesImported = 0;
- List<String> hiveTables = hiveClient.getAllTables(databaseName);
- LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
- for (String tableName : hiveTables) {
- int imported = importTable(databaseReferenceable, databaseName, tableName, failOnError);
- tablesImported += imported;
+ private AtlasEntitiesWithExtInfo registerInstances(AtlasEntitiesWithExtInfo entities) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("creating {} entities: {}", entities.getEntities().size(), entities);
}
- if (tablesImported == hiveTables.size()) {
- LOG.info("Successfully imported all {} tables from {} ", tablesImported, databaseName);
- } else {
- LOG.error("Able to import {} tables out of {} tables from {}. Please check logs for import errors", tablesImported, hiveTables.size(), databaseName);
- }
+ AtlasEntitiesWithExtInfo ret = null;
+ EntityMutationResponse response = atlasClientV2.createEntities(entities);
+ // only entities reported under the CREATE operation are re-read; updated entities are not included in ret
+ List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
- return tablesImported;
- }
+ if (CollectionUtils.isNotEmpty(createdEntities)) {
+ ret = new AtlasEntitiesWithExtInfo();
- @VisibleForTesting
- public int importTable(Referenceable databaseReferenceable, String databaseName, String tableName, final boolean failOnError) throws Exception {
- try {
- Table table = hiveClient.getTable(databaseName, tableName);
- Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
- if (table.getTableType() == TableType.EXTERNAL_TABLE) {
- String tableQualifiedName = getTableProcessQualifiedName(clusterName, table);
- Referenceable process = getProcessReference(tableQualifiedName);
- if (process == null) {
- LOG.info("Attempting to register create table process for {}", tableQualifiedName);
- Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
- ArrayList<Referenceable> sourceList = new ArrayList<>();
- ArrayList<Referenceable> targetList = new ArrayList<>();
- String tableLocation = isConvertHdfsPathToLowerCase() ? HiveHook.lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
- Referenceable path = fillHDFSDataSet(tableLocation);
- String query = getCreateTableString(table, tableLocation);
- sourceList.add(path);
- targetList.add(tableReferenceable);
- lineageProcess.set("inputs", sourceList);
- lineageProcess.set("outputs", targetList);
- lineageProcess.set("userName", table.getOwner());
- lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
- lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
- lineageProcess.set("operationType", "CREATETABLE");
- lineageProcess.set("queryText", query);
- lineageProcess.set("queryId", query);
- lineageProcess.set("queryPlan", "{}");
- lineageProcess.set("clusterName", clusterName);
- List<String> recentQueries = new ArrayList<>(1);
- recentQueries.add(query);
- lineageProcess.set("recentQueries", recentQueries);
- String processQualifiedName = getTableProcessQualifiedName(clusterName, table);
- lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName);
- lineageProcess.set(AtlasClient.NAME, query);
- registerInstance(lineageProcess);
- } else {
- LOG.info("Process {} is already registered", process.toString());
+ for (AtlasEntityHeader createdEntity : createdEntities) {
+ // re-read by guid so the returned copy is what Atlas stored, including its guid and referred entities
+ AtlasEntityWithExtInfo entity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
+
+ ret.addEntity(entity.getEntity());
+
+ if (MapUtils.isNotEmpty(entity.getReferredEntities())) {
+ for (Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) {
+ ret.addReferredEntity(entry.getKey(), entry.getValue());
+ }
}
+
+ LOG.info("Created {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
}
- return 1;
- } catch (Exception e) {
- LOG.error("Import failed for hive_table {} ", tableName, e);
- if (failOnError) {
- throw e;
- }
- return 0;
}
- }
- /**
- * Gets reference for the table
- *
- * @param hiveTable
- * @return table reference if exists, else null
- * @throws Exception
- */
- private Referenceable getTableReference(Table hiveTable) throws Exception {
- LOG.debug("Getting reference for table {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
-
- String typeName = HiveDataTypes.HIVE_TABLE.getName();
- String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName());
- return getEntityReference(typeName, tblQualifiedName);
+ return ret;
}
- private Referenceable getEntityReference(final String typeName, final String tblQualifiedName) throws AtlasServiceException {
- AtlasClient dgiClient = getAtlasClient();
- try {
- return dgiClient.getEntity(typeName, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tblQualifiedName);
- } catch (AtlasServiceException e) {
- if(e.getStatus() == ClientResponse.Status.NOT_FOUND) {
- return null;
- }
- throw e;
+ /**
+ * Sends an update for an existing entity to Atlas and logs its type, qualifiedName and guid.
+ * @param entity entity (with referred entities) to update
+ * @throws AtlasServiceException propagated from atlasClientV2.updateEntity()
+ */
+ private void updateInstance(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), entity);
}
- }
- private Referenceable getProcessReference(String qualifiedName) throws Exception{
- LOG.debug("Getting reference for process {}", qualifiedName);
- String typeName = HiveDataTypes.HIVE_PROCESS.getName();
- return getEntityReference(typeName, qualifiedName);
+ atlasClientV2.updateEntity(entity);
+
+ LOG.info("Updated {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
}
/**
- * Construct the qualified name used to uniquely identify a Table instance in Atlas.
- * @param clusterName Name of the cluster to which the Hive component belongs
- * @param dbName Name of the Hive database to which the Table belongs
- * @param tableName Name of the Hive table
- * @return Unique qualified name to identify the Table instance in Atlas.
+ * Creates a new hive_db AtlasEntity populated from the given Hive database.
+ * Delegates to toDbEntity(hiveDB, null), which allocates a fresh entity.
+ * @param hiveDB The Hive {@link Database} object from which to map properties
+ * @return new Hive Database AtlasEntity
+ * @throws HiveException declared for callers; the two-argument delegate does not declare it
*/
- public static String getTableQualifiedName(String clusterName, String dbName, String tableName, boolean isTemporaryTable) {
- String tableTempName = tableName;
- if (isTemporaryTable) {
- if (SessionState.get() != null && SessionState.get().getSessionId() != null) {
- tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId();
- } else {
- tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10);
- }
- }
- return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), clusterName);
+ private AtlasEntity toDbEntity(Database hiveDB) throws HiveException {
+ return toDbEntity(hiveDB, null);
}
+ /**
+ * Populates a hive_db AtlasEntity from the given Hive database.
+ * @param hiveDB Hive {@link Database} to read properties from
+ * @param dbEntity existing entity whose attributes are overwritten; when null a new hive_db entity is created
+ * @return dbEntity (or the newly created entity) with qualifiedName, name, description, owner,
+ * clusterName, location and parameters set, plus ownerType when Hive reports one
+ */
+ private AtlasEntity toDbEntity(Database hiveDB, AtlasEntity dbEntity) {
+ if (dbEntity == null) {
+ dbEntity = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
+ }
+ // the database name is lower-cased before it is used for both name and qualifiedName
+ String dbName = hiveDB.getName().toLowerCase();
- /**
- * Construct the qualified name used to uniquely identify a Table instance in Atlas.
- * @param clusterName Name of the cluster to which the Hive component belongs
- * @param table hive table for which the qualified name is needed
- * @return Unique qualified name to identify the Table instance in Atlas.
- */
- public static String getTableQualifiedName(String clusterName, Table table) {
- return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary());
- }
+ dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getDBQualifiedName(clusterName, dbName));
+ dbEntity.setAttribute(ATTRIBUTE_NAME, dbName);
+ dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription());
+ dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
- public static String getTableProcessQualifiedName(String clusterName, Table table) {
- String tableQualifiedName = getTableQualifiedName(clusterName, table);
- Date createdTime = getTableCreatedTime(table);
- return tableQualifiedName + SEP + createdTime.getTime();
- }
+ dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+ dbEntity.setAttribute(ATTRIBUTE_LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
+ dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
- /**
- * Construct the qualified name used to uniquely identify a Table instance in Atlas.
- * @param clusterName Name of the cluster to which the Hive component belongs
- * @param dbName Name of the Hive database to which the Table belongs
- * @param tableName Name of the Hive table
- * @return Unique qualified name to identify the Table instance in Atlas.
- */
- public static String getTableQualifiedName(String clusterName, String dbName, String tableName) {
- return getTableQualifiedName(clusterName, dbName, tableName, false);
- }
+ // ownerType is only set when Hive reports one; it is mapped to the Atlas enum value via OWNER_TYPE_TO_ENUM_VALUE
+ if (hiveDB.getOwnerType() != null) {
+ dbEntity.setAttribute(ATTRIBUTE_OWNER_TYPE, OWNER_TYPE_TO_ENUM_VALUE.get(hiveDB.getOwnerType().getValue()));
+ }
+ return dbEntity;
+ }
/**
* Create a new table instance in Atlas
+ * (Only builds the entity in memory; nothing is sent to the Atlas server here.)
- * @param dbReference reference to a created Hive database {@link Referenceable} to which this table belongs
+ * @param database hive_db {@link AtlasEntity} to which this table belongs
* @param hiveTable reference to the Hive {@link Table} from which to map properties
- * @return Newly created Hive reference
+ * @return Newly created hive_table entity, with the database, storage descriptor, partition keys and
+ * columns attached as referred entities
* @throws Exception
*/
- public Referenceable createTableInstance(Referenceable dbReference, Table hiveTable)
- throws AtlasHookException {
- return createOrUpdateTableInstance(dbReference, null, hiveTable);
- }
-
- public static Date getTableCreatedTime(Table table) {
- return new Date(table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
+ private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, Table hiveTable) throws AtlasHookException {
+ return toTableEntity(database, hiveTable, null);
}
- private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
- final Table hiveTable) throws AtlasHookException {
- LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
-
- if (tableReference == null) {
- tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
- }
-
- String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable);
- tableReference.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
- tableReference.set(AtlasClient.NAME, hiveTable.getTableName().toLowerCase());
- tableReference.set(AtlasClient.OWNER, hiveTable.getOwner());
-
- Date createDate = new Date();
- if (hiveTable.getTTable() != null){
- try {
- createDate = getTableCreatedTime(hiveTable);
- LOG.debug("Setting create time to {} ", createDate);
- tableReference.set(CREATE_TIME, createDate);
- } catch(Exception ne) {
- LOG.error("Error while setting createTime for the table {} ", hiveTable.getCompleteName(), ne);
- }
+ /**
+ * Builds (or refreshes) the hive_table entity for the given Hive table.
+ * @param database hive_db entity the table belongs to; also attached as a referred entity
+ * @param hiveTable Hive {@link Table} to read properties from
+ * @param table existing entity to overwrite; when null a new hive_table entity is created.
+ * Any referred entities it already carries are discarded and rebuilt.
+ * @return the table entity, with the database, storage descriptor, partition keys and columns as referred entities
+ * @throws AtlasHookException declared by the storage-descriptor and column builders
+ */
+ private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, final Table hiveTable, AtlasEntityWithExtInfo table) throws AtlasHookException {
+ if (table == null) {
+ table = new AtlasEntityWithExtInfo(new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()));
}
- Date lastAccessTime = createDate;
- if ( hiveTable.getLastAccessTime() > 0) {
- lastAccessTime = new Date(hiveTable.getLastAccessTime() * MILLIS_CONVERT_FACTOR);
- }
- tableReference.set(LAST_ACCESS_TIME, lastAccessTime);
- tableReference.set("retention", hiveTable.getRetention());
+ AtlasEntity tableEntity = table.getEntity();
+ String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable);
+ long createTime = BaseHiveEvent.getTableCreateTime(hiveTable);
+ // Hive's lastAccessTime is in seconds: scale it by MILLIS_CONVERT_FACTOR so that both branches yield
+ // milliseconds, like createTime; widen to long first so the product cannot overflow int
+ long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? ((long) hiveTable.getLastAccessTime()) * MILLIS_CONVERT_FACTOR : createTime;
- tableReference.set(COMMENT, hiveTable.getParameters().get(COMMENT));
+ tableEntity.setAttribute(ATTRIBUTE_DB, BaseHiveEvent.getObjectId(database));
+ tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
+ tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase());
+ tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
- // add reference to the database
- tableReference.set(DB, dbReference);
-
- // add reference to the StorageDescriptor
- Referenceable sdReferenceable = fillStorageDesc(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), tableReference.getId());
- tableReference.set(STORAGE_DESC, sdReferenceable);
-
- tableReference.set(PARAMETERS, hiveTable.getParameters());
+ tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+ tableEntity.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+ tableEntity.setAttribute(ATTRIBUTE_RETENTION, hiveTable.getRetention());
+ tableEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveTable.getParameters());
+ tableEntity.setAttribute(ATTRIBUTE_COMMENT, hiveTable.getParameters().get(ATTRIBUTE_COMMENT));
+ tableEntity.setAttribute(ATTRIBUTE_TABLE_TYPE, hiveTable.getTableType().name());
+ tableEntity.setAttribute(ATTRIBUTE_TEMPORARY, hiveTable.isTemporary());
if (hiveTable.getViewOriginalText() != null) {
- tableReference.set("viewOriginalText", hiveTable.getViewOriginalText());
+ tableEntity.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, hiveTable.getViewOriginalText());
}
if (hiveTable.getViewExpandedText() != null) {
- tableReference.set("viewExpandedText", hiveTable.getViewExpandedText());
+ tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText());
}
- tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name());
- tableReference.set("temporary", hiveTable.isTemporary());
+ AtlasEntity sdEntity = toStroageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), BaseHiveEvent.getObjectId(tableEntity));
+ List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity);
+ List<AtlasEntity> columns = toColumns(hiveTable.getCols(), tableEntity);
- // add reference to the Partition Keys
- List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableReference);
- tableReference.set("partitionKeys", partKeys);
+ tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, BaseHiveEvent.getObjectId(sdEntity));
+ tableEntity.setAttribute(ATTRIBUTE_PARTITION_KEYS, BaseHiveEvent.getObjectIds(partKeys));
+ tableEntity.setAttribute(ATTRIBUTE_COLUMNS, BaseHiveEvent.getObjectIds(columns));
- tableReference.set(COLUMNS, getColumns(hiveTable.getCols(), tableReference));
+ // referred entities are rebuilt from scratch so a refreshed entity does not keep stale columns/sd
+ if (MapUtils.isNotEmpty(table.getReferredEntities())) {
+ table.getReferredEntities().clear();
+ }
- return tableReference;
- }
+ table.addReferredEntity(database);
+ table.addReferredEntity(sdEntity);
- public static String getStorageDescQFName(String entityQualifiedName) {
- return entityQualifiedName + "_storage";
- }
+ if (partKeys != null) {
+ for (AtlasEntity partKey : partKeys) {
+ table.addReferredEntity(partKey);
+ }
+ }
- private Referenceable registerTable(Referenceable dbReference, Table table) throws AtlasHookException {
- try {
- String dbName = table.getDbName();
- String tableName = table.getTableName();
- LOG.info("Attempting to register table [{}]", tableName);
- Referenceable tableReference = getTableReference(table);
- LOG.info("Found result {}", tableReference);
- if (tableReference == null) {
- tableReference = createTableInstance(dbReference, table);
- tableReference = registerInstance(tableReference);
- } else {
- LOG.info("Table {}.{} is already registered with id {}. Updating entity.", dbName, tableName,
- tableReference.getId().getId());
- tableReference = createOrUpdateTableInstance(dbReference, tableReference, table);
- updateInstance(tableReference);
+ if (columns != null) {
+ for (AtlasEntity column : columns) {
+ table.addReferredEntity(column);
}
- return tableReference;
- } catch (Exception e) {
- throw new AtlasHookException("HiveMetaStoreBridge.getStorageDescQFName() failed.", e);
}
+
+ return table;
}
- private void updateInstance(Referenceable referenceable) throws AtlasServiceException {
- String typeName = referenceable.getTypeName();
- LOG.debug("updating instance of type {}", typeName);
+ /**
+ * Builds the hive_storagedesc entity for a table's storage descriptor.
+ * @param storageDesc Hive storage descriptor to read from
+ * @param tableQualifiedName qualified name of the owning table (not used by this method)
+ * @param sdQualifiedName qualified name to assign to the storage-descriptor entity
+ * @param tableId object id of the owning hive_table entity
+ * @return new storage-descriptor entity; bucketCols, serdeInfo and sortCols are set only when present
+ * @throws AtlasHookException declared for callers; not thrown by the code in this method
+ */
+ private AtlasEntity toStroageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) throws AtlasHookException {
+ AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
- String entityJSON = AtlasType.toV1Json(referenceable);
- LOG.debug("Updating entity {} = {}", referenceable.getTypeName(), entityJSON);
+ ret.setAttribute(ATTRIBUTE_TABLE, tableId);
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
+ ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
+ ret.setAttribute(ATTRIBUTE_LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
+ ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat());
+ ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat());
+ ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed());
+ ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, storageDesc.getNumBuckets());
+ ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, storageDesc.isStoredAsSubDirectories());
- atlasClient.updateEntity(referenceable.getId().getId(), referenceable);
- }
+ // bucketCols is not guaranteed non-null on the descriptor; guard it the same way sortCols is guarded below
+ if (CollectionUtils.isNotEmpty(storageDesc.getBucketCols())) {
+ ret.setAttribute(ATTRIBUTE_BUCKET_COLS, storageDesc.getBucketCols());
+ }
- public Referenceable fillStorageDesc(StorageDescriptor storageDesc, String tableQualifiedName,
- String sdQualifiedName, Id tableId) throws AtlasHookException {
- LOG.debug("Filling storage descriptor information for {}", storageDesc);
+ if (storageDesc.getSerdeInfo() != null) {
+ SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
- Referenceable sdReferenceable = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
- sdReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sdQualifiedName);
+ LOG.debug("serdeInfo = {}", serdeInfo);
+ // SkewedInfo skewedInfo = storageDesc.getSkewedInfo();
- SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
- LOG.debug("serdeInfo = {}", serdeInfo);
- // SkewedInfo skewedInfo = storageDesc.getSkewedInfo();
+ AtlasStruct serdeInfoStruct = new AtlasStruct(HiveDataTypes.HIVE_SERDE.getName());
- String serdeInfoName = HiveDataTypes.HIVE_SERDE.getName();
- Struct serdeInfoStruct = new Struct(serdeInfoName);
+ serdeInfoStruct.setAttribute(ATTRIBUTE_NAME, serdeInfo.getName());
+ serdeInfoStruct.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, serdeInfo.getSerializationLib());
+ serdeInfoStruct.setAttribute(ATTRIBUTE_PARAMETERS, serdeInfo.getParameters());
- serdeInfoStruct.set(AtlasClient.NAME, serdeInfo.getName());
- serdeInfoStruct.set("serializationLib", serdeInfo.getSerializationLib());
- serdeInfoStruct.set(PARAMETERS, serdeInfo.getParameters());
+ ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfoStruct);
+ }
- sdReferenceable.set("serdeInfo", serdeInfoStruct);
- sdReferenceable.set(STORAGE_NUM_BUCKETS, storageDesc.getNumBuckets());
- sdReferenceable
- .set(STORAGE_IS_STORED_AS_SUB_DIRS, storageDesc.isStoredAsSubDirectories());
+ if (CollectionUtils.isNotEmpty(storageDesc.getSortCols())) {
+ List<AtlasStruct> sortColsStruct = new ArrayList<>();
- List<Struct> sortColsStruct = new ArrayList<>();
- for (Order sortcol : storageDesc.getSortCols()) {
- String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
- Struct colStruct = new Struct(hiveOrderName);
- colStruct.set("col", sortcol.getCol());
- colStruct.set("order", sortcol.getOrder());
+ for (Order sortcol : storageDesc.getSortCols()) {
+ String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
+ AtlasStruct colStruct = new AtlasStruct(hiveOrderName);
+ colStruct.setAttribute("col", sortcol.getCol());
+ colStruct.setAttribute("order", sortcol.getOrder());
- sortColsStruct.add(colStruct);
- }
- if (sortColsStruct.size() > 0) {
- sdReferenceable.set("sortCols", sortColsStruct);
+ sortColsStruct.add(colStruct);
+ }
+
+ ret.setAttribute(ATTRIBUTE_SORT_COLS, sortColsStruct);
}
- sdReferenceable.set(LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
- sdReferenceable.set("inputFormat", storageDesc.getInputFormat());
- sdReferenceable.set("outputFormat", storageDesc.getOutputFormat());
- sdReferenceable.set("compressed", storageDesc.isCompressed());
+ return ret;
+ }
- if (storageDesc.getBucketCols().size() > 0) {
- sdReferenceable.set("bucketCols", storageDesc.getBucketCols());
- }
+ /**
+ * Builds one hive_column entity per field schema, numbering positions from 0 in list order.
+ * @param schemaList Hive field schemas; null yields an empty list
+ * @param table owning hive_table entity; its qualifiedName and owner are copied onto each column
+ * @return column entities, never null
+ * @throws AtlasHookException declared for callers; not thrown by the code in this method
+ */
+ private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table) throws AtlasHookException {
+ List<AtlasEntity> ret = new ArrayList<>();
+
+ if (schemaList == null) {
+ return ret;
+ }
- sdReferenceable.set(PARAMETERS, storageDesc.getParameters());
- sdReferenceable.set("storedAsSubDirectories", storageDesc.isStoredAsSubDirectories());
- sdReferenceable.set(TABLE, tableId);
+ int columnPosition = 0;
+ for (FieldSchema fs : schemaList) {
+ LOG.debug("Processing field {}", fs);
- return sdReferenceable;
- }
+ AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
+
+ column.setAttribute(ATTRIBUTE_TABLE, BaseHiveEvent.getObjectId(table));
+ column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName()));
+ column.setAttribute(ATTRIBUTE_NAME, fs.getName());
+ column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER));
+ column.setAttribute(ATTRIBUTE_COL_TYPE, fs.getType());
+ column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition++);
+ column.setAttribute(ATTRIBUTE_COMMENT, fs.getComment());
- public Referenceable fillHDFSDataSet(String pathUri) {
- Referenceable ref = new Referenceable(HDFS_PATH);
+ ret.add(column);
+ }
+ return ret;
+ }
- // Get the name service ID for the given HDFS path
- String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
+ /**
+ * Builds an entity of type HDFS_PATH for the given path URI.
+ * When a name-service id resolves for the path, the name-service form of the path is used for path and
+ * qualifiedName; otherwise the raw URI is used, with "@clusterName" appended only for hdfs-scheme URIs.
+ * @param pathUri path URI to describe
+ * @return new path entity
+ */
+ private AtlasEntity toHdfsPathEntity(String pathUri) {
+ AtlasEntity ret = new AtlasEntity(HDFS_PATH);
+ String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
+ Path path = new Path(pathUri);
- Path path = new Path(pathUri);
- ref.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+ ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+ ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
if (StringUtils.isNotEmpty(nameServiceID)) {
- // Name service resolution is successful, now get updated HDFS path where the host port info is replaced by
- // resolved name service
+ // Name service resolution is successful, now get updated HDFS path where the host port info is replaced by resolved name service
String updatedHdfsPath = hdfsNameServiceResolver.getPathWithNameServiceID(pathUri);
- ref.set("path", updatedHdfsPath);
- ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedHdfsPath));
- // Only set name service if it was resolved
- ref.set("nameServiceId", nameServiceID);
+
+ ret.setAttribute(ATTRIBUTE_PATH, updatedHdfsPath);
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(updatedHdfsPath));
+ ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
} else {
- ref.set("path", pathUri);
+ ret.setAttribute(ATTRIBUTE_PATH, pathUri);
+
// Only append clusterName for the HDFS path
if (pathUri.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
- ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, pathUri));
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(pathUri));
} else {
- ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathUri);
}
}
- ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
- return ref;
+
+ return ret;
}
- public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
- final String[] parts = tableQualifiedName.split("@");
- final String tableName = parts[0];
- final String clusterName = parts[1];
- return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
+ /**
+ * Looks up the hive_db entity for the given database in Atlas by its qualified name.
+ * @param clusterName cluster name used to build the qualified name
+ * @param databaseName database name (lower-cased by getDBQualifiedName)
+ * @return the database entity with its referred entities if it exists, else null
+ * @throws Exception propagated from the Atlas client for errors other than NOT_FOUND
+ */
+ private AtlasEntityWithExtInfo findDatabase(String clusterName, String databaseName) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Searching Atlas for database {}", databaseName);
+ }
+
+ String typeName = HiveDataTypes.HIVE_DB.getName();
+
+ return findEntity(typeName, getDBQualifiedName(clusterName, databaseName));
}
- public List<Referenceable> getColumns(List<FieldSchema> schemaList, Referenceable tableReference) throws AtlasHookException {
- List<Referenceable> colList = new ArrayList<>();
- int columnPosition = 0;
- for (FieldSchema fs : schemaList) {
- LOG.debug("Processing field {}", fs);
- Referenceable colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
- colReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- getColumnQualifiedName((String) tableReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), fs.getName()));
- colReferenceable.set(AtlasClient.NAME, fs.getName());
- colReferenceable.set(AtlasClient.OWNER, tableReference.get(AtlasClient.OWNER));
- colReferenceable.set("type", fs.getType());
- colReferenceable.set(POSITION, columnPosition++);
- colReferenceable.set(COMMENT, fs.getComment());
- colReferenceable.set(TABLE, tableReference.getId());
-
-
- colList.add(colReferenceable);
+ /**
+ * Looks up the hive_table entity for the given Hive table in Atlas by its qualified name.
+ *
+ * @param hiveTable Hive table whose db and table names build the qualified name
+ * @return table entity with its referred entities if it exists in Atlas, else null
+ * @throws Exception propagated from the Atlas client for errors other than NOT_FOUND
+ */
+ private AtlasEntityWithExtInfo findTableEntity(Table hiveTable) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Searching Atlas for table {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
}
- return colList;
+
+ String typeName = HiveDataTypes.HIVE_TABLE.getName();
+ // NOTE(review): this uses the non-temporary qualified name (3-arg overload), while toTableEntity() uses the
+ // temporary-aware one; confirm temporary tables never reach this lookup
+ String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName());
+
+ return findEntity(typeName, tblQualifiedName);
}
- public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) {
- return String.format("%s@%s", hdfsPath, clusterName);
+ /**
+ * Looks up a hive_process entity in Atlas by its qualified name.
+ * @param qualifiedName process qualified name (e.g. as built by getTableProcessQualifiedName)
+ * @return the process entity with its referred entities if it exists, else null
+ * @throws Exception propagated from the Atlas client for errors other than NOT_FOUND
+ */
+ private AtlasEntityWithExtInfo findProcessEntity(String qualifiedName) throws Exception{
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Searching Atlas for process {}", qualifiedName);
+ }
+
+ String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+
+ return findEntity(typeName, qualifiedName);
}
+ /**
+ * Looks up an entity in Atlas by type name and qualifiedName.
+ * @param typeName Atlas type name to search
+ * @param qualifiedName value of the qualifiedName unique attribute
+ * @return the entity with its referred entities, or null when Atlas answers NOT_FOUND
+ * @throws AtlasServiceException for any other error reported by the Atlas client
+ */
+ private AtlasEntityWithExtInfo findEntity(final String typeName, final String qualifiedName) throws AtlasServiceException {
+ AtlasClientV2 atlasClientV2 = getAtlasClient();
- public static void main(String[] args) throws AtlasHookException {
try {
- Configuration atlasConf = ApplicationProperties.get();
- String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
- if (atlasEndpoint == null || atlasEndpoint.length == 0){
- atlasEndpoint = new String[] { DEFAULT_DGI_URL };
- }
- AtlasClient atlasClient;
+ return atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+ } catch (AtlasServiceException e) {
+ if(e.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ return null;
+ }
- if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
- String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
- atlasClient = new AtlasClient(atlasEndpoint, basicAuthUsernamePassword);
- } else {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- atlasClient = new AtlasClient(ugi, ugi.getShortUserName(), atlasEndpoint);
+ throw e;
}
+ }
- Options options = new Options();
- CommandLineParser parser = new BasicParser();
- CommandLine cmd = parser.parse( options, args);
+ /**
+ * Synthesizes a "create external table" statement for the given table and location.
+ * @param table Hive table; every column from getAllCols() is listed as "name type"
+ * @param location value placed in the LOCATION clause
+ * @return e.g. {@code create external table t(a int,b string) location '<location>'};
+ * the column list is omitted when the table reports no columns
+ */
+ private String getCreateTableString(Table table, String location){
+ StringBuilder colString = new StringBuilder();
+ List<FieldSchema> colList = table.getAllCols();
- boolean failOnError = false;
- if (cmd.hasOption("failOnError")) {
- failOnError = true;
+ if (CollectionUtils.isNotEmpty(colList)) {
+ for (FieldSchema col : colList) {
+ colString.append(colString.length() == 0 ? '(' : ',').append(col.getName()).append(' ').append(col.getType());
+ }
+
+ colString.append(')');
}
- HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClient);
- hiveMetaStoreBridge.importHiveMetadata(failOnError);
+ String query = "create external table " + table.getTableName() + colString + " location '" + location + "'";
+
+ return query;
+ }
+
+ /**
+ * Lower-cases and trims the given string; null or empty input yields an empty string.
+ */
+ private String lower(String str) {
+ if (StringUtils.isEmpty(str)) {
+ return "";
}
- catch(Exception e) {
- throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e);
+
+ return str.toLowerCase().trim();
+ }
+
+
+ /**
+ * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+ * @param clusterName Name of the cluster to which the Hive component belongs
+ * @param table hive table for which the qualified name is needed
+ * @return Unique qualified name to identify the Table instance in Atlas.
+ */
+ private static String getTableQualifiedName(String clusterName, Table table) {
+ return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary());
+ }
+
+ /**
+ * Builds the qualifiedName of an HDFS path entity: {@code <hdfsPath>@<clusterName>} using this bridge's cluster.
+ */
+ private String getHdfsPathQualifiedName(String hdfsPath) {
+ return String.format("%s@%s", hdfsPath, clusterName);
+ }
+
+ /**
+ * Construct the qualified name used to uniquely identify a Database instance in Atlas.
+ * @param clusterName Name of the cluster to which the Hive component belongs
+ * @param dbName Name of the Hive database
+ * @return Unique qualified name to identify the Database instance in Atlas.
+ */
+ public static String getDBQualifiedName(String clusterName, String dbName) {
+ return String.format("%s@%s", dbName.toLowerCase(), clusterName);
+ }
+
+ /**
+ * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+ * @param clusterName Name of the cluster to which the Hive component belongs
+ * @param dbName Name of the Hive database to which the Table belongs
+ * @param tableName Name of the Hive table
+ * @param isTemporaryTable is this a temporary table
+ * @return Unique qualified name to identify the Table instance in Atlas.
+ */
+ public static String getTableQualifiedName(String clusterName, String dbName, String tableName, boolean isTemporaryTable) {
+ String tableTempName = tableName;
+
+ if (isTemporaryTable) {
+ if (SessionState.get() != null && SessionState.get().getSessionId() != null) {
+ tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId();
+ } else {
+ // randomAlphanumeric rather than random(): random() draws from the whole char range (control chars,
+ // unpaired surrogates, '@'), which yields an unprintable or unparsable qualified name
+ tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.randomAlphanumeric(10);
+ }
}
+
+ return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), clusterName);
+ }
+
+ /**
+ * Builds the qualified name of a table's create-table process: the table qualified name, SEP, then the
+ * table create time in milliseconds.
+ */
+ public static String getTableProcessQualifiedName(String clusterName, Table table) {
+ String tableQualifiedName = getTableQualifiedName(clusterName, table);
+ long createdTime = getTableCreatedTime(table);
+
+ return tableQualifiedName + SEP + createdTime;
+ }
+
+
+ /**
+ * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+ * @param clusterName Name of the cluster to which the Hive component belongs
+ * @param dbName Name of the Hive database to which the Table belongs
+ * @param tableName Name of the Hive table
+ * @return Unique qualified name to identify the Table instance in Atlas.
+ */
+ public static String getTableQualifiedName(String clusterName, String dbName, String tableName) {
+ return getTableQualifiedName(clusterName, dbName, tableName, false);
+ }
+ /**
+ * Builds the qualified name of a table's storage descriptor: the table qualified name suffixed with "_storage".
+ */
+ public static String getStorageDescQFName(String tableQualifiedName) {
+ return tableQualifiedName + "_storage";
+ }
+
+ /**
+ * Builds a column qualified name {@code <db>.<table>.<column>@<cluster>} from the owning table's qualified name.
+ * The cluster name is taken after the last '@', so an '@' inside the table part cannot corrupt the result.
+ * @param tableQualifiedName table qualified name of the form {@code <db>.<table>@<cluster>}
+ * @param colName column name; lower-cased in the result
+ * @return column qualified name
+ * @throws IllegalArgumentException if tableQualifiedName contains no '@'
+ */
+ public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
+ final int sepIdx = tableQualifiedName.lastIndexOf('@');
+
+ if (sepIdx < 0) {
+ throw new IllegalArgumentException("table qualified name has no cluster suffix: " + tableQualifiedName);
+ }
+
+ final String tableName = tableQualifiedName.substring(0, sepIdx);
+ final String clusterName = tableQualifiedName.substring(sepIdx + 1);
+
+ return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
+ }
+
+ /**
+ * Returns the table's create time in milliseconds: the metastore create time scaled by MILLIS_CONVERT_FACTOR.
+ * @param table Hive table; its TTable is dereferenced and must be non-null
+ * @return create time in milliseconds
+ */
+ public static long getTableCreatedTime(Table table) {
+ // widen before multiplying so the product cannot overflow int, whatever MILLIS_CONVERT_FACTOR's declared type
+ return ((long) table.getTTable().getCreateTime()) * MILLIS_CONVERT_FACTOR;
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
new file mode 100644
index 0000000..9105ebe
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.hook;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Context for a single Hive hook invocation.
+ * <p>
+ * Carries the {@link HiveHook} that created it, the {@link HiveOperation} being processed,
+ * the Hive {@link HookContext}, and a {@link Hive} client obtained from that context's
+ * configuration. It also collects Atlas entities keyed by qualified name, and forwards
+ * known-database / known-table bookkeeping to the hook.
+ * <p>
+ * The entity map is a plain {@link HashMap}; this class does no synchronization.
+ */
+public class AtlasHiveHookContext {
+ private final HiveHook hook;
+ private final HiveOperation hiveOperation;
+ private final HookContext hiveContext;
+ private final Hive hive;
+ private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
+
+ /**
+ * Creates a context for one hook invocation.
+ * @param hook hook that owns this context; cluster-name lookups and known-entity
+ * bookkeeping are delegated to it
+ * @param hiveOperation Hive operation being processed
+ * @param hiveContext Hive hook context; its configuration is used to obtain the Hive client
+ * @throws Exception propagated from {@code Hive.get(hiveContext.getConf())}
+ */
+ public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext) throws Exception {
+ this.hook = hook;
+ this.hiveOperation = hiveOperation;
+ this.hiveContext = hiveContext;
+ this.hive = Hive.get(hiveContext.getConf());
+ }
+
+ /** Returns the Hive hook context this context was created with. */
+ public HookContext getHiveContext() {
+ return hiveContext;
+ }
+
+ /** Returns the Hive client obtained from the hook context's configuration at construction. */
+ public Hive getHive() {
+ return hive;
+ }
+
+ /** Returns the Hive operation being processed. */
+ public HiveOperation getHiveOperation() {
+ return hiveOperation;
+ }
+
+ /**
+ * Records an entity under its qualified name, replacing any entity previously
+ * recorded for the same name.
+ * @param qualifiedName qualified name used as the key
+ * @param entity entity to record
+ */
+ public void putEntity(String qualifiedName, AtlasEntity entity) {
+ qNameEntityMap.put(qualifiedName, entity);
+ }
+
+ /**
+ * Looks up a recorded entity by qualified name.
+ * @param qualifiedName qualified name of the entity
+ * @return the recorded entity, or null if none was recorded under that name
+ */
+ public AtlasEntity getEntity(String qualifiedName) {
+ return qNameEntityMap.get(qualifiedName);
+ }
+
+ /**
+ * Returns a read-only view of the entities recorded so far. The view is live:
+ * entities added later through {@link #putEntity} become visible in it. Returning an
+ * unmodifiable view keeps callers from removing entries from the internal map.
+ */
+ public Collection<AtlasEntity> getEntities() {
+ return Collections.unmodifiableCollection(qNameEntityMap.values());
+ }
+
+ /** Returns the cluster name reported by the owning hook. */
+ public String getClusterName() {
+ return hook.getClusterName();
+ }
+
+ /** Delegates to the owning hook to check whether the database qualified name is known. */
+ public boolean isKnownDatabase(String dbQualifiedName) {
+ return hook.isKnownDatabase(dbQualifiedName);
+ }
+
+ /** Delegates to the owning hook to check whether the table qualified name is known. */
+ public boolean isKnownTable(String tblQualifiedName) {
+ return hook.isKnownTable(tblQualifiedName);
+ }
+
+ /** Passes the given entities to the owning hook's known-entity bookkeeping. */
+ public void addToKnownEntities(Collection<AtlasEntity> entities) {
+ hook.addToKnownEntities(entities);
+ }
+
+ /** Asks the owning hook to forget the given database qualified name. */
+ public void removeFromKnownDatabase(String dbQualifiedName) {
+ hook.removeFromKnownDatabase(dbQualifiedName);
+ }
+
+ /** Asks the owning hook to forget the given table qualified name. */
+ public void removeFromKnownTable(String tblQualifiedName) {
+ hook.removeFromKnownTable(tblQualifiedName);
+ }
+}