You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/01/04 21:59:21 UTC

atlas git commit: ATLAS-3006: Option to ignore/prune metadata for temporary/staging Hive tables

Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 a48249e0b -> a35379c1d


ATLAS-3006: Option to ignore/prune metadata for temporary/staging Hive tables

Signed-off-by: Sarath Subramanian <ss...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/a35379c1
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/a35379c1
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/a35379c1

Branch: refs/heads/branch-0.8
Commit: a35379c1deee6f11396e00916400ec9f475ee8a3
Parents: a48249e
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Fri Jan 4 13:58:58 2019 -0800
Committer: Sarath Subramanian <ss...@hortonworks.com>
Committed: Fri Jan 4 13:58:58 2019 -0800

----------------------------------------------------------------------
 .../atlas/hive/hook/AtlasHiveHookContext.java   |   5 +
 .../org/apache/atlas/hive/hook/HiveHook.java    |  87 ++++++-
 .../hive/hook/events/AlterTableRename.java      |   9 +-
 .../atlas/hive/hook/events/BaseHiveEvent.java   | 119 +++++----
 .../atlas/hive/hook/events/CreateTable.java     |   2 +-
 .../notification/NotificationHookConsumer.java  | 208 ++++++++++++---
 .../preprocessor/EntityPreprocessor.java        | 126 +++++++++
 .../preprocessor/HivePreprocessor.java          | 253 +++++++++++++++++++
 .../preprocessor/PreprocessorContext.java       | 229 +++++++++++++++++
 9 files changed, 943 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index 23cb853..249f48b 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -20,6 +20,7 @@ package org.apache.atlas.hive.hook;
 
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache;
+import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -97,6 +98,10 @@ public class AtlasHiveHookContext {
         return hook.getSkipHiveColumnLineageHive20633InputsThreshold();
     }
 
+    public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
+        return hook.getPreprocessActionForHiveTable(qualifiedName); // pure delegation: the decision is made by 'hook'
+    }
+
     public String getQualifiedName(Database db) {
         return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 7b60553..4a6b417 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -21,6 +21,8 @@ package org.apache.atlas.hive.hook;
 import org.apache.atlas.hive.hook.events.*;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.utils.LruCache;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
@@ -30,12 +32,15 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
 import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
@@ -45,6 +50,8 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
 public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
 
+    public enum PreprocessAction { NONE, IGNORE, PRUNE } // result of matching a hive_table qualifiedName against the ignore/prune patterns
+
     public static final String CONF_PREFIX                         = "atlas.hook.hive.";
     public static final String CONF_CLUSTER_NAME                   = "atlas.cluster.name";
     public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE     = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
@@ -54,6 +61,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = CONF_PREFIX + "name.cache.rebuild.interval.seconds";
     public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633                  = CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
     public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + "skip.hive_column_lineage.hive-20633.inputs.threshold";
+    public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN                            = CONF_PREFIX + "hive_table.ignore.pattern";
+    public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN                             = CONF_PREFIX + "hive_table.prune.pattern";
+    public static final String HOOK_HIVE_TABLE_CACHE_SIZE                                = CONF_PREFIX + "hive_table.cache.size";
 
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
@@ -66,8 +76,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final int     nameCacheTableMaxCount;
     private static final int     nameCacheRebuildIntervalSeconds;
 
-    private static final boolean skipHiveColumnLineageHive20633;
-    private static final int     skipHiveColumnLineageHive20633InputsThreshold;
+    private static final boolean                       skipHiveColumnLineageHive20633;
+    private static final int                           skipHiveColumnLineageHive20633InputsThreshold;
+    private static final List<Pattern>                 hiveTablesToIgnore = new ArrayList<>();
+    private static final List<Pattern>                 hiveTablesToPrune  = new ArrayList<>();
+    private static final Map<String, PreprocessAction> hiveTablesCache;
 
     private static HiveHookObjectNamesCache knownObjects = null;
 
@@ -85,6 +98,41 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         skipHiveColumnLineageHive20633                = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true);
         skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
 
+        String[] patternHiveTablesToIgnore = atlasProperties.getStringArray(HOOK_HIVE_TABLE_IGNORE_PATTERN);
+        String[] patternHiveTablesToPrune  = atlasProperties.getStringArray(HOOK_HIVE_TABLE_PRUNE_PATTERN);
+
+        if (patternHiveTablesToIgnore != null) {
+            for (String pattern : patternHiveTablesToIgnore) {
+                try {
+                    hiveTablesToIgnore.add(Pattern.compile(pattern));
+
+                    LOG.info("{}={}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern);
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: {}", HOOK_HIVE_TABLE_IGNORE_PATTERN, pattern);
+                }
+            }
+        }
+
+        if (patternHiveTablesToPrune != null) {
+            for (String pattern : patternHiveTablesToPrune) {
+                try {
+                    hiveTablesToPrune.add(Pattern.compile(pattern));
+
+                    LOG.info("{}={}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern);
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: {}", HOOK_HIVE_TABLE_PRUNE_PATTERN, pattern);
+                }
+            }
+        }
+
+        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+            hiveTablesCache = new LruCache<>(atlasProperties.getInt(HOOK_HIVE_TABLE_CACHE_SIZE, 10000), 0);
+        } else {
+            hiveTablesCache = Collections.emptyMap();
+        }
+
         knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null;
     }
 
