You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/03/07 18:36:28 UTC
[atlas] branch branch-1.0 updated: ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-1.0 by this push:
new be98213 ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef
be98213 is described below
commit be98213f5d60d6af7d485782dc077e06e92b0089
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Thu Feb 21 18:16:34 2019 -0800
ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef
(cherry picked from commit f103e1438bbfc0c61912ded3551e22578876fbbe)
(cherry picked from commit 5d99c40a51d910d3858d4d25e321afad5f12996d)
---
addons/models/2000-RDBMS/2010-rdbms_model.json | 6 +-
.../003-remove-rdbms-legacy-attributes.json | 90 +++++++++++++
.../apache/atlas/model/instance/AtlasStruct.java | 6 +-
.../bootstrap/AtlasTypeDefStoreInitializer.java | 25 +++-
.../graph/v2/AtlasRelationshipDefStoreV2.java | 11 +-
.../notification/NotificationHookConsumer.java | 30 ++++-
.../preprocessor/EntityPreprocessor.java | 39 +++++-
.../preprocessor/PreprocessorContext.java | 86 +++++++++----
.../preprocessor/RdbmsPreprocessor.java | 139 +++++++++++++++++++++
9 files changed, 384 insertions(+), 48 deletions(-)
diff --git a/addons/models/2000-RDBMS/2010-rdbms_model.json b/addons/models/2000-RDBMS/2010-rdbms_model.json
index 386446c..e72e13a 100644
--- a/addons/models/2000-RDBMS/2010-rdbms_model.json
+++ b/addons/models/2000-RDBMS/2010-rdbms_model.json
@@ -21,7 +21,7 @@
{
"name": "platform",
"typeName": "string",
- "isOptional": false,
+ "isOptional": true,
"cardinality": "SINGLE",
"isUnique": false,
"isIndexable": true
@@ -99,7 +99,7 @@
{
"name": "prodOrOther",
"typeName": "string",
- "isOptional": false,
+ "isOptional": true,
"cardinality": "SINGLE",
"isUnique": false,
"isIndexable": true
@@ -259,7 +259,7 @@
{
"name": "data_type",
"typeName": "string",
- "isOptional": false,
+ "isOptional": true,
"cardinality": "SINGLE",
"isUnique": false,
"isIndexable": true
diff --git a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
new file mode 100644
index 0000000..d087c66
--- /dev/null
+++ b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
@@ -0,0 +1,90 @@
+{
+ "patches": [
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_instance_databases",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_instance.databases",
+ "relationshipCategory": "COMPOSITION"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_db_tables",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_db.tables",
+ "relationshipCategory": "COMPOSITION"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_table_columns",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_table.columns",
+ "relationshipCategory": "COMPOSITION"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_table_indexes",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_table.indexes",
+ "relationshipCategory": "COMPOSITION"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_table_foreign_key",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_table.foreign_keys",
+ "relationshipCategory": "COMPOSITION"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_index_columns",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_index.columns"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_foreign_key_key_columns",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_foreign_key.key_columns"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_foreign_key_table_references",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_foreign_key.references_table"
+ }
+ },
+ {
+ "action": "REMOVE_LEGACY_ATTRIBUTES",
+ "typeName": "rdbms_foreign_key_column_references",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "params": {
+ "relationshipLabel": "__rdbms_foreign_key.references_columns"
+ }
+ }
+ ]
+}
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
index 18e7407..027b160 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
@@ -147,12 +147,10 @@ public class AtlasStruct implements Serializable {
}
}
- public void removeAttribute(String name) {
+ public Object removeAttribute(String name) {
Map<String, Object> a = this.attributes;
- if (a != null && a.containsKey(name)) {
- a.remove(name);
- }
+ return a != null ? a.remove(name) : null;
}
public StringBuilder toString(StringBuilder sb) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index a86282e..d12284e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -35,6 +35,7 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
@@ -77,8 +78,9 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
@Service
public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
- public static final String PATCHES_FOLDER_NAME = "patches";
- public static final String RELATIONSHIP_LABEL = "relationshipLabel";
+ public static final String PATCHES_FOLDER_NAME = "patches";
+ public static final String RELATIONSHIP_LABEL = "relationshipLabel";
+ public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
private final AtlasTypeDefStore atlasTypeDefStore;
private final AtlasTypeRegistry atlasTypeRegistry;
@@ -730,13 +732,19 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
AtlasEntityType end1Type = typeRegistry.getEntityTypeByName(end1Def.getType());
AtlasEntityType end2Type = typeRegistry.getEntityTypeByName(end2Def.getType());
- String newRelationshipLabel = null;
+ String newRelationshipLabel = null;
+ RelationshipCategory newRelationshipCategory = null;
if (patch.getParams() != null) {
- Object val = patch.getParams().get(RELATIONSHIP_LABEL);
+ Object relLabel = patch.getParams().get(RELATIONSHIP_LABEL);
+ Object relCategory = patch.getParams().get(RELATIONSHIP_CATEGORY);
- if (val != null) {
- newRelationshipLabel = val.toString();
+ if (relLabel != null) {
+ newRelationshipLabel = relLabel.toString();
+ }
+
+ if (relCategory != null) {
+ newRelationshipCategory = RelationshipCategory.valueOf(relCategory.toString());
}
}
@@ -763,6 +771,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef());
updatedDef.setRelationshipLabel(newRelationshipLabel);
+
+ if (newRelationshipCategory != null) {
+ updatedDef.setRelationshipCategory(newRelationshipCategory);
+ }
+
updatedDef.getEndDef1().setIsLegacyAttribute(false);
updatedDef.getEndDef2().setIsLegacyAttribute(false);
updatedDef.setTypeVersion(patch.getUpdateToVersion());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
index bfee34e..35d0577 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
@@ -425,9 +426,13 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe
RelationshipCategory newRelationshipCategory = newRelationshipDef.getRelationshipCategory();
if ( !existingRelationshipCategory.equals(newRelationshipCategory)){
- throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
- newRelationshipDef.getName(),newRelationshipCategory.name(),
- existingRelationshipCategory.name() );
+ if (!RequestContext.get().isInTypePatching()) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
+ newRelationshipDef.getName(), newRelationshipCategory.name(),
+ existingRelationshipCategory.name());
+ } else {
+ LOG.warn("RELATIONSHIP UPDATE: relationship category from {} to {} for {}", existingRelationshipCategory.name(), newRelationshipCategory.name(), newRelationshipDef.getName());
+ }
}
AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1();
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 53fd117..eab5d51 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+ public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@@ -142,6 +143,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
private final Map<String, PreprocessAction> hiveTablesCache;
+ private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
private NotificationInterface notificationInterface;
@@ -212,7 +214,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveTablesCache = Collections.emptyMap();
}
- preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633;
+ rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
+ preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || rdbmsTypesRemoveOwnedRefAttrs;
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
@@ -778,7 +781,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return;
}
- PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache);
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, rdbmsTypesRemoveOwnedRefAttrs);
if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
ignoreOrPruneHiveTables(context);
@@ -788,16 +791,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
skipHiveColumnLineage(context);
}
+ if (rdbmsTypesRemoveOwnedRefAttrs) {
+ rdbmsTypeRemoveOwnedRefAttrs(context);
+ }
+
context.moveRegisteredReferredEntities();
}
+ private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
+ List<AtlasEntity> entities = context.getEntities();
+
+ if (entities != null) {
+ for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
+ AtlasEntity entity = iter.next();
+ EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
+
+ if (preprocessor != null) {
+ preprocessor.preprocess(entity, context);
+ }
+ }
+ }
+ }
+
private void ignoreOrPruneHiveTables(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
- EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
+ EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
@@ -813,7 +835,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (referredEntities != null) {
for (Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next().getValue();
- EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName());
+ EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index bdea14a..7eba27a 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -32,25 +32,38 @@ public abstract class EntityPreprocessor {
public static final String TYPE_HIVE_PROCESS = "hive_process";
public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc";
public static final String TYPE_HIVE_TABLE = "hive_table";
+ public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance";
+ public static final String TYPE_RDBMS_DB = "rdbms_db";
+ public static final String TYPE_RDBMS_TABLE = "rdbms_table";
+ public static final String TYPE_RDBMS_COLUMN = "rdbms_column";
+ public static final String TYPE_RDBMS_INDEX = "rdbms_index";
+ public static final String TYPE_RDBMS_FOREIGN_KEY = "rdbms_foreign_key";
public static final String ATTRIBUTE_COLUMNS = "columns";
public static final String ATTRIBUTE_INPUTS = "inputs";
public static final String ATTRIBUTE_OUTPUTS = "outputs";
public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ public static final String ATTRIBUTE_NAME = "name";
public static final String ATTRIBUTE_SD = "sd";
+ public static final String ATTRIBUTE_DB = "db";
+ public static final String ATTRIBUTE_DATABASES = "databases";
+ public static final String ATTRIBUTE_TABLES = "tables";
+ public static final String ATTRIBUTE_INDEXES = "indexes";
+ public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys";
public static final char QNAME_SEP_CLUSTER_NAME = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final String QNAME_SD_SUFFIX = "_storage";
- private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>();
+ private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>();
+ private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>();
private final String typeName;
static {
- EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
+ EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
new HivePreprocessor.HiveTablePreprocessor(),
new HivePreprocessor.HiveColumnPreprocessor(),
new HivePreprocessor.HiveProcessPreprocessor(),
@@ -58,8 +71,18 @@ public abstract class EntityPreprocessor {
new HivePreprocessor.HiveStorageDescPreprocessor()
};
- for (EntityPreprocessor preprocessor : preprocessors) {
- PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+ EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] {
+ new RdbmsPreprocessor.RdbmsInstancePreprocessor(),
+ new RdbmsPreprocessor.RdbmsDbPreprocessor(),
+ new RdbmsPreprocessor.RdbmsTablePreprocessor()
+ };
+
+ for (EntityPreprocessor preprocessor : hivePreprocessors) {
+ HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+ }
+
+ for (EntityPreprocessor preprocessor : rdbmsPreprocessors) {
+ RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
}
}
@@ -74,8 +97,12 @@ public abstract class EntityPreprocessor {
public abstract void preprocess(AtlasEntity entity, PreprocessorContext context);
- public static EntityPreprocessor getPreprocessor(String typeName) {
- return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null;
+ public static EntityPreprocessor getHivePreprocessor(String typeName) {
+ return typeName != null ? HIVE_PREPROCESSOR_MAP.get(typeName) : null;
+ }
+
+ public static EntityPreprocessor getRdbmsPreprocessor(String typeName) {
+ return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null;
}
public static String getQualifiedName(AtlasEntity entity) {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 2d2c09a..0f95fba 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -44,15 +44,17 @@ public class PreprocessorContext {
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
+ private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
- public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
- this.kafkaMessage = kafkaMessage;
- this.hiveTablesToIgnore = hiveTablesToIgnore;
- this.hiveTablesToPrune = hiveTablesToPrune;
- this.hiveTablesCache = hiveTablesCache;
+ public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean rdbmsTypesRemoveOwnedRefAttrs) {
+ this.kafkaMessage = kafkaMessage;
+ this.hiveTablesToIgnore = hiveTablesToIgnore;
+ this.hiveTablesToPrune = hiveTablesToPrune;
+ this.hiveTablesCache = hiveTablesCache;
+ this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
final HookNotification message = kafkaMessage.getMessage();
@@ -83,6 +85,8 @@ public class PreprocessorContext {
return kafkaMessage.getPartition();
}
+ public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
+
public List<AtlasEntity> getEntities() {
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
}
@@ -95,6 +99,12 @@ public class PreprocessorContext {
return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null;
}
+ public AtlasEntity removeReferredEntity(String guid) {
+ Map<String, AtlasEntity> referredEntities = getReferredEntities();
+
+ return referredEntities != null && guid != null ? referredEntities.remove(guid) : null;
+ }
+
public Set<String> getIgnoredEntities() { return ignoredEntities; }
public Set<String> getPrunedEntities() { return prunedEntities; }
@@ -165,6 +175,14 @@ public class PreprocessorContext {
}
}
+ public void addToReferredEntitiesToMove(Collection<String> guids) {
+ if (guids != null) {
+ for (String guid : guids) {
+ addToReferredEntitiesToMove(guid);
+ }
+ }
+ }
+
public void addToIgnoredEntities(Object obj) {
collectGuids(obj, ignoredEntities);
}
@@ -173,6 +191,14 @@ public class PreprocessorContext {
collectGuids(obj, prunedEntities);
}
+ public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName) {
+ Set<String> guidsToMove = new HashSet<>();
+
+ collectGuids(entity.removeAttribute(attrName), guidsToMove);
+
+ addToReferredEntitiesToMove(guidsToMove);
+ }
+
public void moveRegisteredReferredEntities() {
List<AtlasEntity> entities = getEntities();
Map<String, AtlasEntity> referredEntities = getReferredEntities();
@@ -202,38 +228,39 @@ public class PreprocessorContext {
}
}
- public String getGuid(Object obj) {
+ public String getTypeName(Object obj) {
Object ret = null;
if (obj instanceof AtlasObjectId) {
- ret = ((AtlasObjectId) obj).getGuid();
+ ret = ((AtlasObjectId) obj).getTypeName();
} else if (obj instanceof Map) {
- ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
+ ret = ((Map) obj).get(AtlasObjectId.KEY_TYPENAME);
} else if (obj instanceof AtlasEntity) {
- ret = ((AtlasEntity) obj).getGuid();
+ ret = ((AtlasEntity) obj).getTypeName();
} else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
- ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid();
+ ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getTypeName();
}
return ret != null ? ret.toString() : null;
}
+ public String getGuid(Object obj) {
+ Object ret = null;
- private boolean isMatch(String name, List<Pattern> patterns) {
- boolean ret = false;
-
- for (Pattern p : patterns) {
- if (p.matcher(name).matches()) {
- ret = true;
-
- break;
- }
+ if (obj instanceof AtlasObjectId) {
+ ret = ((AtlasObjectId) obj).getGuid();
+ } else if (obj instanceof Map) {
+ ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
+ } else if (obj instanceof AtlasEntity) {
+ ret = ((AtlasEntity) obj).getGuid();
+ } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+ ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid();
}
- return ret;
+ return ret != null ? ret.toString() : null;
}
- private void collectGuids(Object obj, Set<String> guids) {
+ public void collectGuids(Object obj, Set<String> guids) {
if (obj != null) {
if (obj instanceof Collection) {
Collection objList = (Collection) obj;
@@ -247,11 +274,26 @@ public class PreprocessorContext {
}
}
- private void collectGuid(Object obj, Set<String> guids) {
+ public void collectGuid(Object obj, Set<String> guids) {
String guid = getGuid(obj);
if (guid != null) {
guids.add(guid);
}
}
+
+
+ private boolean isMatch(String name, List<Pattern> patterns) {
+ boolean ret = false;
+
+ for (Pattern p : patterns) {
+ if (p.matcher(name).matches()) {
+ ret = true;
+
+ break;
+ }
+ }
+
+ return ret;
+ }
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
new file mode 100644
index 0000000..adc1983
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class RdbmsPreprocessor {
+ private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class);
+
+ static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor {
+ public RdbmsInstancePreprocessor() {
+ super(TYPE_RDBMS_INSTANCE);
+ }
+ }
+
+ static class RdbmsDbPreprocessor extends RdbmsTypePreprocessor {
+ public RdbmsDbPreprocessor() {
+ super(TYPE_RDBMS_DB);
+ }
+ }
+
+ static class RdbmsTablePreprocessor extends RdbmsTypePreprocessor {
+ public RdbmsTablePreprocessor() {
+ super(TYPE_RDBMS_TABLE);
+ }
+ @Override
+ public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+ super.preprocess(entity, context);
+
+ // try auto-fix when 'db' attribute is not present in relationshipAttribute & attributes
+ Object db = entity.getRelationshipAttribute(ATTRIBUTE_DB);
+
+ if (db == null) {
+ db = entity.getAttribute(ATTRIBUTE_DB);
+ }
+
+ if (db == null) {
+ String dbQualifiedName = getDbQualifiedName(entity);
+
+ if (dbQualifiedName != null) {
+ AtlasObjectId dbId = new AtlasObjectId(TYPE_RDBMS_DB, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName));
+
+ LOG.info("missing attribute {}.{} is set to {}", TYPE_RDBMS_TABLE, ATTRIBUTE_DB, dbId);
+
+ entity.setRelationshipAttribute(ATTRIBUTE_DB, dbId);
+ }
+ }
+ }
+
+ private String getDbQualifiedName(AtlasEntity tableEntity) {
+ String ret = null;
+ Object tblQualifiedName = tableEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); // dbName.tblName@clusterName
+ Object tblName = tableEntity.getAttribute(ATTRIBUTE_NAME); // tblName
+
+ if (tblQualifiedName != null && tblName != null) {
+ ret = tblQualifiedName.toString().replace("." + tblName.toString() + "@", "@"); // dbName@clusterName
+ }
+
+ return ret;
+ }
+
+ }
+
+ static class RdbmsTypePreprocessor extends EntityPreprocessor {
+ private static final Set<String> entityTypesToMove = new HashSet<>();
+
+ static {
+ entityTypesToMove.add(TYPE_RDBMS_DB);
+ entityTypesToMove.add(TYPE_RDBMS_TABLE);
+ entityTypesToMove.add(TYPE_RDBMS_COLUMN);
+ entityTypesToMove.add(TYPE_RDBMS_INDEX);
+ entityTypesToMove.add(TYPE_RDBMS_FOREIGN_KEY);
+ }
+
+ protected RdbmsTypePreprocessor(String typeName) {
+ super(typeName);
+ }
+
+ @Override
+ public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+ if (context.getRdbmsTypesRemoveOwnedRefAttrs()) {
+ clearRefAttributesAndMove(entity, context);
+
+ Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
+
+ if (MapUtils.isNotEmpty(referredEntities)) {
+ for (AtlasEntity referredEntity : referredEntities.values()) {
+ if (entityTypesToMove.contains(referredEntity.getTypeName())) {
+ clearRefAttributesAndMove(referredEntity, context);
+ }
+ }
+ }
+ }
+ }
+
+ private void clearRefAttributesAndMove(AtlasEntity entity, PreprocessorContext context) {
+ switch (entity.getTypeName()) {
+ case TYPE_RDBMS_INSTANCE:
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES);
+ break;
+
+ case TYPE_RDBMS_DB:
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES);
+ break;
+
+ case TYPE_RDBMS_TABLE:
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS);
+ break;
+ }
+ }
+ }
+}