You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/03/09 01:30:53 UTC

[atlas] branch master updated: ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 2471525  ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes
2471525 is described below

commit 24715256558bed6f89d4bec25c6eada0c9db2075
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Wed Mar 6 23:49:40 2019 -0800

    ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes
---
 .../patches/008-remove-hive-legacy-attributes.json | 50 ++++++++++++++++
 .../003-remove-rdbms-legacy-attributes.json        | 54 ++++++++---------
 .../bootstrap/AtlasTypeDefStoreInitializer.java    | 44 ++++++++++----
 .../graph/v2/AtlasRelationshipDefStoreV2.java      | 20 ++++---
 .../notification/NotificationHookConsumer.java     | 47 ++++++++-------
 .../preprocessor/EntityPreprocessor.java           |  2 +
 .../preprocessor/HivePreprocessor.java             | 67 ++++++++++++++++++++++
 .../preprocessor/PreprocessorContext.java          |  6 +-
 8 files changed, 223 insertions(+), 67 deletions(-)

diff --git a/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json b/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json
new file mode 100644
index 0000000..32a0876
--- /dev/null
+++ b/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json
@@ -0,0 +1,50 @@
+{
+  "patches": [
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_table_db",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_table.db"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_table_columns",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_table.columns"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_table_partitionkeys",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_table.partitionkeys"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_table_storagedesc",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_table.sd"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_process_column_lineage",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_column_lineage.query",
+        "swapEnds":          "true"
+      }
+    }
+  ]
+}
diff --git a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
index d087c66..5531bee 100644
--- a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
+++ b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
@@ -1,87 +1,87 @@
 {
   "patches": [
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_instance_databases",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_instance.databases",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_db_tables",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_db.tables",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_table_columns",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_table.columns",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_table_indexes",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_table.indexes",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_table_foreign_key",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_table.foreign_keys",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_index_columns",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel": "__rdbms_index.columns"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_foreign_key_key_columns",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel": "__rdbms_foreign_key.key_columns"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_foreign_key_table_references",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel": "__rdbms_foreign_key.references_table"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_foreign_key_column_references",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel": "__rdbms_foreign_key.references_columns"
       }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index d12284e..66162aa 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -78,9 +78,10 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 @Service
 public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
-    public static final String PATCHES_FOLDER_NAME   = "patches";
-    public static final String RELATIONSHIP_LABEL    = "relationshipLabel";
-    public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
+    public static final String PATCHES_FOLDER_NAME    = "patches";
+    public static final String RELATIONSHIP_LABEL     = "relationshipLabel";
+    public static final String RELATIONSHIP_CATEGORY  = "relationshipCategory";
+    public static final String RELATIONSHIP_SWAP_ENDS = "swapEnds";
 
     private final AtlasTypeDefStore atlasTypeDefStore;
     private final AtlasTypeRegistry atlasTypeRegistry;
@@ -414,7 +415,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
             PatchHandler[] patchHandlers = new PatchHandler[] {
                     new AddAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
                     new UpdateAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
-                    new RemoveLegacyAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
+                    new RemoveLegacyRefAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
                     new UpdateTypeDefOptionsPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
                     new SetServiceTypePatchHandler(atlasTypeDefStore, atlasTypeRegistry)
             };
@@ -710,9 +711,9 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
         }
     }
 