@@ -204,6 +252,41 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return skipHiveColumnLineageHive20633InputsThreshold;
     }
 
+    /**
+     * Determines how a hive_table with the given qualifiedName should be handled: IGNORE, PRUNE or NONE.
+     */
+    public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
+        if (qualifiedName == null || (hiveTablesToIgnore.isEmpty() && hiveTablesToPrune.isEmpty())) {
+            return PreprocessAction.NONE;
+        }
+
+        PreprocessAction action = hiveTablesCache.get(qualifiedName);
+
+        if (action == null) {
+            // cache miss: ignore patterns are tried before prune patterns, then the result is remembered
+            action = isMatch(qualifiedName, hiveTablesToIgnore) ? PreprocessAction.IGNORE
+                   : isMatch(qualifiedName, hiveTablesToPrune)  ? PreprocessAction.PRUNE
+                   : PreprocessAction.NONE;
+
+            hiveTablesCache.put(qualifiedName, action);
+        }
+
+        return action;
+    }
+
+    /**
+     * Tells whether the given name fully matches at least one of the supplied patterns.
+     */
+    private boolean isMatch(String name, List<Pattern> patterns) {
+        for (Pattern candidate : patterns) {
+            if (candidate.matcher(name).matches()) {
+                return true;
+            }
+        }
+
+        // none of the patterns matched the entire name
+        return false;
+    }
 
     public static class HiveHookObjectNamesCache {
         private final int         dbMaxCacheCount;

http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
index 6ced340..f4d7a82 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
@@ -85,13 +85,16 @@ public class AlterTableRename extends BaseHiveEvent {
             return ret;
         }
 
-        AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable);
+        AtlasEntityWithExtInfo oldTableEntity     = toTableEntity(oldTable);
+        AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable);
+
+        if (oldTableEntity == null || renamedTableEntity == null) {
+            return ret;
+        }
 
         // first update with oldTable info, so that the table will be created if it is not present in Atlas
         ret.add(new EntityUpdateRequestV2(getUserName(), new AtlasEntitiesWithExtInfo(oldTableEntity)));
 
-        AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable);
-
         // update qualifiedName for all columns, partitionKeys, storageDesc
         String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 2d90a15..5c52cf4 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.hive.hook.events;
 
 import org.apache.atlas.hive.hook.AtlasHiveHookContext;
+import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -284,7 +285,11 @@ public abstract class BaseHiveEvent {
 
         AtlasEntity entity = toTableEntity(table, ret);
 
-        ret.setEntity(entity);
+        if (entity != null) {
+            ret.setEntity(entity);
+        } else {
+            ret = null;
+        }
 
         return ret;
     }
@@ -292,7 +297,9 @@ public abstract class BaseHiveEvent {
     protected AtlasEntity toTableEntity(Table table, AtlasEntitiesWithExtInfo entities) throws Exception {
         AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities);
 
-        entities.addEntity(ret);
+        if (ret != null) {
+            entities.addEntity(ret);
+        }
 
         return ret;
     }
@@ -318,64 +325,76 @@ public abstract class BaseHiveEvent {
         AtlasEntity ret = context.getEntity(tblQualifiedName);
 
         if (ret == null) {
-            ret = new AtlasEntity(HIVE_TYPE_TABLE);
+            PreprocessAction action = context.getPreprocessActionForHiveTable(tblQualifiedName);
 
-            // if this table was sent in an earlier notification, set 'guid' to null - which will:
-            //  - result in this entity to be not included in 'referredEntities'
-            //  - cause Atlas server to resolve the entity by its qualifiedName
-            if (isKnownTable && !isAlterTableOperation()) {
-                ret.setGuid(null);
-            }
+            if (action == PreprocessAction.IGNORE) {
+                LOG.info("ignoring table {}", tblQualifiedName);
+            } else {
+                ret = new AtlasEntity(HIVE_TYPE_TABLE);
 
-            long createTime     = getTableCreateTime(table);
-            long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
-
-            ret.setAttribute(ATTRIBUTE_DB, dbId);
-            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
-            ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
-            ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
-            ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
-            ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
-            ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention());
-            ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters());
-            ret.setAttribute(ATTRIBUTE_COMMENT, table.getParameters().get(ATTRIBUTE_COMMENT));
-            ret.setAttribute(ATTRIBUTE_TABLE_TYPE, table.getTableType().name());
-            ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary());
-
-            if (table.getViewOriginalText() != null) {
-                ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, table.getViewOriginalText());
-            }
+                // if this table was sent in an earlier notification, set 'guid' to null - which will:
+                //  - result in this entity to be not included in 'referredEntities'
+                //  - cause Atlas server to resolve the entity by its qualifiedName
+                if (isKnownTable && !isAlterTableOperation()) {
+                    ret.setGuid(null);
+                }
 
-            if (table.getViewExpandedText() != null) {
-                ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, table.getViewExpandedText());
-            }
+                long createTime     = getTableCreateTime(table);
+                long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
+
+                ret.setAttribute(ATTRIBUTE_DB, dbId);
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
+                ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
+                ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
+                ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+                ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+                ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention());
+                ret.setAttribute(ATTRIBUTE_PARAMETERS, table.getParameters());
+                ret.setAttribute(ATTRIBUTE_COMMENT, table.getParameters().get(ATTRIBUTE_COMMENT));
+                ret.setAttribute(ATTRIBUTE_TABLE_TYPE, table.getTableType().name());
+                ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary());
+
+                if (table.getViewOriginalText() != null) {
+                    ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, table.getViewOriginalText());
+                }
 
