You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/01/04 21:59:21 UTC
atlas git commit: ATLAS-3006: Option to ignore/prune metadata for temporary/staging Hive tables
Repository: atlas
Updated Branches:
refs/heads/branch-0.8 a48249e0b -> a35379c1d
ATLAS-3006: Option to ignore/prune metadata for temporary/staging Hive tables
Signed-off-by: Sarath Subramanian <ss...@hortonworks.com>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/a35379c1
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/a35379c1
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/a35379c1
Branch: refs/heads/branch-0.8
Commit: a35379c1deee6f11396e00916400ec9f475ee8a3
Parents: a48249e
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Fri Jan 4 13:58:58 2019 -0800
Committer: Sarath Subramanian <ss...@hortonworks.com>
Committed: Fri Jan 4 13:58:58 2019 -0800
----------------------------------------------------------------------
.../atlas/hive/hook/AtlasHiveHookContext.java | 5 +
.../org/apache/atlas/hive/hook/HiveHook.java | 87 ++++++-
.../hive/hook/events/AlterTableRename.java | 9 +-
.../atlas/hive/hook/events/BaseHiveEvent.java | 119 +++++----
.../atlas/hive/hook/events/CreateTable.java | 2 +-
.../notification/NotificationHookConsumer.java | 208 ++++++++++++---
.../preprocessor/EntityPreprocessor.java | 126 +++++++++
.../preprocessor/HivePreprocessor.java | 253 +++++++++++++++++++
.../preprocessor/PreprocessorContext.java | 229 +++++++++++++++++
9 files changed, 943 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index 23cb853..249f48b 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -20,6 +20,7 @@ package org.apache.atlas.hive.hook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache;
+import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -97,6 +98,10 @@ public class AtlasHiveHookContext {
return hook.getSkipHiveColumnLineageHive20633InputsThreshold();
}
+ public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
+ return hook.getPreprocessActionForHiveTable(qualifiedName);
+ }
+
public String getQualifiedName(Database db) {
return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 7b60553..4a6b417 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -21,6 +21,8 @@ package org.apache.atlas.hive.hook;
import org.apache.atlas.hive.hook.events.*;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.utils.LruCache;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
@@ -30,12 +32,15 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
@@ -45,6 +50,8 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
+ public enum PreprocessAction { NONE, IGNORE, PRUNE }
+
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
@@ -54,6 +61,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold";
+ public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN = CONF_PREFIX + "hive_table.ignore.pattern";
+ public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN = CONF_PREFIX + "hive_table.prune.pattern";
+ public static final String HOOK_HIVE_TABLE_CACHE_SIZE = CONF_PREFIX + "hive_table.cache.size";
public static final String DEFAULT_CLUSTER_NAME = "primary";
@@ -66,8 +76,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final int nameCacheTableMaxCount;
private static final int nameCacheRebuildIntervalSeconds;
- private static final boolean skipHiveColumnLineageHive20633;
- private static final int skipHiveColumnLineageHive20633InputsThreshold;
+ private static final boolean skipHiveColumnLineageHive20633;
+ private static final int skipHiveColumnLineageHive20633InputsThreshold;
+ private static final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
+ private static final List<Pattern> hiveTablesToPrune = new ArrayList<>();
+ private static final Map<String, PreprocessAction> hiveTablesCache;
private static HiveHookObjectNamesCache knownObjects = null;
@@ -85,6 +98,41 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true);
skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
+ String[] patternHiveTablesToIgnore = atlasProperties.getStringArray(HOOK_HIVE_TABLE_IGNORE_PATTERN);
+ String[] patternHiveTablesToPrune = atlasProperties.getStringArray(HOOK_HIVE_TABLE_PRUNE_PATTERN);
+
+ if (patternHiveTablesToIgnore != null) {
+ for (String pattern : patternHiveTablesToIgnore) {
+ try {
+ hiveTablesToIgnore.add(Pattern.compile(pattern));
+
+ LOG.info("{}={}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern);
+ } catch (Throwable t) {
+ LOG.warn("failed to compile pattern {}", pattern, t);
+ LOG.warn("Ignoring invalid pattern in configuration {}: {}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern);
+ }
+ }
+ }
+
+ if (patternHiveTablesToPrune != null) {
+ for (String pattern : patternHiveTablesToPrune) {
+ try {
+ hiveTablesToPrune.add(Pattern.compile(pattern));
+
+ LOG.info("{}={}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern);
+ } catch (Throwable t) {
+ LOG.warn("failed to compile pattern {}", pattern, t);
+ LOG.warn("Ignoring invalid pattern in configuration {}: {}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern);
+ }
+ }
+ }
+
+ if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+ hiveTablesCache = new LruCache<>(atlasProperties.getInt(HOOK_HIVE_TABLE_CACHE_SIZE, 10000), 0);
+ } else {
+ hiveTablesCache = Collections.emptyMap();
+ }
+
knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
}
@@ -204,6 +252,41 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return skipHiveColumnLineageHive20633InputsThreshold;
}
+ public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
+ PreprocessAction ret = PreprocessAction.NONE;
+
+ if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
+ ret = hiveTablesCache.get(qualifiedName);
+
+ if (ret == null) {
+ if (isMatch(qualifiedName, hiveTablesToIgnore)) {
+ ret = PreprocessAction.IGNORE;
+ } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
+ ret = PreprocessAction.PRUNE;
+ } else {
+ ret = PreprocessAction.NONE;
+ }
+
+ hiveTablesCache.put(qualifiedName, ret);
+ }
+ }
+
+ return ret;
+ }
+
+ private boolean isMatch(String name, List<Pattern> patterns) {
+ boolean ret = false;
+
+ for (Pattern p : patterns) {
+ if (p.matcher(name).matches()) {
+ ret = true;
+
+ break;
+ }
+ }
+
+ return ret;
+ }
public static class HiveHookObjectNamesCache {
private final int dbMaxCacheCount;
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
index 6ced340..f4d7a82 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
@@ -85,13 +85,16 @@ public class AlterTableRename extends BaseHiveEvent {
return ret;
}
- AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable);
+ AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable);
+ AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable);
+
+ if (oldTableEntity == null || renamedTableEntity == null) {
+ return ret;
+ }
// first update with oldTable info, so that the table will be created if it is not present in Atlas
ret.add(new EntityUpdateRequestV2(getUserName(), new AtlasEntitiesWithExtInfo(oldTableEntity)));
- AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable);
-
// update qualifiedName for all columns, partitionKeys, storageDesc
String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 2d90a15..5c52cf4 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -19,6 +19,7 @@
package org.apache.atlas.hive.hook.events;
import org.apache.atlas.hive.hook.AtlasHiveHookContext;
+import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -284,7 +285,11 @@ public abstract class BaseHiveEvent {
AtlasEntity entity = toTableEntity(table, ret);
- ret.setEntity(entity);
+ if (entity != null) {
+ ret.setEntity(entity);
+ } else {
+ ret = null;
+ }
return ret;
}
@@ -292,7 +297,9 @@ public abstract class BaseHiveEvent {
protected AtlasEntity toTableEntity(Table table, AtlasEntitiesWithExtInfo entities) throws Exception {
AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities);
- entities.addEntity(ret);
+ if (ret != null) {
+ entities.addEntity(ret);
+ }
return ret;
}
@@ -318,64 +325,76 @@ public abstract class BaseHiveEvent {
AtlasEntity ret = context.getEntity(tblQualifiedName);
if (ret == null) {
- ret = new AtlasEntity(HIVE_TYPE_TABLE);
+ PreprocessAction action = context.getPreprocessActionForHiveTable(tblQualifiedName);
- // if this table was sent in an earlier notification, set 'guid' to null - which will:
- // - result in this entity to be not included in 'referredEntities'
- // - cause Atlas server to resolve the entity by its qualifiedName
- if (isKnownTable && !isAlterTableOperation()) {
- ret.setGuid(null);
- }
+ if (action == PreprocessAction.IGNORE) {
+ LOG.info("ignoring table {}", tblQualifiedName);
+ } else {
+ ret = new AtlasEntity(HIVE_TYPE_TABLE);
- long createTime = getTableCreateTime(table);
- long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
-
- ret.setAttribute(ATTRIBUTE_DB, dbId);
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
- ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
- ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
- ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
- ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
- ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention());
- ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters());
- ret.setAttribute(ATTRIBUTE_COMMENT, table.getParameters().get(ATTRIBUTE_COMMENT));
- ret.setAttribute(ATTRIBUTE_TABLE_TYPE, table.getTableType().name());
- ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary());
-
- if (table.getViewOriginalText() != null) {
- ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, table.getViewOriginalText());
- }
+ // if this table was sent in an earlier notification, set 'guid' to null - which will:
+ // - result in this entity to be not included in 'referredEntities'
+ // - cause Atlas server to resolve the entity by its qualifiedName
+ if (isKnownTable && !isAlterTableOperation()) {
+ ret.setGuid(null);
+ }
- if (table.getViewExpandedText() != null) {
- ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, table.getViewExpandedText());
- }
+ long createTime = getTableCreateTime(table);
+ long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
+
+ ret.setAttribute(ATTRIBUTE_DB, dbId);
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
+ ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
+ ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+ ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+ ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention());
+ ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters());
+ ret.setAttribute(ATTRIBUTE_COMMENT, table.getParameters().get(ATTRIBUTE_COMMENT));
+ ret.setAttribute(ATTRIBUTE_TABLE_TYPE, table.getTableType().name());
+ ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary());
+
+ if (table.getViewOriginalText() != null) {
+ ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, table.getViewOriginalText());
+ }
- AtlasObjectId tableId = getObjectId(ret);
- AtlasEntity sd = getStorageDescEntity(tableId, table);
- List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys());
- List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols());
+ if (table.getViewExpandedText() != null) {
+ ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, table.getViewExpandedText());
+ }
- if (entityExtInfo != null) {
- entityExtInfo.addReferredEntity(sd);
+ boolean pruneTable = table.isTemporary() || action == PreprocessAction.PRUNE;
- if (partitionKeys != null) {
- for (AtlasEntity partitionKey : partitionKeys) {
- entityExtInfo.addReferredEntity(partitionKey);
+ if (pruneTable) {
+ LOG.info("ignoring details of table {}", tblQualifiedName);
+ } else {
+ AtlasObjectId tableId = getObjectId(ret);
+ AtlasEntity sd = getStorageDescEntity(tableId, table);
+ List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys());
+ List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols());
+
+ if (entityExtInfo != null) {
+ entityExtInfo.addReferredEntity(sd);
+
+ if (partitionKeys != null) {
+ for (AtlasEntity partitionKey : partitionKeys) {
+ entityExtInfo.addReferredEntity(partitionKey);
+ }
+ }
+
+ if (columns != null) {
+ for (AtlasEntity column : columns) {
+ entityExtInfo.addReferredEntity(column);
+ }
+ }
}
- }
- if (columns != null) {
- for (AtlasEntity column : columns) {
- entityExtInfo.addReferredEntity(column);
- }
+ ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
+ ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys));
+ ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
}
- }
- ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
- ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys));
- ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
-
- context.putEntity(tblQualifiedName, ret);
+ context.putEntity(tblQualifiedName, ret);
+ }
}
return ret;
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
index 2afaf9f..316222d 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
@@ -81,7 +81,7 @@ public class CreateTable extends BaseHiveEvent {
if (table != null) {
AtlasEntity tblEntity = toTableEntity(table, ret);
- if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ if (tblEntity != null && TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation());
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 2d2a6fb..dec6860 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -45,6 +45,9 @@ import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRe
import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
+import org.apache.atlas.notification.preprocessor.PreprocessorContext;
+import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
@@ -54,9 +57,11 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.utils.LruCache;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
@@ -68,16 +73,20 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
import static org.apache.atlas.AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE;
import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY;
@@ -113,19 +122,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
-
-
-
- private final AtlasEntityStore atlasEntityStore;
- private final ServiceState serviceState;
- private final AtlasInstanceConverter instanceConverter;
- private final AtlasTypeRegistry typeRegistry;
- private final int maxRetries;
- private final int failedMsgCacheSize;
- private final boolean skipHiveColumnLineageHive20633;
- private final int skipHiveColumnLineageHive20633InputsThreshold;
- private final int largeMessageProcessingTimeThresholdMs;
- private final boolean consumerDisabled;
+ public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
+ public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
+ public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+
+ private final AtlasEntityStore atlasEntityStore;
+ private final ServiceState serviceState;
+ private final AtlasInstanceConverter instanceConverter;
+ private final AtlasTypeRegistry typeRegistry;
+ private final int maxRetries;
+ private final int failedMsgCacheSize;
+ private final boolean skipHiveColumnLineageHive20633;
+ private final int skipHiveColumnLineageHive20633InputsThreshold;
+ private final int largeMessageProcessingTimeThresholdMs;
+ private final boolean consumerDisabled;
+ private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
+ private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
+ private final Map<String, PreprocessAction> hiveTablesCache;
+ private final boolean preprocessEnabled;
@VisibleForTesting
final int consumerRetryInterval;
@@ -144,10 +158,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
ServiceState serviceState, AtlasInstanceConverter instanceConverter,
AtlasTypeRegistry typeRegistry) throws AtlasException {
this.notificationInterface = notificationInterface;
- this.atlasEntityStore = atlasEntityStore;
- this.serviceState = serviceState;
- this.instanceConverter = instanceConverter;
- this.typeRegistry = typeRegistry;
+ this.atlasEntityStore = atlasEntityStore;
+ this.serviceState = serviceState;
+ this.instanceConverter = instanceConverter;
+ this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get();
@@ -162,6 +176,43 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
+ String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
+ String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
+
+ if (patternHiveTablesToIgnore != null) {
+ for (String pattern : patternHiveTablesToIgnore) {
+ try {
+ hiveTablesToIgnore.add(Pattern.compile(pattern));
+
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern);
+ } catch (Throwable t) {
+ LOG.warn("failed to compile pattern {}", pattern, t);
+ LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern);
+ }
+ }
+ }
+
+ if (patternHiveTablesToPrune != null) {
+ for (String pattern : patternHiveTablesToPrune) {
+ try {
+ hiveTablesToPrune.add(Pattern.compile(pattern));
+
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern);
+ } catch (Throwable t) {
+ LOG.warn("failed to compile pattern {}", pattern, t);
+ LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern);
+ }
+ }
+ }
+
+ if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+ hiveTablesCache = new LruCache<>(applicationProperties.getInt(CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE, 10000), 0);
+ } else {
+ hiveTablesCache = Collections.emptyMap();
+ }
+
+ preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633;
+
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
}
@@ -406,6 +457,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
preProcessNotificationMessage(kafkaMsg);
+ if (isEmptyMessage(kafkaMsg)) {
+ commit(kafkaMsg);
+ return;
+ }
+
// Used for intermediate conversions during create and update
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
@@ -669,39 +725,86 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) {
- skipHiveColumnLineage(kafkaMsg);
- }
-
- private void skipHiveColumnLineage(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
- if (!skipHiveColumnLineageHive20633) {
+ if (!preprocessEnabled) {
return;
}
- final HookNotificationMessage message = kafkaMessage.getMessage();
- final AtlasEntitiesWithExtInfo entities;
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache);
- switch (message.getType()) {
- case ENTITY_CREATE_V2:
- entities = ((EntityCreateRequestV2) message).getEntities();
- break;
+ if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+ ignoreOrPruneHiveTables(context);
+ }
+
+ if (skipHiveColumnLineageHive20633) {
+ skipHiveColumnLineage(context);
+ }
+ }
- case ENTITY_FULL_UPDATE_V2:
- entities = ((EntityUpdateRequestV2) message).getEntities();
- break;
+ private void ignoreOrPruneHiveTables(PreprocessorContext context) {
+ List<AtlasEntity> entities = context.getEntities();
- default:
- entities = null;
- break;
+ if (entities != null) {
+ for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
+ AtlasEntity entity = iter.next();
+ EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
+
+ if (preprocessor != null) {
+ preprocessor.preprocess(entity, context);
+
+ if (context.isIgnoredEntity(entity.getGuid())) {
+ iter.remove();
+ }
+ }
+ }
+
+ Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
+
+ if (referredEntities != null) {
+ // scan referredEntities for pruning
+ for (Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator(); iter.hasNext(); ) {
+ AtlasEntity entity = iter.next().getValue();
+ EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
+
+ if (preprocessor != null) {
+ preprocessor.preprocess(entity, context);
+
+ if (context.isIgnoredEntity(entity.getGuid())) {
+ iter.remove();
+ }
+ }
+ }
+
+ for (String guid : context.getReferredEntitiesToMove()) {
+ AtlasEntity entity = referredEntities.remove(guid);
+
+ if (entity != null) {
+ entities.add(entity);
+
+ LOG.info("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), context.getKafkaMessageOffset(), context.getKafkaPartition());
+ }
+ }
+ }
+
+ int ignoredEntities = context.getIgnoredEntities().size();
+ int prunedEntities = context.getPrunedEntities().size();
+
+ if (ignoredEntities > 0 || prunedEntities > 0) {
+ LOG.info("preprocess: ignored entities={}; pruned entities={}. topic-offset={}, partition={}", ignoredEntities, prunedEntities, context.getKafkaMessageOffset(), context.getKafkaPartition());
+ }
}
+ }
+
+ private void skipHiveColumnLineage(PreprocessorContext context) {
+ List<AtlasEntity> entities = context.getEntities();
- if (entities != null && entities.getEntities() != null) {
+ if (entities != null) {
int lineageCount = 0;
int lineageInputsCount = 0;
int numRemovedEntities = 0;
Set<String> lineageQNames = new HashSet<>();
// find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
- for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
+ for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
@@ -713,7 +816,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (lineageQNames.contains(qualifiedName)) {
iter.remove();
- LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+ LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
numRemovedEntities++;
@@ -738,7 +841,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
- for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
+ for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
@@ -750,9 +853,36 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
if (numRemovedEntities > 0) {
- LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+ LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition());
+ }
+ }
+ }
+
+ /**
+  * Tells whether a V2 entity create/full-update notification carries no entities.
+  *
+  * @param kafkaMsg Kafka message wrapping the hook notification to inspect
+  * @return {@code true} when the message type is ENTITY_CREATE_V2 or ENTITY_FULL_UPDATE_V2 and its
+  *         entities container is null or holds an empty entity list; {@code false} for every other
+  *         message type
+  */
+ private boolean isEmptyMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) {
+ final boolean ret;
+ final HookNotificationMessage message = kafkaMsg.getMessage();
+
+ // only the two V2 message types are inspected; each is cast to its concrete request type
+ switch (message.getType()) {
+ case ENTITY_CREATE_V2: {
+ AtlasEntitiesWithExtInfo entities = ((EntityCreateRequestV2) message).getEntities();
+
+ ret = entities == null || CollectionUtils.isEmpty(entities.getEntities());
}
+ break;
+
+ case ENTITY_FULL_UPDATE_V2: {
+ AtlasEntitiesWithExtInfo entities = ((EntityUpdateRequestV2) message).getEntities();
+
+ ret = entities == null || CollectionUtils.isEmpty(entities.getEntities());
+ }
+ break;
+
+ // any other message type is never reported as empty
+ default:
+ ret = false;
+ break;
}
+
+ return ret;
}
private void audit(String messageUser, String method, String path) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
new file mode 100644
index 0000000..bdea14a
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+ /**
+  * Base class of the per-entity-type preprocessors applied to entities of incoming hook notifications.
+  *
+  * <p>Concrete preprocessors are registered by type name in a static registry and looked up with
+  * {@link #getPreprocessor(String)}. The class also offers helpers that extract the type name and
+  * the {@code qualifiedName} from the different shapes an entity reference can take
+  * ({@link AtlasObjectId}, {@link Map}, {@link AtlasEntity}, {@link AtlasEntityWithExtInfo}).
+  */
+ public abstract class EntityPreprocessor {
+     public static final String TYPE_HIVE_COLUMN = "hive_column";
+     public static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
+     public static final String TYPE_HIVE_PROCESS = "hive_process";
+     public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc";
+     public static final String TYPE_HIVE_TABLE = "hive_table";
+
+     public static final String ATTRIBUTE_COLUMNS = "columns";
+     public static final String ATTRIBUTE_INPUTS = "inputs";
+     public static final String ATTRIBUTE_OUTPUTS = "outputs";
+     public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
+     public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+     public static final String ATTRIBUTE_SD = "sd";
+
+     public static final char QNAME_SEP_CLUSTER_NAME = '@';
+     public static final char QNAME_SEP_ENTITY_NAME = '.';
+     public static final String QNAME_SD_SUFFIX = "_storage";
+
+     // registry of preprocessors, keyed by the entity type name each one handles
+     private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>();
+
+     private final String typeName;
+
+
+     static {
+         EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
+             new HivePreprocessor.HiveTablePreprocessor(),
+             new HivePreprocessor.HiveColumnPreprocessor(),
+             new HivePreprocessor.HiveProcessPreprocessor(),
+             new HivePreprocessor.HiveColumnLineageProcessPreprocessor(),
+             new HivePreprocessor.HiveStorageDescPreprocessor()
+         };
+
+         for (EntityPreprocessor preprocessor : preprocessors) {
+             PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+         }
+     }
+
+     /**
+      * @param typeName name of the entity type handled by this preprocessor; used as the registry key
+      */
+     protected EntityPreprocessor(String typeName) {
+         this.typeName = typeName;
+     }
+
+     /**
+      * @return name of the entity type handled by this preprocessor
+      */
+     public String getTypeName() {
+         return typeName;
+     }
+
+     /**
+      * Applies this preprocessor to one entity, recording its decisions in the given context.
+      *
+      * @param entity  entity to preprocess
+      * @param context per-message preprocessing state
+      */
+     public abstract void preprocess(AtlasEntity entity, PreprocessorContext context);
+
+
+     /**
+      * Looks up the preprocessor registered for an entity type.
+      *
+      * @param typeName entity type name; may be null
+      * @return the registered preprocessor, or null when typeName is null or has no preprocessor
+      */
+     public static EntityPreprocessor getPreprocessor(String typeName) {
+         return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null;
+     }
+
+     /**
+      * @param entity entity to read; may be null
+      * @return string form of the entity's {@code qualifiedName} attribute, or null when the entity
+      *         is null or the attribute is not set
+      */
+     public static String getQualifiedName(AtlasEntity entity) {
+         Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
+
+         return obj != null ? obj.toString() : null;
+     }
+
+     /**
+      * Extracts the type name from an entity reference of any supported shape.
+      *
+      * @param obj an {@link AtlasObjectId}, a {@link Map} form of an object-id, an {@link AtlasEntity}
+      *            or an {@link AtlasEntityWithExtInfo}; any other value (including null) yields null
+      * @return the type name, or null when it cannot be determined
+      */
+     public String getTypeName(Object obj) {
+         Object ret = null;
+
+         if (obj instanceof AtlasObjectId) {
+             ret = ((AtlasObjectId) obj).getTypeName();
+         } else if (obj instanceof Map) {
+             ret = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_TYPENAME);
+         } else if (obj instanceof AtlasEntity) {
+             ret = ((AtlasEntity) obj).getTypeName();
+         } else if (obj instanceof AtlasEntityWithExtInfo) {
+             // the wrapper may not carry an entity; treat that like an unknown type
+             AtlasEntity entity = ((AtlasEntityWithExtInfo) obj).getEntity();
+
+             ret = entity != null ? entity.getTypeName() : null;
+         }
+
+         return ret != null ? ret.toString() : null;
+     }
+
+     /**
+      * Extracts the {@code qualifiedName} from an entity reference of any supported shape.
+      *
+      * <p>For object-ids (typed or in map form) the value is read from the unique attributes; for
+      * entities it is read from the entity attributes.
+      *
+      * @param obj an {@link AtlasObjectId}, a {@link Map} form of an object-id, an {@link AtlasEntity}
+      *            or an {@link AtlasEntityWithExtInfo}; any other value (including null) yields null
+      * @return the qualified name, or null when it cannot be determined
+      */
+     public String getQualifiedName(Object obj) {
+         Map<?, ?> attributes = null;
+
+         if (obj instanceof AtlasObjectId) {
+             attributes = ((AtlasObjectId) obj).getUniqueAttributes();
+         } else if (obj instanceof Map) {
+             Object uniqueAttributes = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_UNIQUE_ATTRIBUTES);
+
+             // a malformed reference may carry a non-map value here; ignore it instead of failing the cast
+             if (uniqueAttributes instanceof Map) {
+                 attributes = (Map<?, ?>) uniqueAttributes;
+             }
+         } else if (obj instanceof AtlasEntity) {
+             attributes = ((AtlasEntity) obj).getAttributes();
+         } else if (obj instanceof AtlasEntityWithExtInfo) {
+             AtlasEntity entity = ((AtlasEntityWithExtInfo) obj).getEntity();
+
+             attributes = entity != null ? entity.getAttributes() : null;
+         }
+
+         Object ret = attributes != null ? attributes.get(ATTRIBUTE_QUALIFIED_NAME) : null;
+
+         return ret != null ? ret.toString() : null;
+     }
+
+     /**
+      * @param obj value to test
+      * @return true when obj is null or an empty {@link Collection}; any other value, including a
+      *         non-collection object, is reported as not empty
+      */
+     protected boolean isEmpty(Object obj) {
+         return obj == null || ((obj instanceof Collection) && ((Collection<?>) obj).isEmpty());
+     }
+ }
+
+
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
new file mode 100644
index 0000000..d54c88d
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class HivePreprocessor {
+ /**
+  * Preprocessor for hive_table entities: applies the ignore/prune action configured for the table
+  * and marks the entities it references (sd, columns, partitionKeys) as ignored.
+  */
+ static class HiveTablePreprocessor extends EntityPreprocessor {
+     public HiveTablePreprocessor() {
+         super(TYPE_HIVE_TABLE);
+     }
+
+     @Override
+     public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+         if (context.isIgnoredEntity(entity.getGuid())) {
+             context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
+
+             return;
+         }
+
+         final PreprocessAction tableAction = context.getPreprocessActionForHiveTable(getQualifiedName(entity));
+         final boolean ignoreTable = tableAction == PreprocessAction.IGNORE;
+         final boolean pruneTable = tableAction == PreprocessAction.PRUNE;
+
+         if (!ignoreTable && !pruneTable) {
+             return;
+         }
+
+         if (ignoreTable) {
+             context.addToIgnoredEntities(entity);
+         } else {
+             context.addToPrunedEntities(entity);
+         }
+
+         final String[] ownedAttributes = { ATTRIBUTE_SD, ATTRIBUTE_COLUMNS, ATTRIBUTE_PARTITION_KEYS };
+
+         // in both cases, the entities referenced by the table are ignored
+         for (String attrName : ownedAttributes) {
+             context.addToIgnoredEntities(entity.getAttribute(attrName));
+         }
+
+         // a pruned table is retained, but without references to the entities ignored above
+         if (pruneTable) {
+             for (String attrName : ownedAttributes) {
+                 entity.setAttribute(attrName, null);
+             }
+         }
+     }
+ }
+
+
+ /**
+  * Preprocessor for hive_column entities: a column is ignored when the table it belongs to is
+  * configured to be ignored or pruned.
+  */
+ static class HiveColumnPreprocessor extends EntityPreprocessor {
+     public HiveColumnPreprocessor() {
+         super(TYPE_HIVE_COLUMN);
+     }
+
+     @Override
+     public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+         if (!context.isIgnoredEntity(entity.getGuid())) {
+             // a column without qualifiedName yields a null table name, which maps to no action
+             PreprocessAction action = context.getPreprocessActionForHiveTable(getHiveTableQualifiedName(getQualifiedName(entity)));
+
+             if (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE) {
+                 context.addToIgnoredEntities(entity.getGuid());
+             }
+         }
+     }
+
+     /**
+      * Derives the qualified name of the owning table from a column's qualified name by dropping
+      * the last entity-name segment, e.g. {@code db.tbl.col@cluster} gives {@code db.tbl@cluster}.
+      *
+      * <p>The entity-name separator is searched only in the part before the cluster separator, so
+      * that a cluster name containing the entity separator character does not distort the result.
+      *
+      * @param columnQualifiedName qualified name of the column; may be null
+      * @return qualified name of the table, with the cluster suffix retained when a non-empty
+      *         cluster name is present; null when the input is null or holds no entity separator
+      */
+     public static String getHiveTableQualifiedName(String columnQualifiedName) {
+         if (columnQualifiedName == null) {
+             return null;
+         }
+
+         final int clusterSepPos = columnQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME);
+         final String entityPart = clusterSepPos != -1 ? columnQualifiedName.substring(0, clusterSepPos) : columnQualifiedName;
+         final String clusterName;
+
+         if (clusterSepPos != -1 && columnQualifiedName.length() > (clusterSepPos + 1)) {
+             clusterName = columnQualifiedName.substring(clusterSepPos + 1);
+         } else {
+             clusterName = null; // no cluster separator, or nothing after it
+         }
+
+         final int columnSepPos = entityPart.lastIndexOf(QNAME_SEP_ENTITY_NAME);
+
+         if (columnSepPos == -1) {
+             return null; // no table part can be derived
+         }
+
+         final String dbTableName = entityPart.substring(0, columnSepPos);
+
+         return clusterName != null ? (dbTableName + QNAME_SEP_CLUSTER_NAME + clusterName) : dbTableName;
+     }
+ }
+
+
+ /**
+  * Preprocessor for hive_storagedesc entities: a storage descriptor is ignored when the table it
+  * belongs to is configured to be ignored or pruned.
+  */
+ static class HiveStorageDescPreprocessor extends EntityPreprocessor {
+     public HiveStorageDescPreprocessor() {
+         super(TYPE_HIVE_STORAGEDESC);
+     }
+
+     @Override
+     public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+         if (!context.isIgnoredEntity(entity.getGuid())) {
+             // a storage descriptor without qualifiedName yields a null table name, which maps to no action
+             PreprocessAction action = context.getPreprocessActionForHiveTable(getHiveTableQualifiedName(getQualifiedName(entity)));
+
+             if (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE) {
+                 context.addToIgnoredEntities(entity.getGuid());
+             }
+         }
+     }
+
+     /**
+      * Derives the qualified name of the owning table from a storage descriptor's qualified name
+      * by stripping the trailing {@code _storage} suffix.
+      *
+      * @param sdQualifiedName qualified name of the storage descriptor; may be null
+      * @return the name without the suffix; the input unchanged when it does not end with the
+      *         suffix; null when the input is null
+      */
+     public static String getHiveTableQualifiedName(String sdQualifiedName) {
+         if (sdQualifiedName == null) {
+             return null;
+         }
+
+         // strip the marker only when it is a real suffix, not when it merely occurs inside the name
+         if (sdQualifiedName.endsWith(QNAME_SD_SUFFIX)) {
+             return sdQualifiedName.substring(0, sdQualifiedName.length() - QNAME_SD_SUFFIX.length());
+         }
+
+         return sdQualifiedName;
+     }
+ }
+
+
+ static class HiveProcessPreprocessor extends EntityPreprocessor {
+ public HiveProcessPreprocessor() {
+ super(TYPE_HIVE_PROCESS);
+ }
+
+ public HiveProcessPreprocessor(String typeName) {
+ super(typeName);
+ }
+
+ @Override
+ public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+ if (context.isIgnoredEntity(entity.getGuid())) {
+ context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
+ } else {
+ Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
+ Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+
+ removeIgnoredObjectIds(inputs, context);
+ removeIgnoredObjectIds(outputs, context);
+
+ boolean isInputsEmpty = isEmpty(inputs);
+ boolean isOutputsEmpty = isEmpty(outputs);
+
+ if (isInputsEmpty || isOutputsEmpty) {
+ context.addToIgnoredEntities(entity);
+
+ // since the process entity is ignored, entities referenced by inputs/outputs of this process entity
+ // may not be processed by Atlas, if they are present in referredEntities. So, move them from
+ // 'referredEntities' to 'entities'. However, this is not necessary for hive_column entities,
+ // as these entities would be referenced by hive_table entities
+ if (!StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
+ if (!isInputsEmpty) {
+ for (Object obj : (Collection) inputs) {
+ String guid = context.getGuid(obj);
+
+ context.addToReferredEntitiesToMove(guid);
+ }
+ } else if (!isOutputsEmpty) {
+ for (Object obj : (Collection) outputs) {
+ String guid = context.getGuid(obj);
+
+ context.addToReferredEntitiesToMove(guid);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void removeIgnoredObjectIds(Object obj, PreprocessorContext context) {
+ if (obj == null || !(obj instanceof Collection)) {
+ return;
+ }
+
+ Collection objList = (Collection) obj;
+ List<Object> toRemove = null;
+
+ for (Object objElem : objList) {
+ boolean removeEntry = false;
+ String guid = context.getGuid(objElem);
+
+ if (guid != null) {
+ removeEntry = context.isIgnoredEntity(guid);
+
+ if (!removeEntry) { // perhaps entity hasn't been preprocessed yet
+ AtlasEntity entity = context.getEntity(guid);
+
+ if (entity != null) {
+ switch (entity.getTypeName()) {
+ case TYPE_HIVE_TABLE: {
+ PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(entity));
+
+ removeEntry = (action == PreprocessAction.IGNORE);
+ }
+ break;
+
+ case TYPE_HIVE_COLUMN: {
+ PreprocessAction action = context.getPreprocessActionForHiveTable(HiveColumnPreprocessor.getHiveTableQualifiedName(getQualifiedName(entity)));
+
+ // if the table is ignored or pruned, remove the column
+ removeEntry = (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE);
+ }
+ break;
+ }
+ }
+ }
+ } else {
+ String typeName = getTypeName(objElem);
+
+ if (typeName != null) {
+ switch (typeName) {
+ case TYPE_HIVE_TABLE: {
+ PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(objElem));
+
+ removeEntry = (action == PreprocessAction.IGNORE);
+ }
+ break;
+
+ case TYPE_HIVE_COLUMN: {
+ PreprocessAction action = context.getPreprocessActionForHiveTable(HiveColumnPreprocessor.getHiveTableQualifiedName(getQualifiedName(objElem)));
+
+ // if the table is ignored or pruned, remove the column
+ removeEntry = (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE);
+ }
+ break;
+ }
+ }
+ }
+
+ if (removeEntry) {
+ if (toRemove == null) {
+ toRemove = new ArrayList();
+ }
+
+ toRemove.add(objElem);
+ }
+ }
+
+ if (toRemove != null) {
+ objList.removeAll(toRemove);
+ }
+ }
+ }
+
+ /**
+  * Preprocessor registered under the hive_column_lineage type name; it adds no logic of its own
+  * and reuses the preprocessing implemented by HiveProcessPreprocessor.
+  */
+ static class HiveColumnLineageProcessPreprocessor extends HiveProcessPreprocessor {
+ public HiveColumnLineageProcessPreprocessor() {
+ super(TYPE_HIVE_COLUMN_LINEAGE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
new file mode 100644
index 0000000..8f62768
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+
+ /**
+  * State shared by the entity preprocessors while one hook notification message is preprocessed.
+  *
+  * <p>Holds the Kafka message, the entities carried by it (for ENTITY_CREATE_V2 and
+  * ENTITY_FULL_UPDATE_V2 messages only), the configured hive-table ignore/prune patterns, and the
+  * guids of entities that were ignored, pruned, or need to be moved out of referred entities.
+  */
+ public class PreprocessorContext {
+     private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class);
+
+     public enum PreprocessAction { NONE, IGNORE, PRUNE }
+
+     private final AtlasKafkaMessage<HookNotificationMessage> kafkaMessage;
+     private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
+     private final List<Pattern> hiveTablesToIgnore;
+     private final List<Pattern> hiveTablesToPrune;
+     private final Map<String, PreprocessAction> hiveTablesCache;
+     private final Set<String> ignoredEntities = new HashSet<>();
+     private final Set<String> prunedEntities = new HashSet<>();
+     private final Set<String> referredEntitiesToMove = new HashSet<>();
+
+     /**
+      * @param kafkaMessage       message being preprocessed
+      * @param hiveTablesToIgnore patterns of hive_table qualified names to ignore
+      * @param hiveTablesToPrune  patterns of hive_table qualified names to prune
+      * @param hiveTablesCache    cache of already computed actions, keyed by table qualified name;
+      *                           it is read and updated by {@link #getPreprocessActionForHiveTable(String)}
+      */
+     public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
+         this.kafkaMessage = kafkaMessage;
+         this.hiveTablesToIgnore = hiveTablesToIgnore;
+         this.hiveTablesToPrune = hiveTablesToPrune;
+         this.hiveTablesCache = hiveTablesCache;
+
+         final HookNotificationMessage message = kafkaMessage.getMessage();
+
+         // only V2 create/full-update messages carry entities that can be preprocessed
+         switch (message.getType()) {
+             case ENTITY_CREATE_V2:
+                 entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) message).getEntities();
+                 break;
+
+             case ENTITY_FULL_UPDATE_V2:
+                 entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) message).getEntities();
+                 break;
+
+             default:
+                 entitiesWithExtInfo = null;
+                 break;
+         }
+     }
+
+     public AtlasKafkaMessage<HookNotificationMessage> getKafkaMessage() {
+         return kafkaMessage;
+     }
+
+     public long getKafkaMessageOffset() {
+         return kafkaMessage.getOffset();
+     }
+
+     public int getKafkaPartition() {
+         return kafkaMessage.getPartition();
+     }
+
+     /**
+      * @return entities carried by the message, or null when the message carries none
+      */
+     public List<AtlasEntity> getEntities() {
+         return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
+     }
+
+     /**
+      * @return referred entities carried by the message, or null when the message carries none
+      */
+     public Map<String, AtlasEntity> getReferredEntities() {
+         return entitiesWithExtInfo != null ? entitiesWithExtInfo.getReferredEntities() : null;
+     }
+
+     /**
+      * @param guid guid to look up; may be null
+      * @return the entity with the given guid as resolved by the message's entities container, or
+      *         null when the guid is null or the message carries no entities
+      */
+     public AtlasEntity getEntity(String guid) {
+         return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null;
+     }
+
+     public Set<String> getIgnoredEntities() { return ignoredEntities; }
+
+     public Set<String> getPrunedEntities() { return prunedEntities; }
+
+     public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; }
+
+     /**
+      * Determines the action configured for a hive_table. Ignore patterns take precedence over
+      * prune patterns; computed results are stored in the cache supplied at construction.
+      *
+      * @param qualifiedName qualified name of the table; may be null
+      * @return the action to apply; NONE when the name is null or no patterns are configured
+      */
+     public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
+         PreprocessAction ret = PreprocessAction.NONE;
+
+         if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
+             ret = hiveTablesCache.get(qualifiedName);
+
+             if (ret == null) {
+                 if (isMatch(qualifiedName, hiveTablesToIgnore)) {
+                     ret = PreprocessAction.IGNORE;
+                 } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
+                     ret = PreprocessAction.PRUNE;
+                 } else {
+                     ret = PreprocessAction.NONE;
+                 }
+
+                 hiveTablesCache.put(qualifiedName, ret);
+             }
+         }
+
+         return ret;
+     }
+
+     public boolean isIgnoredEntity(String guid) {
+         return guid != null && ignoredEntities.contains(guid);
+     }
+
+     public boolean isPrunedEntity(String guid) {
+         return guid != null && prunedEntities.contains(guid);
+     }
+
+     /**
+      * Marks the entity as ignored and logs it; the log entry is written only the first time the
+      * entity's guid is recorded.
+      */
+     public void addToIgnoredEntities(AtlasEntity entity) {
+         if (ignoredEntities.add(entity.getGuid())) {
+             LOG.info("ignored entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
+         }
+     }
+
+     /**
+      * Marks the entity as pruned and logs it; the log entry is written only the first time the
+      * entity's guid is recorded.
+      */
+     public void addToPrunedEntities(AtlasEntity entity) {
+         if (prunedEntities.add(entity.getGuid())) {
+             LOG.info("pruned entity: typeName={}, qualifiedName={} topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
+         }
+     }
+
+     public void addToIgnoredEntities(String guid) {
+         if (guid != null) {
+             ignoredEntities.add(guid);
+         }
+     }
+
+     public void addToPrunedEntities(String guid) {
+         if (guid != null) {
+             prunedEntities.add(guid);
+         }
+     }
+
+     public void addToReferredEntitiesToMove(String guid) {
+         if (guid != null) {
+             referredEntitiesToMove.add(guid);
+         }
+     }
+
+     /**
+      * Marks as ignored the entity referenced by obj, or every entity referenced by its elements
+      * when obj is a collection. References without a guid are skipped.
+      */
+     public void addToIgnoredEntities(Object obj) {
+         collectGuids(obj, ignoredEntities);
+     }
+
+     /**
+      * Marks as pruned the entity referenced by obj, or every entity referenced by its elements
+      * when obj is a collection. References without a guid are skipped.
+      */
+     public void addToPrunedEntities(Object obj) {
+         collectGuids(obj, prunedEntities);
+     }
+
+     /**
+      * Extracts the guid from an entity reference of any supported shape.
+      *
+      * @param obj an {@link AtlasObjectId}, a {@link Map} form of an object-id, an {@link AtlasEntity}
+      *            or an AtlasEntityWithExtInfo; any other value (including null) yields null
+      * @return the guid, or null when it cannot be determined
+      */
+     public String getGuid(Object obj) {
+         Object ret = null;
+
+         if (obj instanceof AtlasObjectId) {
+             ret = ((AtlasObjectId) obj).getGuid();
+         } else if (obj instanceof Map) {
+             ret = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_GUID);
+         } else if (obj instanceof AtlasEntity) {
+             ret = ((AtlasEntity) obj).getGuid();
+         } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+             // the wrapper may not carry an entity; treat that like a reference without guid
+             AtlasEntity entity = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity();
+
+             ret = entity != null ? entity.getGuid() : null;
+         }
+
+         return ret != null ? ret.toString() : null;
+     }
+
+
+     /**
+      * @return true when the whole name matches at least one of the patterns; false when the
+      *         pattern list is null or empty
+      */
+     private boolean isMatch(String name, List<Pattern> patterns) {
+         if (patterns != null) { // only one of the ignore/prune lists may be configured
+             for (Pattern p : patterns) {
+                 if (p.matcher(name).matches()) {
+                     return true;
+                 }
+             }
+         }
+
+         return false;
+     }
+
+     private void collectGuids(Object obj, Set<String> guids) {
+         if (obj instanceof Collection) {
+             for (Object objElem : (Collection<?>) obj) {
+                 collectGuid(objElem, guids);
+             }
+         } else if (obj != null) {
+             collectGuid(obj, guids);
+         }
+     }
+
+     private void collectGuid(Object obj, Set<String> guids) {
+         String guid = getGuid(obj);
+
+         if (guid != null) {
+             guids.add(guid);
+         }
+     }
+ }