You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/03/09 01:30:53 UTC
[atlas] branch master updated: ATLAS-3067: updated hive types to
remove use of ownedRef/inverseRef attributes
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 2471525 ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes
2471525 is described below
commit 24715256558bed6f89d4bec25c6eada0c9db2075
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Wed Mar 6 23:49:40 2019 -0800
ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes
---
.../patches/008-remove-hive-legacy-attributes.json | 50 ++++++++++++++++
.../003-remove-rdbms-legacy-attributes.json | 54 ++++++++---------
.../bootstrap/AtlasTypeDefStoreInitializer.java | 44 ++++++++++----
.../graph/v2/AtlasRelationshipDefStoreV2.java | 20 ++++---
.../notification/NotificationHookConsumer.java | 47 ++++++++-------
.../preprocessor/EntityPreprocessor.java | 2 +
.../preprocessor/HivePreprocessor.java | 67 ++++++++++++++++++++++
.../preprocessor/PreprocessorContext.java | 6 +-
8 files changed, 223 insertions(+), 67 deletions(-)
diff --git a/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json b/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json
new file mode 100644
index 0000000..32a0876
--- /dev/null
+++ b/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json
@@ -0,0 +1,50 @@
+{
+ "patches": [
+ {
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
+ "typeName": "hive_table_db",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
+ "params": {
+ "relationshipLabel": "__hive_table.db"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
+ "typeName": "hive_table_columns",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
+ "params": {
+ "relationshipLabel": "__hive_table.columns"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
+ "typeName": "hive_table_partitionkeys",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
+ "params": {
+ "relationshipLabel": "__hive_table.partitionkeys"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
+ "typeName": "hive_table_storagedesc",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
+ "params": {
+ "relationshipLabel": "__hive_table.sd"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
+ "typeName": "hive_process_column_lineage",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
+ "params": {
+ "relationshipLabel": "__hive_column_lineage.query",
+ "swapEnds": "true"
+ }
+ }
+ ]
+}
diff --git a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
index d087c66..5531bee 100644
--- a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
+++ b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
@@ -1,87 +1,87 @@
{
"patches": [
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_instance_databases",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_instance.databases",
"relationshipCategory": "COMPOSITION"
}
},
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_db_tables",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_db.tables",
"relationshipCategory": "COMPOSITION"
}
},
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_table_columns",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_table.columns",
"relationshipCategory": "COMPOSITION"
}
},
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_table_indexes",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_table.indexes",
"relationshipCategory": "COMPOSITION"
}
},
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_table_foreign_key",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_table.foreign_keys",
"relationshipCategory": "COMPOSITION"
}
},
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_index_columns",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_index.columns"
}
},
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_foreign_key_key_columns",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_foreign_key.key_columns"
}
},
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_foreign_key_table_references",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_foreign_key.references_table"
}
},
{
- "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_foreign_key_column_references",
- "applyToVersion": "1.0",
- "updateToVersion": "1.1",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_foreign_key.references_columns"
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index d12284e..66162aa 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -78,9 +78,10 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
@Service
public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
- public static final String PATCHES_FOLDER_NAME = "patches";
- public static final String RELATIONSHIP_LABEL = "relationshipLabel";
- public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
+ public static final String PATCHES_FOLDER_NAME = "patches";
+ public static final String RELATIONSHIP_LABEL = "relationshipLabel";
+ public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
+ public static final String RELATIONSHIP_SWAP_ENDS = "swapEnds";
private final AtlasTypeDefStore atlasTypeDefStore;
private final AtlasTypeRegistry atlasTypeRegistry;
@@ -414,7 +415,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
PatchHandler[] patchHandlers = new PatchHandler[] {
new AddAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new UpdateAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
- new RemoveLegacyAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
+ new RemoveLegacyRefAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new UpdateTypeDefOptionsPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new SetServiceTypePatchHandler(atlasTypeDefStore, atlasTypeRegistry)
};
@@ -710,9 +711,9 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
- class RemoveLegacyAttributesPatchHandler extends PatchHandler {
- public RemoveLegacyAttributesPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
- super(typeDefStore, typeRegistry, new String[] { "REMOVE_LEGACY_ATTRIBUTES" });
+ class RemoveLegacyRefAttributesPatchHandler extends PatchHandler {
+ public RemoveLegacyRefAttributesPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
+ super(typeDefStore, typeRegistry, new String[] { "REMOVE_LEGACY_REF_ATTRIBUTES" });
}
@Override
@@ -734,10 +735,12 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
String newRelationshipLabel = null;
RelationshipCategory newRelationshipCategory = null;
+ boolean swapEnds = false;
if (patch.getParams() != null) {
Object relLabel = patch.getParams().get(RELATIONSHIP_LABEL);
Object relCategory = patch.getParams().get(RELATIONSHIP_CATEGORY);
+ Object relSwapEnds = patch.getParams().get(RELATIONSHIP_SWAP_ENDS);
if (relLabel != null) {
newRelationshipLabel = relLabel.toString();
@@ -746,6 +749,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
if (relCategory != null) {
newRelationshipCategory = RelationshipCategory.valueOf(relCategory.toString());
}
+
+ if (relSwapEnds != null) {
+ swapEnds = Boolean.valueOf(relSwapEnds.toString());
+ }
}
if (StringUtils.isEmpty(newRelationshipLabel)) {
@@ -766,9 +773,19 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
- AtlasRelationshipDef updatedDef = new AtlasRelationshipDef(relationshipDef);
- AtlasEntityDef updatedEntityDef1 = new AtlasEntityDef(end1Type.getEntityDef());
- AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef());
+ AtlasRelationshipDef updatedDef = new AtlasRelationshipDef(relationshipDef);
+
+ if (swapEnds) {
+ AtlasRelationshipEndDef tmp = updatedDef.getEndDef1();
+
+ updatedDef.setEndDef1(updatedDef.getEndDef2());
+ updatedDef.setEndDef2(tmp);
+ }
+
+ end1Def = updatedDef.getEndDef1();
+ end2Def = updatedDef.getEndDef2();
+ end1Type = typeRegistry.getEntityTypeByName(end1Def.getType());
+ end2Type = typeRegistry.getEntityTypeByName(end2Def.getType());
updatedDef.setRelationshipLabel(newRelationshipLabel);
@@ -776,10 +793,13 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
updatedDef.setRelationshipCategory(newRelationshipCategory);
}
- updatedDef.getEndDef1().setIsLegacyAttribute(false);
- updatedDef.getEndDef2().setIsLegacyAttribute(false);
+ end1Def.setIsLegacyAttribute(false);
+ end2Def.setIsLegacyAttribute(false);
updatedDef.setTypeVersion(patch.getUpdateToVersion());
+ AtlasEntityDef updatedEntityDef1 = new AtlasEntityDef(end1Type.getEntityDef());
+ AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef());
+
updatedEntityDef1.removeAttribute(end1Def.getName());
updatedEntityDef2.removeAttribute(end2Def.getName());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
index 35d0577..332c18a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
@@ -436,19 +436,25 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe
}
AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1();
+ AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2();
AtlasRelationshipEndDef newEnd1 = newRelationshipDef.getEndDef1();
+ AtlasRelationshipEndDef newEnd2 = newRelationshipDef.getEndDef2();
+ boolean endsSwaped = false;
if ( !isValidUpdate(existingEnd1, newEnd1) ) {
- throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
- newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString());
+ if (RequestContext.get().isInTypePatching() && isValidUpdate(existingEnd1, newEnd2)) { // allow swap of ends during type-patch
+ endsSwaped = true;
+ } else {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
+ newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString());
+ }
}
- AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2();
- AtlasRelationshipEndDef newEnd2 = newRelationshipDef.getEndDef2();
+ AtlasRelationshipEndDef newEndToCompareWith = endsSwaped ? newEnd1 : newEnd2;
- if ( !isValidUpdate(existingEnd2, newEnd2) ) {
+ if ( !isValidUpdate(existingEnd2, newEndToCompareWith) ) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE,
- newRelationshipDef.getName(), newEnd2.toString(), existingEnd2.toString());
+ newRelationshipDef.getName(), newEndToCompareWith.toString(), existingEnd2.toString());
}
}
@@ -520,7 +526,7 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe
}
private static boolean isValidUpdate(AtlasRelationshipEndDef currentDef, AtlasRelationshipEndDef updatedDef) {
- // permit updates to description and isLegacyAttribute (ref type-patch REMOVE_LEGACY_ATTRIBUTES)
+ // permit updates to description and isLegacyAttribute (ref type-patch REMOVE_LEGACY_REF_ATTRIBUTES)
return Objects.equals(currentDef.getType(), updatedDef.getType()) &&
Objects.equals(currentDef.getName(), updatedDef.getName()) &&
Objects.equals(currentDef.getIsContainer(), updatedDef.getIsContainer()) &&
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 50559dd..d16d544 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -78,7 +78,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -87,6 +86,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
+
/**
* Consumer of notifications from hooks e.g., hive hook etc.
*/
@@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+ public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@@ -143,6 +144,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
private final Map<String, PreprocessAction> hiveTablesCache;
+ private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
@@ -172,7 +174,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
- commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
+ commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0);
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
@@ -214,11 +216,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveTablesCache = Collections.emptyMap();
}
- rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
- preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || rdbmsTypesRemoveOwnedRefAttrs;
+ hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, false);
+ rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, false);
+ preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
+ LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, rdbmsTypesRemoveOwnedRefAttrs);
+ LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize);
+ LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled);
}
@Override
@@ -694,7 +701,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
List<AtlasEntity> entitiesList = entities.getEntities();
AtlasEntityStream entityStream = new AtlasEntityStream(entities);
- if (entitiesList.size() <= commitBatchSize) {
+ if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
} else {
for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
@@ -792,10 +799,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return;
}
- PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, rdbmsTypesRemoveOwnedRefAttrs);
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
- if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
- ignoreOrPruneHiveTables(context);
+ if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs) {
+ preprocessHiveTypes(context);
}
if (skipHiveColumnLineageHive20633) {
@@ -813,8 +820,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
- for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
- AtlasEntity entity = iter.next();
+ for (int i = 0; i < entities.size(); i++) {
+ AtlasEntity entity = entities.get(i);
EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
if (preprocessor != null) {
@@ -824,19 +831,19 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
- private void ignoreOrPruneHiveTables(PreprocessorContext context) {
+ private void preprocessHiveTypes(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
- for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
- AtlasEntity entity = iter.next();
+ for (int i = 0; i < entities.size(); i++) {
+ AtlasEntity entity = entities.get(i);
EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
if (context.isIgnoredEntity(entity.getGuid())) {
- iter.remove();
+ entities.remove(i--);
}
}
}
@@ -877,8 +884,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
Set<String> lineageQNames = new HashSet<>();
// find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
- for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
- AtlasEntity entity = iter.next();
+ for (int i = 0; i < entities.size(); i++) {
+ AtlasEntity entity = entities.get(i);
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
final Object qName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
@@ -887,7 +894,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
final String qualifiedName = qName.toString();
if (lineageQNames.contains(qualifiedName)) {
- iter.remove();
+ entities.remove(i--);
LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition());
@@ -914,11 +921,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
- for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
- AtlasEntity entity = iter.next();
+ for (int i = 0; i < entities.size(); i++) {
+ AtlasEntity entity = entities.get(i);
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
- iter.remove();
+ entities.remove(i--);
numRemovedEntities++;
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 7eba27a..085e746 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -48,6 +48,8 @@ public abstract class EntityPreprocessor {
public static final String ATTRIBUTE_SD = "sd";
public static final String ATTRIBUTE_DB = "db";
public static final String ATTRIBUTE_DATABASES = "databases";
+ public static final String ATTRIBUTE_QUERY = "query";
+ public static final String ATTRIBUTE_TABLE = "table";
public static final String ATTRIBUTE_TABLES = "tables";
public static final String ATTRIBUTE_INDEXES = "indexes";
public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys";
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index d54c88d..ff9c9cb 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -18,14 +18,26 @@
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class HivePreprocessor {
+ private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
+
+ // Names of the relationship-defs stamped onto column/partition-key entities; these must match
+ // the relationship typeNames exactly (the type patch in this change uses all-lowercase names).
+ private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns";
+ private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
+
static class HiveTablePreprocessor extends EntityPreprocessor {
public HiveTablePreprocessor() {
super(TYPE_HIVE_TABLE);
@@ -54,9 +66,64 @@ public class HivePreprocessor {
entity.setAttribute(ATTRIBUTE_SD, null);
entity.setAttribute(ATTRIBUTE_COLUMNS, null);
entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null);
+ } else if (context.getHiveTypesRemoveOwnedRefAttrs()) {
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD);
+
+ removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context);
+ removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context);
}
}
}
+
+ /**
+ * For every entity referenced by the given table attribute (e.g. columns or partition keys),
+ * tags that entity's "table" reference with the given relationship type and registers the
+ * entity with the context via {@code addToReferredEntitiesToMove}.
+ *
+ * NOTE(review): despite the method name, the attribute is only read with
+ * {@code getAttribute}; nothing here removes it from {@code entity} - confirm whether
+ * removal is intended to happen elsewhere.
+ *
+ * @param entity           table entity whose attribute is inspected
+ * @param attrName         name of the attribute on {@code entity} holding the references
+ * @param relationshipType relationship type name to set on each referenced entity's "table" reference
+ * @param context          preprocessor context used to collect guids, look up entities and register moves
+ */
+ private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
+ Object attrVal = entity.getAttribute(attrName);
+
+ if (attrVal != null) {
+ Set<String> guids = new HashSet<>();
+
+ // the context extracts the guids referenced by the attribute value into 'guids'
+ context.collectGuids(attrVal, guids);
+
+ for (String guid : guids) {
+ AtlasEntity colEntity = context.getEntity(guid);
+
+ // guids that the context cannot resolve to an entity are skipped
+ if (colEntity != null) {
+ Object attrTable = null;
+
+ // prefer the relationship-attribute form of "table"; fall back to the regular attribute
+ if (colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) {
+ attrTable = colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE);
+ } else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) {
+ attrTable = colEntity.getAttribute(ATTRIBUTE_TABLE);
+ }
+
+ // yields null when the value is absent or of an unsupported type
+ attrTable = setRelationshipType(attrTable, relationshipType);
+
+ if (attrTable != null) {
+ colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable);
+ }
+
+ // registered even when no "table" reference could be set on the entity
+ context.addToReferredEntitiesToMove(guid);
+ }
+ }
+ }
+ }
+
+ /**
+ * Converts an object reference into an {@link AtlasRelatedObjectId} and sets its relationship type.
+ *
+ * An existing {@code AtlasRelatedObjectId} is reused (and therefore modified in place); an
+ * {@code AtlasObjectId} or a {@code Map} is wrapped in a new {@code AtlasRelatedObjectId}.
+ *
+ * @param attr             reference value; may be null
+ * @param relationshipType relationship type name to set on the result
+ * @return the related-object-id with the relationship type set, or null when {@code attr}
+ *         is null or of any other type
+ */
+ private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
+ AtlasRelatedObjectId ret = null;
+
+ // AtlasRelatedObjectId is tested first - presumably it is a subtype of AtlasObjectId; confirm
+ if (attr instanceof AtlasRelatedObjectId) {
+ ret = (AtlasRelatedObjectId) attr;
+ } else if (attr instanceof AtlasObjectId) {
+ ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
+ } else if (attr instanceof Map) {
+ ret = new AtlasRelatedObjectId((Map) attr);
+ }
+
+ if (ret != null) {
+ ret.setRelationshipType(relationshipType);
+ }
+
+ return ret;
+ }
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 0f95fba..94e0993 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -44,16 +44,18 @@ public class PreprocessorContext {
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
+ private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
- public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean rdbmsTypesRemoveOwnedRefAttrs) {
+ public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
this.kafkaMessage = kafkaMessage;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
+ this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
final HookNotification message = kafkaMessage.getMessage();
@@ -85,6 +87,8 @@ public class PreprocessorContext {
return kafkaMessage.getPartition();
}
+ public boolean getHiveTypesRemoveOwnedRefAttrs() { return hiveTypesRemoveOwnedRefAttrs; }
+
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
public List<AtlasEntity> getEntities() {