-            AtlasObjectId     tableId       = getObjectId(ret);
-            AtlasEntity       sd            = getStorageDescEntity(tableId, table);
-            List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys());
-            List<AtlasEntity> columns       = getColumnEntities(tableId, table, table.getCols());
+                if (table.getViewExpandedText() != null) {
+                    ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, table.getViewExpandedText());
+                }
 
-            if (entityExtInfo != null) {
-                entityExtInfo.addReferredEntity(sd);
+                boolean pruneTable = table.isTemporary() || action == PreprocessAction.PRUNE;
 
-                if (partitionKeys != null) {
-                    for (AtlasEntity partitionKey : partitionKeys) {
-                        entityExtInfo.addReferredEntity(partitionKey);
+                if (pruneTable) {
+                    LOG.info("ignoring details of table {}", tblQualifiedName);
+                } else {
+                    AtlasObjectId     tableId       = getObjectId(ret);
+                    AtlasEntity       sd            = getStorageDescEntity(tableId, table);
+                    List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys());
+                    List<AtlasEntity> columns       = getColumnEntities(tableId, table, table.getCols());
+
+                    if (entityExtInfo != null) {
+                        entityExtInfo.addReferredEntity(sd);
+
+                        if (partitionKeys != null) {
+                            for (AtlasEntity partitionKey : partitionKeys) {
+                                entityExtInfo.addReferredEntity(partitionKey);
+                            }
+                        }
+
+                        if (columns != null) {
+                            for (AtlasEntity column : columns) {
+                                entityExtInfo.addReferredEntity(column);
+                            }
+                        }
                     }
-                }
 
-                if (columns != null) {
-                    for (AtlasEntity column : columns) {
-                        entityExtInfo.addReferredEntity(column);
-                    }
+                    ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
+                    ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys));
+                    ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
                 }
-            }
 
-            ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
-            ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys));
-            ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
-
-            context.putEntity(tblQualifiedName, ret);
+                context.putEntity(tblQualifiedName, ret);
+            }
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
index 2afaf9f..316222d 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java
@@ -81,7 +81,7 @@ public class CreateTable extends BaseHiveEvent {
         if (table != null) {
             AtlasEntity tblEntity = toTableEntity(table, ret);
 
-            if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+            if (tblEntity != null && TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
                 AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation());
                 AtlasEntity processEntity  = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 2d2a6fb..dec6860 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -45,6 +45,9 @@ import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRe
 import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
 import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2;
 import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
+import org.apache.atlas.notification.preprocessor.PreprocessorContext;
+import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
@@ -54,9 +57,11 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.utils.LruCache;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.DateTimeHelper;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.common.TopicPartition;
@@ -68,16 +73,20 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
 import static org.apache.atlas.AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE;
 import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY;
@@ -113,19 +122,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
     public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633                  = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
     public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
-
-
-
-    private final AtlasEntityStore       atlasEntityStore;
-    private final ServiceState           serviceState;
-    private final AtlasInstanceConverter instanceConverter;
-    private final AtlasTypeRegistry      typeRegistry;
-    private final int                    maxRetries;
-    private final int                    failedMsgCacheSize;
-    private final boolean                skipHiveColumnLineageHive20633;
-    private final int                    skipHiveColumnLineageHive20633InputsThreshold;
-    private final int                    largeMessageProcessingTimeThresholdMs;
-    private final boolean                consumerDisabled;
+    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN                 = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
+    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN                  = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
+    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE                     = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+
+    private final AtlasEntityStore              atlasEntityStore;
+    private final ServiceState                  serviceState;
+    private final AtlasInstanceConverter        instanceConverter;
+    private final AtlasTypeRegistry             typeRegistry;
+    private final int                           maxRetries;
+    private final int                           failedMsgCacheSize;
+    private final boolean                       skipHiveColumnLineageHive20633;
+    private final int                           skipHiveColumnLineageHive20633InputsThreshold;
+    private final int                           largeMessageProcessingTimeThresholdMs;
+    private final boolean                       consumerDisabled;
+    private final List<Pattern>                 hiveTablesToIgnore = new ArrayList<>();
+    private final List<Pattern>                 hiveTablesToPrune  = new ArrayList<>();
+    private final Map<String, PreprocessAction> hiveTablesCache;
+    private final boolean                       preprocessEnabled;
 
     @VisibleForTesting
     final int consumerRetryInterval;
@@ -144,10 +158,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                                     ServiceState serviceState, AtlasInstanceConverter instanceConverter,
                                     AtlasTypeRegistry typeRegistry) throws AtlasException {
         this.notificationInterface = notificationInterface;
-        this.atlasEntityStore = atlasEntityStore;
-        this.serviceState = serviceState;
-        this.instanceConverter = instanceConverter;
-        this.typeRegistry = typeRegistry;
+        this.atlasEntityStore      = atlasEntityStore;
+        this.serviceState          = serviceState;
+        this.instanceConverter     = instanceConverter;
+        this.typeRegistry          = typeRegistry;
 
         this.applicationProperties = ApplicationProperties.get();
 
@@ -162,6 +176,43 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         largeMessageProcessingTimeThresholdMs         = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000);  //  60 sec by default
         consumerDisabled 							  = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
 
+        String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
+        String[] patternHiveTablesToPrune  = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
+
+        if (patternHiveTablesToIgnore != null) {
+            for (String pattern : patternHiveTablesToIgnore) {
+                try {
+                    hiveTablesToIgnore.add(Pattern.compile(pattern));
+
+                    LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern);
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, pattern);
+                }
+            }
+        }
+
+        if (patternHiveTablesToPrune != null) {
+            for (String pattern : patternHiveTablesToPrune) {
+                try {
+                hiveTablesToPrune.add(Pattern.compile(pattern));
+
+                LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern);
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, pattern);
+                }
+            }
+        }
+
+        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+            hiveTablesCache = new LruCache<>(applicationProperties.getInt(CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE, 10000), 0);
+        } else {
+            hiveTablesCache = Collections.emptyMap();
+        }
+
+        preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633;
+
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
     }