-    class RemoveLegacyAttributesPatchHandler extends PatchHandler {
-        public RemoveLegacyAttributesPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
-            super(typeDefStore, typeRegistry, new String[] { "REMOVE_LEGACY_ATTRIBUTES" });
+    class RemoveLegacyRefAttributesPatchHandler extends PatchHandler {
+        public RemoveLegacyRefAttributesPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
+            super(typeDefStore, typeRegistry, new String[] { "REMOVE_LEGACY_REF_ATTRIBUTES" });
         }
 
         @Override
@@ -734,10 +735,12 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
 
                     String               newRelationshipLabel    = null;
                     RelationshipCategory newRelationshipCategory = null;
+                    boolean              swapEnds                = false;
 
                     if (patch.getParams() != null) {
                         Object relLabel    = patch.getParams().get(RELATIONSHIP_LABEL);
                         Object relCategory = patch.getParams().get(RELATIONSHIP_CATEGORY);
+                        Object relSwapEnds = patch.getParams().get(RELATIONSHIP_SWAP_ENDS);
 
                         if (relLabel != null) {
                             newRelationshipLabel = relLabel.toString();
@@ -746,6 +749,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
                         if (relCategory != null) {
                             newRelationshipCategory = RelationshipCategory.valueOf(relCategory.toString());
                         }
+
+                        if (relSwapEnds != null) {
+                            swapEnds = Boolean.valueOf(relSwapEnds.toString());
+                        }
                     }
 
                     if (StringUtils.isEmpty(newRelationshipLabel)) {
@@ -766,9 +773,19 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
                         }
                     }
 
-                    AtlasRelationshipDef updatedDef        = new AtlasRelationshipDef(relationshipDef);
-                    AtlasEntityDef       updatedEntityDef1 = new AtlasEntityDef(end1Type.getEntityDef());
-                    AtlasEntityDef       updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef());
+                    AtlasRelationshipDef updatedDef = new AtlasRelationshipDef(relationshipDef);
+
+                    if (swapEnds) {
+                        AtlasRelationshipEndDef tmp = updatedDef.getEndDef1();
+
+                        updatedDef.setEndDef1(updatedDef.getEndDef2());
+                        updatedDef.setEndDef2(tmp);
+                    }
+
+                    end1Def  = updatedDef.getEndDef1();
+                    end2Def  = updatedDef.getEndDef2();
+                    end1Type = typeRegistry.getEntityTypeByName(end1Def.getType());
+                    end2Type = typeRegistry.getEntityTypeByName(end2Def.getType());
 
                     updatedDef.setRelationshipLabel(newRelationshipLabel);
 
@@ -776,10 +793,13 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
                         updatedDef.setRelationshipCategory(newRelationshipCategory);
                     }
 
-                    updatedDef.getEndDef1().setIsLegacyAttribute(false);
-                    updatedDef.getEndDef2().setIsLegacyAttribute(false);
+                    end1Def.setIsLegacyAttribute(false);
+                    end2Def.setIsLegacyAttribute(false);
                     updatedDef.setTypeVersion(patch.getUpdateToVersion());
 
+                    AtlasEntityDef updatedEntityDef1 = new AtlasEntityDef(end1Type.getEntityDef());
+                    AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef());
+
                     updatedEntityDef1.removeAttribute(end1Def.getName());
                     updatedEntityDef2.removeAttribute(end2Def.getName());
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
index 35d0577..332c18a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
@@ -436,19 +436,25 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe
         }
 
         AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1();
+        AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2();
         AtlasRelationshipEndDef newEnd1      = newRelationshipDef.getEndDef1();
+        AtlasRelationshipEndDef newEnd2      = newRelationshipDef.getEndDef2();
+        boolean                 endsSwaped   = false;
 
         if ( !isValidUpdate(existingEnd1, newEnd1) ) {
-            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
-                                         newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString());
+            if (RequestContext.get().isInTypePatching() && isValidUpdate(existingEnd1, newEnd2)) { // allow swap of ends during type-patch
+                endsSwaped = true;
+            } else {
+                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
+                                             newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString());
+            }
         }
 
-        AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2();
-        AtlasRelationshipEndDef newEnd2      = newRelationshipDef.getEndDef2();
+        AtlasRelationshipEndDef newEndToCompareWith = endsSwaped ? newEnd1 : newEnd2;
 
-        if ( !isValidUpdate(existingEnd2, newEnd2) ) {
+        if ( !isValidUpdate(existingEnd2, newEndToCompareWith) ) {
                 throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE,
-                                         newRelationshipDef.getName(), newEnd2.toString(), existingEnd2.toString());
+                                             newRelationshipDef.getName(), newEndToCompareWith.toString(), existingEnd2.toString());
         }
     }
 
@@ -520,7 +526,7 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe
     }
 
     private static boolean isValidUpdate(AtlasRelationshipEndDef currentDef, AtlasRelationshipEndDef updatedDef) {
-        // permit updates to description and isLegacyAttribute (ref type-patch REMOVE_LEGACY_ATTRIBUTES)
+        // permit updates to description and isLegacyAttribute (ref type-patch REMOVE_LEGACY_REF_ATTRIBUTES)
         return Objects.equals(currentDef.getType(), updatedDef.getType()) &&
                 Objects.equals(currentDef.getName(), updatedDef.getName()) &&
                 Objects.equals(currentDef.getIsContainer(), updatedDef.getIsContainer()) &&
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 50559dd..d16d544 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -78,7 +78,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -87,6 +86,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
+
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
  */
@@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN                 = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN                  = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE                     = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+    public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS          = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
     public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS         = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@@ -143,6 +144,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final List<Pattern>                 hiveTablesToIgnore = new ArrayList<>();
     private final List<Pattern>                 hiveTablesToPrune  = new ArrayList<>();
     private final Map<String, PreprocessAction> hiveTablesCache;
+    private final boolean                       hiveTypesRemoveOwnedRefAttrs;
     private final boolean                       rdbmsTypesRemoveOwnedRefAttrs;
     private final boolean                       preprocessEnabled;
 
@@ -172,7 +174,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
         minWaitDuration       = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms  by default
         maxWaitDuration       = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
-        commitBatchSize       = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
+        commitBatchSize       = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0);
 
         skipHiveColumnLineageHive20633                = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
         skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
@@ -214,11 +216,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             hiveTablesCache = Collections.emptyMap();
         }
 
-        rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
-        preprocessEnabled             = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || rdbmsTypesRemoveOwnedRefAttrs;
+        hiveTypesRemoveOwnedRefAttrs  = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, false);
+        rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, false);
+        preprocessEnabled             = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
+        LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
+        LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, rdbmsTypesRemoveOwnedRefAttrs);
+        LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize);
+        LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled);
     }
 
     @Override
@@ -694,7 +701,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             List<AtlasEntity> entitiesList = entities.getEntities();
             AtlasEntityStream entityStream = new AtlasEntityStream(entities);
 
-            if (entitiesList.size() <= commitBatchSize) {
+            if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
                 atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
             } else {
                 for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
@@ -792,10 +799,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             return;
         }
 
-        PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, rdbmsTypesRemoveOwnedRefAttrs);
+        PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
 