@@ -406,6 +457,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                 preProcessNotificationMessage(kafkaMsg);
 
+                if (isEmptyMessage(kafkaMsg)) {
+                    commit(kafkaMsg);
+                    return;
+                }
+
                 // Used for intermediate conversions during create and update
                 for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
                     if (LOG.isDebugEnabled()) {
@@ -669,39 +725,86 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     }
 
     private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) {
-        skipHiveColumnLineage(kafkaMsg);
-    }
-
-    private void skipHiveColumnLineage(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
-        if (!skipHiveColumnLineageHive20633) {
+        if (!preprocessEnabled) {
             return;
         }
 
-        final HookNotificationMessage  message = kafkaMessage.getMessage();
-        final AtlasEntitiesWithExtInfo entities;
+        PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache);
 
-        switch (message.getType()) {
-            case ENTITY_CREATE_V2:
-                entities = ((EntityCreateRequestV2) message).getEntities();
-            break;
+        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
+            ignoreOrPruneHiveTables(context);
+        }
+
+        if (skipHiveColumnLineageHive20633) {
+            skipHiveColumnLineage(context);
+        }
+    }
 
-            case ENTITY_FULL_UPDATE_V2:
-                entities = ((EntityUpdateRequestV2) message).getEntities();
-                break;
+    private void ignoreOrPruneHiveTables(PreprocessorContext context) {
+        List<AtlasEntity> entities = context.getEntities();
 
-            default:
-                entities = null;
-            break;
+        if (entities != null) {
+            for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
+                AtlasEntity        entity       = iter.next();
+                EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
+
+                if (preprocessor != null) {
+                    preprocessor.preprocess(entity, context);
+
+                    if (context.isIgnoredEntity(entity.getGuid())) {
+                        iter.remove();
+                    }
+                }
+            }
+
+            Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
+
+            if (referredEntities != null) {
+                // scan referredEntities for pruning
+                for (Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator(); iter.hasNext(); ) {
+                    AtlasEntity        entity       = iter.next().getValue();
+                    EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
+
+                    if (preprocessor != null) {
+                        preprocessor.preprocess(entity, context);
+
+                        if (context.isIgnoredEntity(entity.getGuid())) {
+                            iter.remove();
+                        }
+                    }
+                }
+
+                for (String guid : context.getReferredEntitiesToMove()) {
+                    AtlasEntity entity = referredEntities.remove(guid);
+
+                    if (entity != null) {
+                        entities.add(entity);
+
+                        LOG.info("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), context.getKafkaMessageOffset(), context.getKafkaPartition());
+                    }
+                }
+            }
+
+            int ignoredEntities = context.getIgnoredEntities().size();
+            int prunedEntities  = context.getPrunedEntities().size();
+
+            if (ignoredEntities > 0 || prunedEntities > 0) {
+                LOG.info("preprocess: ignored entities={}; pruned entities={}. topic-offset={}, partition={}", ignoredEntities, prunedEntities, context.getKafkaMessageOffset(), context.getKafkaPartition());
+            }
         }
+    }
+
+    private void skipHiveColumnLineage(PreprocessorContext context) {
+        List<AtlasEntity> entities = context.getEntities();
 
-        if (entities != null && entities.getEntities() != null) {
+        if (entities != null) {
             int         lineageCount       = 0;
             int         lineageInputsCount = 0;
             int         numRemovedEntities = 0;
             Set<String> lineageQNames      = new HashSet<>();
 
             // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
-            for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
+            for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
                 AtlasEntity entity = iter.next();
 
                 if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
@@ -713,7 +816,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         if (lineageQNames.contains(qualifiedName)) {
                             iter.remove();
 
-                            LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                            LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
 
                             numRemovedEntities++;
 
@@ -738,7 +841,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
 
             if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
-                for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) {
+                for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
                     AtlasEntity entity = iter.next();
 
                     if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
@@ -750,9 +853,36 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             }
 
             if (numRemovedEntities > 0) {
-                LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition());
+            }
+        }
+    }
+
+    private boolean isEmptyMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) {
+        final boolean                 ret;
+        final HookNotificationMessage message = kafkaMsg.getMessage();
+
+        switch (message.getType()) {
+            case ENTITY_CREATE_V2: {
+                AtlasEntitiesWithExtInfo entities = ((EntityCreateRequestV2) message).getEntities();
+
+                ret = entities == null || CollectionUtils.isEmpty(entities.getEntities());
             }
+            break;
+
+            case ENTITY_FULL_UPDATE_V2: {
+                AtlasEntitiesWithExtInfo entities = ((EntityUpdateRequestV2) message).getEntities();
+
+                ret = entities == null || CollectionUtils.isEmpty(entities.getEntities());
+            }
+            break;
+
+            default:
+                ret = false;
+            break;
         }
+
+        return ret;
     }
 
     private void audit(String messageUser, String method, String path) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
new file mode 100644
index 0000000..bdea14a
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public abstract class EntityPreprocessor {
+    public static final String TYPE_HIVE_COLUMN         = "hive_column";
+    public static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
+    public static final String TYPE_HIVE_PROCESS        = "hive_process";
+    public static final String TYPE_HIVE_STORAGEDESC    = "hive_storagedesc";
+    public static final String TYPE_HIVE_TABLE          = "hive_table";
+
+    public static final String ATTRIBUTE_COLUMNS        = "columns";
+    public static final String ATTRIBUTE_INPUTS         = "inputs";
+    public static final String ATTRIBUTE_OUTPUTS        = "outputs";
+    public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
+    public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+    public static final String ATTRIBUTE_SD             = "sd";
+
+    public static final char   QNAME_SEP_CLUSTER_NAME = '@';
+    public static final char   QNAME_SEP_ENTITY_NAME  = '.';
+    public static final String QNAME_SD_SUFFIX        = "_storage";
+
+    private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>();
+
+    private final String typeName;
+
+
+    static {
+        EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
+                                                                    new HivePreprocessor.HiveTablePreprocessor(),
+                                                                    new HivePreprocessor.HiveColumnPreprocessor(),
+                                                                    new HivePreprocessor.HiveProcessPreprocessor(),
+                                                                    new HivePreprocessor.HiveColumnLineageProcessPreprocessor(),
+                                                                    new HivePreprocessor.HiveStorageDescPreprocessor()
+        };
+
+        for (EntityPreprocessor preprocessor : preprocessors) {
+            PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+        }
+    }
+
+    protected EntityPreprocessor(String typeName) {
+        this.typeName = typeName;
+    }
+
+    public String getTypeName() {
+        return typeName;
+    }
+
+    public abstract void preprocess(AtlasEntity entity, PreprocessorContext context);
+
+
+    public static EntityPreprocessor getPreprocessor(String typeName) {
+        return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null;
+    }
+
+    public static String getQualifiedName(AtlasEntity entity) {
+        Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
+
+        return obj != null ? obj.toString() : null;
+    }
+
+    /** Returns the type name carried by an AtlasObjectId, the Map form of one, an AtlasEntity or an AtlasEntityWithExtInfo; null for anything else. */
+    public String getTypeName(Object obj) {
+        Object ret = null;
+        if (obj instanceof AtlasObjectId) {
+            ret = ((AtlasObjectId) obj).getTypeName();
+        } else if (obj instanceof Map) {
+            ret = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_TYPENAME);
+        } else if (obj instanceof AtlasEntity) {
+            ret = ((AtlasEntity) obj).getTypeName();
+        } else if (obj instanceof AtlasEntityWithExtInfo) {
+            AtlasEntity entity = ((AtlasEntityWithExtInfo) obj).getEntity(); // avoid NPE if getEntity() returns null
+            ret = entity != null ? entity.getTypeName() : null;
+        }
+        return ret != null ? ret.toString() : null;
+    }
+
+    /** Returns the qualifiedName held in the unique-attributes of an object-id (or its Map form) or in the attributes of an entity; null for anything else. */
+    public String getQualifiedName(Object obj) {
+        Map<String, Object> attributes = null;
+        if (obj instanceof AtlasObjectId) {
+            attributes = ((AtlasObjectId) obj).getUniqueAttributes();
+        } else if (obj instanceof Map) {
+            Object uniqAttributes = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_UNIQUE_ATTRIBUTES);
+            attributes = uniqAttributes instanceof Map ? (Map<String, Object>) uniqAttributes : null; // tolerate a non-Map value instead of throwing ClassCastException
+        } else if (obj instanceof AtlasEntity) {
+            attributes = ((AtlasEntity) obj).getAttributes();
+        } else if (obj instanceof AtlasEntityWithExtInfo) {
+            AtlasEntity entity = ((AtlasEntityWithExtInfo) obj).getEntity();
+            attributes = entity != null ? entity.getAttributes() : null; // avoid NPE if getEntity() returns null
+        }
+        Object ret = attributes != null ? attributes.get(ATTRIBUTE_QUALIFIED_NAME) : null;
+        return ret != null ? ret.toString() : null;
+    }
+
+    protected boolean isEmpty(Object obj) {
+        return obj == null || ((obj instanceof Collection) && ((Collection) obj).isEmpty());
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
new file mode 100644
index 0000000..d54c88d
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class HivePreprocessor {
+    static class HiveTablePreprocessor extends EntityPreprocessor {
+        public HiveTablePreprocessor() {
+            super(TYPE_HIVE_TABLE);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.isIgnoredEntity(entity.getGuid())) {
+                context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
+            } else {
+                PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(entity));
+
+                if (action == PreprocessAction.IGNORE) {
+                    context.addToIgnoredEntities(entity);
+
+                    context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_SD));
+                    context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_COLUMNS));
+                    context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_PARTITION_KEYS));
+                } else if (action == PreprocessAction.PRUNE) {
+                    context.addToPrunedEntities(entity);
+
+                    context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_SD));
+                    context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_COLUMNS));
+                    context.addToIgnoredEntities(entity.getAttribute(ATTRIBUTE_PARTITION_KEYS));
+
+                    entity.setAttribute(ATTRIBUTE_SD, null);
+                    entity.setAttribute(ATTRIBUTE_COLUMNS, null);
+                    entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null);
+                }
+            }
+        }
+    }
+
+
+    static class HiveColumnPreprocessor extends EntityPreprocessor {
+        public HiveColumnPreprocessor() {
+            super(TYPE_HIVE_COLUMN);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (!context.isIgnoredEntity(entity.getGuid())) {
+                PreprocessAction action = context.getPreprocessActionForHiveTable(getHiveTableQualifiedName(getQualifiedName(entity)));
+
+                if (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE) {
+                    context.addToIgnoredEntities(entity.getGuid());
+                }
+            }
+        }
+
+        /** Maps a hive_column qualifiedName ("db.table.column@cluster") to its table's qualifiedName ("db.table@cluster"); null if no table part can be found. */
+        public static String getHiveTableQualifiedName(String columnQualifiedName) {
+            if (columnQualifiedName == null) { // entity without a qualifiedName attribute
+                return null;
+            }
+
+            int    clusterSepPos = columnQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME);
+            String entityName    = clusterSepPos != -1 ? columnQualifiedName.substring(0, clusterSepPos) : columnQualifiedName;
+            String clusterName   = clusterSepPos != -1 ? columnQualifiedName.substring(clusterSepPos + 1) : "";
+            int    columnSepPos  = entityName.lastIndexOf(QNAME_SEP_ENTITY_NAME); // search left of '@' only: the cluster name may itself contain '.'
+
+            if (columnSepPos == -1) { // no "<db.table>." prefix, so there is no table name to return
+                return null;
+            }
+
+            String dbTableName = entityName.substring(0, columnSepPos);
+            return clusterName.isEmpty() ? dbTableName : (dbTableName + QNAME_SEP_CLUSTER_NAME + clusterName);
+        }
+    }
+
+
+    static class HiveStorageDescPreprocessor extends EntityPreprocessor {
+        public HiveStorageDescPreprocessor() {
+            super(TYPE_HIVE_STORAGEDESC);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (!context.isIgnoredEntity(entity.getGuid())) {
+                PreprocessAction action = context.getPreprocessActionForHiveTable(getHiveTableQualifiedName(getQualifiedName(entity)));
+
+                if (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE) {
+                    context.addToIgnoredEntities(entity.getGuid());
+                }
+            }
+        }
+
+        public static String getHiveTableQualifiedName(String sdQualifiedName) {
+            // null-safe (callers pass a qualifiedName that may be absent); otherwise cut at the last "_storage" marker
+            int sepPos = sdQualifiedName != null ? sdQualifiedName.lastIndexOf(QNAME_SD_SUFFIX) : -1;
+            return sepPos != -1 ? sdQualifiedName.substring(0, sepPos) : sdQualifiedName;
+        }
+    }
+
+
+    static class HiveProcessPreprocessor extends EntityPreprocessor {
+        public HiveProcessPreprocessor() {
+            super(TYPE_HIVE_PROCESS);
+        }
+
+        public HiveProcessPreprocessor(String typeName) {
+            super(typeName);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.isIgnoredEntity(entity.getGuid())) {
+                context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
+            } else {
+                Object inputs  = entity.getAttribute(ATTRIBUTE_INPUTS);
+                Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+
+                removeIgnoredObjectIds(inputs, context);
+                removeIgnoredObjectIds(outputs, context);
+
+                boolean isInputsEmpty  = isEmpty(inputs);
+                boolean isOutputsEmpty = isEmpty(outputs);
+
+                if (isInputsEmpty || isOutputsEmpty) {
+                    context.addToIgnoredEntities(entity);
+
+                    // since the process entity is ignored, entities referenced by inputs/outputs of this process entity
+                    // may not be processed by Atlas, if they are present in referredEntities. So, move them from
+                    // 'referredEntities' to 'entities'. However, this is not necessary for hive_column entities,
+                    // as these entities would be referenced by hive_table entities
+                    if (!StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
+                        if (!isInputsEmpty) {
+                            for (Object obj : (Collection) inputs) {
+                                String guid = context.getGuid(obj);
+
+                                context.addToReferredEntitiesToMove(guid);
+                            }
+                        } else if (!isOutputsEmpty) {
+                            for (Object obj : (Collection) outputs) {
+                                String guid = context.getGuid(obj);
+
+                                context.addToReferredEntitiesToMove(guid);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        private void removeIgnoredObjectIds(Object obj, PreprocessorContext context) {
+            if (obj == null || !(obj instanceof Collection)) {
+                return;
+            }
+
+            Collection   objList  = (Collection) obj;
+            List<Object> toRemove = null;
+
+            for (Object objElem : objList) {
+                boolean removeEntry = false;
+                String  guid        = context.getGuid(objElem);
+
+                if (guid != null) {
+                    removeEntry = context.isIgnoredEntity(guid);
+
+                    if (!removeEntry) { // perhaps entity hasn't been preprocessed yet
+                        AtlasEntity entity = context.getEntity(guid);
+
+                        if (entity != null) {
+                            switch (entity.getTypeName()) {
+                                case TYPE_HIVE_TABLE: {
+                                    PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(entity));
+
+                                    removeEntry = (action == PreprocessAction.IGNORE);
+                                }
+                                break;
+
+                                case TYPE_HIVE_COLUMN: {
+                                    PreprocessAction action = context.getPreprocessActionForHiveTable(HiveColumnPreprocessor.getHiveTableQualifiedName(getQualifiedName(entity)));
+
+                                    // if the table is ignored or pruned, remove the column
+                                    removeEntry = (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE);
+                                }
+                                break;
+                            }
+                        }
+                    }
+                } else {
+                    String typeName = getTypeName(objElem);
+
+                    if (typeName != null) {
+                        switch (typeName) {
+                            case TYPE_HIVE_TABLE: {
+                                PreprocessAction action = context.getPreprocessActionForHiveTable(getQualifiedName(objElem));
+
+                                removeEntry = (action == PreprocessAction.IGNORE);
+                            }
+                            break;
+
+                            case TYPE_HIVE_COLUMN: {
+                                PreprocessAction action = context.getPreprocessActionForHiveTable(HiveColumnPreprocessor.getHiveTableQualifiedName(getQualifiedName(objElem)));
+
+                                // if the table is ignored or pruned, remove the column
+                                removeEntry = (action == PreprocessAction.IGNORE || action == PreprocessAction.PRUNE);
+                            }
+                            break;
+                        }
+                    }
+                }
+
+                if (removeEntry) {
+                    if (toRemove == null) {
+                        toRemove = new ArrayList();
+                    }
+
+                    toRemove.add(objElem);
+                }
+            }
+
+            if (toRemove != null) {
+                objList.removeAll(toRemove);
+            }
+        }
+    }
+
+    static class HiveColumnLineageProcessPreprocessor extends HiveProcessPreprocessor {
+        public HiveColumnLineageProcessPreprocessor() {
+            super(TYPE_HIVE_COLUMN_LINEAGE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/a35379c1/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
new file mode 100644
index 0000000..8f62768
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+
+public class PreprocessorContext {
+    private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class);
+
+    public enum PreprocessAction { NONE, IGNORE, PRUNE }
+
+    private final AtlasKafkaMessage<HookNotificationMessage> kafkaMessage;
+    private final AtlasEntitiesWithExtInfo                   entitiesWithExtInfo;
+    private final List<Pattern>                              hiveTablesToIgnore;
+    private final List<Pattern>                              hiveTablesToPrune;
+    private final Map<String, PreprocessAction>              hiveTablesCache;
+    private final Set<String>                                ignoredEntities        = new HashSet<>();
+    private final Set<String>                                prunedEntities         = new HashSet<>();
+    private final Set<String>                                referredEntitiesToMove = new HashSet<>();
+
+    public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
+        this.kafkaMessage       = kafkaMessage;
+        this.hiveTablesToIgnore = hiveTablesToIgnore;
+        this.hiveTablesToPrune  = hiveTablesToPrune;
+        this.hiveTablesCache    = hiveTablesCache;
+
+        final HookNotificationMessage  message = kafkaMessage.getMessage();
+
+        switch (message.getType()) {
+            case ENTITY_CREATE_V2:
+                entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) message).getEntities();
+            break;
+
+            case ENTITY_FULL_UPDATE_V2:
+                entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) message).getEntities();
+            break;
+
+            default:
+                entitiesWithExtInfo = null;
+            break;
+        }
+    }
+
+    public AtlasKafkaMessage<HookNotificationMessage> getKafkaMessage() {
+        return kafkaMessage;
+    }
+
+    public long getKafkaMessageOffset() {
+        return kafkaMessage.getOffset();
+    }
+
+    public int getKafkaPartition() {
+        return kafkaMessage.getPartition();
+    }
+
+    public List<AtlasEntity> getEntities() {
+        return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
+    }
+
+    public Map<String, AtlasEntity> getReferredEntities() {
+        return entitiesWithExtInfo != null ? entitiesWithExtInfo.getReferredEntities() : null;
+    }
+
+    public AtlasEntity getEntity(String guid) {
+        return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null;
+    }
+
+    public Set<String> getIgnoredEntities() { return ignoredEntities; }
+
+    public Set<String> getPrunedEntities() { return prunedEntities; }
+
+    public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; }
+
+    public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
+        PreprocessAction ret = PreprocessAction.NONE;
+
+        if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
+            ret = hiveTablesCache.get(qualifiedName);
+
+            if (ret == null) {
+                if (isMatch(qualifiedName, hiveTablesToIgnore)) {
+                    ret = PreprocessAction.IGNORE;
+                } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
+                    ret = PreprocessAction.PRUNE;
+                } else {
+                    ret = PreprocessAction.NONE;
+                }
+
+                hiveTablesCache.put(qualifiedName, ret);
+            }
+        }
+
+        return ret;
+    }
+
+    public boolean isIgnoredEntity(String guid) {
+        return guid != null ? ignoredEntities.contains(guid) : false;
+    }
+
+    public boolean isPrunedEntity(String guid) {
+        return guid != null ? prunedEntities.contains(guid) : false;
+    }
+
+    /** Marks the entity as ignored and logs its typeName and qualifiedName. */
+    public void addToIgnoredEntities(AtlasEntity entity) {
+        // The guid may already be in the set (e.g. recorded earlier via addToIgnoredEntities(String), which logs nothing);
+        // log unconditionally so such entities still get their typeName/qualifiedName line. Set.add() prevents duplicates.
+        ignoredEntities.add(entity.getGuid());
+        LOG.info("ignored entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
+    }
+
+    /**
+     * Marks the given entity as pruned and logs it the first time its guid is recorded.
+     *
+     * @param entity entity to mark as pruned; must not be {@code null}
+     */
+    public void addToPrunedEntities(AtlasEntity entity) {
+        final String entityGuid = entity.getGuid();
+
+        // Set.add() returns true only when the guid was not already present,
+        // so the log line is emitted once per guid
+        if (prunedEntities.add(entityGuid)) {
+            LOG.info("pruned entity: typeName={}, qualifiedName={} topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), getKafkaMessageOffset(), getKafkaPartition());
+        }
+    }
+
+    /**
+     * Records a guid in the ignored set; a {@code null} guid is silently skipped.
+     *
+     * @param guid guid to mark as ignored; may be {@code null}
+     */
+    public void addToIgnoredEntities(String guid) {
+        if (guid == null) {
+            return;
+        }
+
+        ignoredEntities.add(guid);
+    }
+
+    /**
+     * Records a guid in the pruned set; a {@code null} guid is silently skipped.
+     *
+     * @param guid guid to mark as pruned; may be {@code null}
+     */
+    public void addToPrunedEntities(String guid) {
+        if (guid == null) {
+            return;
+        }
+
+        prunedEntities.add(guid);
+    }
+
+    /**
+     * Records a guid in the referred-entities-to-move set; a {@code null} guid is silently skipped.
+     *
+     * @param guid guid to record; may be {@code null}
+     */
+    public void addToReferredEntitiesToMove(String guid) {
+        if (guid == null) {
+            return;
+        }
+
+        referredEntitiesToMove.add(guid);
+    }
+
+    /**
+     * Marks as ignored every guid that can be extracted from the given object.
+     * The object may be a single guid-bearing value or a {@code Collection} of them;
+     * elements without an extractable guid are skipped.
+     *
+     * @param obj value or collection of values to extract guids from; may be {@code null}
+     */
+    public void addToIgnoredEntities(Object obj) {
+        collectGuids(obj, this.ignoredEntities);
+    }
+
+    /**
+     * Marks as pruned every guid that can be extracted from the given object.
+     * The object may be a single guid-bearing value or a {@code Collection} of them;
+     * elements without an extractable guid are skipped.
+     *
+     * @param obj value or collection of values to extract guids from; may be {@code null}
+     */
+    public void addToPrunedEntities(Object obj) {
+        collectGuids(obj, this.prunedEntities);
+    }
+
+    /**
+     * Extracts a guid from the supported object shapes: {@code AtlasObjectId},
+     * a {@code Map} holding the guid under {@code AtlasObjectId.KEY_GUID},
+     * {@code AtlasEntity}, or {@code AtlasEntity.AtlasEntityWithExtInfo}.
+     *
+     * @param obj object to extract the guid from; may be {@code null}
+     * @return string form of the guid, or {@code null} when obj is {@code null}, is of an
+     *         unsupported type, or carries no guid
+     */
+    public String getGuid(Object obj) {
+        Object ret = null;
+
+        if (obj instanceof AtlasObjectId) {
+            ret = ((AtlasObjectId) obj).getGuid();
+        } else if (obj instanceof Map) {
+            ret = ((Map<?, ?>) obj).get(AtlasObjectId.KEY_GUID);
+        } else if (obj instanceof AtlasEntity) {
+            ret = ((AtlasEntity) obj).getGuid();
+        } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+            // the wrapper may not have an entity set; treat that as "no guid" instead of throwing NPE
+            AtlasEntity entity = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity();
+
+            ret = entity != null ? entity.getGuid() : null;
+        }
+
+        return ret != null ? ret.toString() : null;
+    }
+
+
+    /**
+     * Checks whether the name fully matches at least one of the given patterns.
+     *
+     * @param name     value to test; {@code null} never matches
+     * @param patterns patterns to test against; {@code null} or empty never matches
+     * @return {@code true} if any pattern matches the entire name
+     */
+    private boolean isMatch(String name, List<Pattern> patterns) {
+        boolean ret = false;
+
+        // callers check the pattern lists with a null-tolerant test, so one list may be null
+        // while the other is populated; a null list must not cause an NPE here
+        if (name != null && patterns != null) {
+            for (Pattern p : patterns) {
+                if (p.matcher(name).matches()) {
+                    ret = true;
+
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Adds to {@code guids} the guid of {@code obj}, or of each element when {@code obj}
+     * is a {@code Collection}. Does nothing for a {@code null} obj.
+     *
+     * @param obj   single value or collection of values to extract guids from
+     * @param guids set receiving the extracted guids
+     */
+    private void collectGuids(Object obj, Set<String> guids) {
+        if (obj == null) {
+            return;
+        }
+
+        if (!(obj instanceof Collection)) {
+            collectGuid(obj, guids);
+
+            return;
+        }
+
+        for (Object element : (Collection<?>) obj) {
+            collectGuid(element, guids);
+        }
+    }
+
+    private void collectGuid(Object obj, Set<String> guids) {
+        String guid = getGuid(obj);
+
+        if (guid != null) {
+            guids.add(guid);
+        }
+    }
+}