-        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
-            ignoreOrPruneHiveTables(context);
+        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs) {
+            preprocessHiveTypes(context);
         }
 
         if (skipHiveColumnLineageHive20633) {
@@ -813,8 +820,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         List<AtlasEntity> entities = context.getEntities();
 
         if (entities != null) {
-            for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
-                AtlasEntity        entity       = iter.next();
+            for (int i = 0; i < entities.size(); i++) {
+                AtlasEntity        entity       = entities.get(i);
                 EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
 
                 if (preprocessor != null) {
@@ -824,19 +831,19 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
     }
 
-    private void ignoreOrPruneHiveTables(PreprocessorContext context) {
+    private void preprocessHiveTypes(PreprocessorContext context) {
         List<AtlasEntity> entities = context.getEntities();
 
         if (entities != null) {
-            for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
-                AtlasEntity        entity       = iter.next();
+            for (int i = 0; i < entities.size(); i++) {
+                AtlasEntity        entity       = entities.get(i);
                 EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
 
                 if (preprocessor != null) {
                     preprocessor.preprocess(entity, context);
 
                     if (context.isIgnoredEntity(entity.getGuid())) {
-                        iter.remove();
+                        entities.remove(i--);
                     }
                 }
             }
@@ -877,8 +884,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             Set<String> lineageQNames      = new HashSet<>();
 
             // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
-            for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
-                AtlasEntity entity = iter.next();
+            for (int i = 0; i < entities.size(); i++) {
+                AtlasEntity entity = entities.get(i);
 
                 if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
                     final Object qName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
@@ -887,7 +894,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         final String qualifiedName = qName.toString();
 
                         if (lineageQNames.contains(qualifiedName)) {
-                            iter.remove();
+                            entities.remove(i--);
 
                             LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition());
 
@@ -914,11 +921,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
 
             if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
-                for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
-                    AtlasEntity entity = iter.next();
+                for (int i = 0; i < entities.size(); i++) {
+                    AtlasEntity entity = entities.get(i);
 
                     if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
-                        iter.remove();
+                        entities.remove(i--);
 
                         numRemovedEntities++;
                     }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 7eba27a..085e746 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -48,6 +48,8 @@ public abstract class EntityPreprocessor {
     public static final String ATTRIBUTE_SD             = "sd";
     public static final String ATTRIBUTE_DB             = "db";
     public static final String ATTRIBUTE_DATABASES      = "databases";
+    public static final String ATTRIBUTE_QUERY          = "query";
+    public static final String ATTRIBUTE_TABLE          = "table";
     public static final String ATTRIBUTE_TABLES         = "tables";
     public static final String ATTRIBUTE_INDEXES        = "indexes";
     public static final String ATTRIBUTE_FOREIGN_KEYS   = "foreign_keys";
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index d54c88d..ff9c9cb 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -18,14 +18,26 @@
 package org.apache.atlas.notification.preprocessor;
 
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
 import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class HivePreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
+
+    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS        = "hive_table_columns";
+    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys"; // all lowercase, to match the relationship-def typeName used in 008-remove-hive-legacy-attributes.json
+
     static class HiveTablePreprocessor extends EntityPreprocessor {
         public HiveTablePreprocessor() {
             super(TYPE_HIVE_TABLE);
@@ -54,9 +66,64 @@ public class HivePreprocessor {
                     entity.setAttribute(ATTRIBUTE_SD, null);
                     entity.setAttribute(ATTRIBUTE_COLUMNS, null);
                     entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null);
+                } else if (context.getHiveTypesRemoveOwnedRefAttrs()) {
+                    context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD);
+
+                    removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context);
+                    removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context);
                 }
             }
         }
+
+        private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
+            Object attrVal = entity.getAttribute(attrName);
+
+            if (attrVal != null) {
+                Set<String> guids = new HashSet<>();
+
+                context.collectGuids(attrVal, guids);
+
+                for (String guid : guids) {
+                    AtlasEntity colEntity = context.getEntity(guid);
+
+                    if (colEntity != null) {
+                        Object attrTable = null;
+
+                        if (colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) {
+                            attrTable = colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE);
+                        } else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) {
+                            attrTable = colEntity.getAttribute(ATTRIBUTE_TABLE);
+                        }
+
+                        attrTable = setRelationshipType(attrTable, relationshipType);
+
+                        if (attrTable != null) {
+                            colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable);
+                        }
+
+                        context.addToReferredEntitiesToMove(guid);
+                    }
+                }
+            }
+        }
+
+        private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
+            AtlasRelatedObjectId ret = null;
+
+            if (attr instanceof AtlasRelatedObjectId) {
+                ret = (AtlasRelatedObjectId) attr;
+            } else if (attr instanceof AtlasObjectId) {
+                ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
+            } else if (attr instanceof Map) {
+                ret = new AtlasRelatedObjectId((Map) attr);
+            }
+
+            if (ret != null) {
+                ret.setRelationshipType(relationshipType);
+            }
+
+            return ret;
+        }
     }
 
 
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 0f95fba..94e0993 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -44,16 +44,18 @@ public class PreprocessorContext {
     private final List<Pattern>                       hiveTablesToIgnore;
     private final List<Pattern>                       hiveTablesToPrune;
     private final Map<String, PreprocessAction>       hiveTablesCache;
+    private final boolean                             hiveTypesRemoveOwnedRefAttrs;
     private final boolean                             rdbmsTypesRemoveOwnedRefAttrs;
     private final Set<String>                         ignoredEntities        = new HashSet<>();
     private final Set<String>                         prunedEntities         = new HashSet<>();
     private final Set<String>                         referredEntitiesToMove = new HashSet<>();
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean rdbmsTypesRemoveOwnedRefAttrs) {
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
         this.kafkaMessage                  = kafkaMessage;
         this.hiveTablesToIgnore            = hiveTablesToIgnore;
         this.hiveTablesToPrune             = hiveTablesToPrune;
         this.hiveTablesCache               = hiveTablesCache;
+        this.hiveTypesRemoveOwnedRefAttrs  = hiveTypesRemoveOwnedRefAttrs;
         this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
 
         final HookNotification  message = kafkaMessage.getMessage();
@@ -85,6 +87,8 @@ public class PreprocessorContext {
         return kafkaMessage.getPartition();
     }
 
+    public boolean getHiveTypesRemoveOwnedRefAttrs() { return hiveTypesRemoveOwnedRefAttrs; }
+
     public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
 
     public List<AtlasEntity> getEntities() {