You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/07/16 00:59:55 UTC
[atlas] branch branch-2.0 updated: ATLAS-3333: updated notification
pre-process with an option to ignore dummy Hive database/table
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 0fe6712 ATLAS-3333: updated notification pre-process with an option to ignore dummy Hive database/table
0fe6712 is described below
commit 0fe67124169f5f72ba1a5dca47dac77330ee6927
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Fri Jul 12 00:56:49 2019 -0700
ATLAS-3333: updated notification pre-process with an option to ignore dummy Hive database/table
(cherry picked from commit 23eacbafcea2e58378271fa6dc7b56be08b7cac7)
---
.../notification/NotificationHookConsumer.java | 79 +++++++++++++-
.../preprocessor/EntityPreprocessor.java | 8 ++
.../preprocessor/HivePreprocessor.java | 34 ++++--
.../preprocessor/PreprocessorContext.java | 121 +++++++++++++++++++--
4 files changed, 219 insertions(+), 23 deletions(-)
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1c8d72b..f7df6b3 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -71,7 +71,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
-import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
@@ -116,6 +115,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final String ATTRIBUTE_INPUTS = "inputs";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
+ public static final String DUMMY_DATABASE = "_dummy_database";
+ public static final String DUMMY_TABLE = "_dummy_table";
+ public static final String VALUES_TMP_TABLE_NAME_PREFIX = "Values__Tmp__Table__";
+
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
@@ -133,6 +137,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+ public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.enabled";
+ public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.names";
+ public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.enabled";
+ public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
+ public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
+ public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
@@ -154,6 +164,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final boolean consumerDisabled;
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
+ private final List<String> hiveDummyDatabasesToIgnore;
+ private final List<String> hiveDummyTablesToIgnore;
+ private final List<String> hiveTablePrefixesToIgnore;
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
@@ -228,9 +241,47 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveTablesCache = Collections.emptyMap();
}
+ boolean hiveDbIgnoreDummyEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, true);
+ boolean hiveTableIgnoreDummyEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, true);
+ boolean hiveTableIgnoreNamePrefixEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, true);
+
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, hiveDbIgnoreDummyEnabled);
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, hiveTableIgnoreDummyEnabled);
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, hiveTableIgnoreNamePrefixEnabled);
+
+ if (hiveDbIgnoreDummyEnabled) {
+ String[] dummyDatabaseNames = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES);
+
+ hiveDummyDatabasesToIgnore = trimAndPurge(dummyDatabaseNames, DUMMY_DATABASE);
+
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES, StringUtils.join(hiveDummyDatabasesToIgnore, ','));
+ } else {
+ hiveDummyDatabasesToIgnore = Collections.emptyList();
+ }
+
+ if (hiveTableIgnoreDummyEnabled) {
+ String[] dummyTableNames = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES);
+
+ hiveDummyTablesToIgnore = trimAndPurge(dummyTableNames, DUMMY_TABLE);
+
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES, StringUtils.join(hiveDummyTablesToIgnore, ','));
+ } else {
+ hiveDummyTablesToIgnore = Collections.emptyList();
+ }
+
+ if (hiveTableIgnoreNamePrefixEnabled) {
+ String[] ignoreNamePrefixes = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES);
+
+ hiveTablePrefixesToIgnore = trimAndPurge(ignoreNamePrefixes, VALUES_TMP_TABLE_NAME_PREFIX);
+
+ LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES, StringUtils.join(hiveTablePrefixesToIgnore, ','));
+ } else {
+ hiveTablePrefixesToIgnore = Collections.emptyList();
+ }
+
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
- preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
+ preprocessEnabled = skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
@@ -366,6 +417,26 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
+ private List<String> trimAndPurge(String[] values, String defaultValue) {
+ final List<String> ret;
+
+ if (values != null && values.length > 0) {
+ ret = new ArrayList<>(values.length);
+
+ for (String val : values) {
+ if (StringUtils.isNotBlank(val)) {
+ ret.add(val.trim());
+ }
+ }
+ } else if (StringUtils.isNotBlank(defaultValue)) {
+ ret = Collections.singletonList(defaultValue.trim());
+ } else {
+ ret = Collections.emptyList();
+ }
+
+ return ret;
+ }
+
static class AdaptiveWaiter {
private final long increment;
private final long maxDuration;
@@ -843,7 +914,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
PreprocessorContext context = null;
if (preprocessEnabled) {
- context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
+ context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
@@ -878,7 +949,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
if (entities.size() - count > 0) {
- LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
+ LOG.info("preprocess: moved {} hive_process/hive_column_lineage entities to end of list (listSize={}). topic-offset={}, partition={}", entities.size() - count, entities.size(), kafkaMsg.getOffset(), kafkaMsg.getPartition());
}
}
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 9b620dd..0cddd41 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -31,6 +31,7 @@ public abstract class EntityPreprocessor {
public static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
public static final String TYPE_HIVE_PROCESS = "hive_process";
public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc";
+ public static final String TYPE_HIVE_DB = "hive_db";
public static final String TYPE_HIVE_TABLE = "hive_table";
public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance";
public static final String TYPE_RDBMS_DB = "rdbms_db";
@@ -67,6 +68,7 @@ public abstract class EntityPreprocessor {
static {
EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
+ new HivePreprocessor.HiveDbPreprocessor(),
new HivePreprocessor.HiveTablePreprocessor(),
new HivePreprocessor.HiveColumnPreprocessor(),
new HivePreprocessor.HiveProcessPreprocessor(),
@@ -114,6 +116,12 @@ public abstract class EntityPreprocessor {
return obj != null ? obj.toString() : null;
}
+ public static String getName(AtlasEntity entity) {
+ Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_NAME) : null;
+
+ return obj != null ? obj.toString() : null;
+ }
+
public String getTypeName(Object obj) {
Object ret = null;
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index cc31032..d31495c 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -34,6 +34,23 @@ public class HivePreprocessor {
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC = "hive_table_storagedesc";
+ static class HiveDbPreprocessor extends EntityPreprocessor {
+ public HiveDbPreprocessor() {
+ super(TYPE_HIVE_DB);
+ }
+
+ @Override
+ public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+ if (!context.isIgnoredEntity(entity.getGuid())) {
+ PreprocessAction action = context.getPreprocessActionForHiveDb(getName(entity));
+
+ if (action == PreprocessAction.IGNORE) {
+ context.addToIgnoredEntities(entity);
+ }
+ }
+ }
+ }
+
static class HiveTablePreprocessor extends EntityPreprocessor {
public HiveTablePreprocessor() {
super(TYPE_HIVE_TABLE);
@@ -147,20 +164,19 @@ public class HivePreprocessor {
if (context.isIgnoredEntity(entity.getGuid())) {
context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
} else {
- Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
- Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
-
- int inputsCount = (inputs instanceof Collection) ? ((Collection) inputs).size() : 0;
- int outputsCount = (outputs instanceof Collection) ? ((Collection) outputs).size() : 0;
+ Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
+ Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+ int inputsCount = getCollectionSize(inputs);
+ int outputsCount = getCollectionSize(outputs);
removeIgnoredObjectIds(inputs, context);
removeIgnoredObjectIds(outputs, context);
boolean isInputsEmpty = isEmpty(inputs);
boolean isOutputsEmpty = isEmpty(outputs);
+ boolean isAnyRemoved = inputsCount > getCollectionSize(inputs) || outputsCount > getCollectionSize(outputs);
- // if inputs/outputs became empty due to removal of ignored entities, ignore the process entity as well
- if ((inputsCount > 0 && isInputsEmpty) || (outputsCount > 0 && isOutputsEmpty)) {
+ if (isAnyRemoved && (isInputsEmpty || isOutputsEmpty)) {
context.addToIgnoredEntities(entity);
// since the process entity is ignored, entities referenced by inputs/outputs of this process entity
@@ -186,6 +202,10 @@ public class HivePreprocessor {
}
}
+ private int getCollectionSize(Object obj) {
+ return (obj instanceof Collection) ? ((Collection) obj).size() : 0;
+ }
+
private void removeIgnoredObjectIds(Object obj, PreprocessorContext context) {
if (obj == null || !(obj instanceof Collection)) {
return;
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 2db0574..3c58c9f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -41,6 +41,8 @@ import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
+import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_CLUSTER_NAME;
+import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_ENTITY_NAME;
public class PreprocessorContext {
@@ -54,8 +56,12 @@ public class PreprocessorContext {
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
+ private final List<String> hiveDummyDatabasesToIgnore;
+ private final List<String> hiveDummyTablesToIgnore;
+ private final List<String> hiveTablePrefixesToIgnore;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
+ private final boolean isHivePreProcessEnabled;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
@@ -64,12 +70,15 @@ public class PreprocessorContext {
private final Map<String, String> guidAssignments = new HashMap<>();
private List<AtlasEntity> postUpdateEntities = null;
- public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
+ public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
+ this.hiveDummyDatabasesToIgnore = hiveDummyDatabasesToIgnore;
+ this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore;
+ this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore;
this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
@@ -88,6 +97,8 @@ public class PreprocessorContext {
entitiesWithExtInfo = null;
break;
}
+
+ this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
}
public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
@@ -107,7 +118,7 @@ public class PreprocessorContext {
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
public boolean isHivePreprocessEnabled() {
- return !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs;
+ return isHivePreProcessEnabled;
}
public List<AtlasEntity> getEntities() {
@@ -142,22 +153,78 @@ public class PreprocessorContext {
public List<AtlasEntity> getPostUpdateEntities() { return postUpdateEntities; }
+ public PreprocessAction getPreprocessActionForHiveDb(String dbName) {
+ PreprocessAction ret = PreprocessAction.NONE;
+
+ if (dbName != null) {
+ for (String dummyDbName : hiveDummyDatabasesToIgnore) {
+ if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) {
+ ret = PreprocessAction.IGNORE;
+
+ break;
+ }
+ }
+ }
+
+ return ret;
+ }
+
public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
PreprocessAction ret = PreprocessAction.NONE;
- if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) {
- ret = hiveTablesCache.get(qualifiedName);
+ if (qualifiedName != null) {
+ if (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune)) {
+ ret = hiveTablesCache.get(qualifiedName);
- if (ret == null) {
- if (isMatch(qualifiedName, hiveTablesToIgnore)) {
- ret = PreprocessAction.IGNORE;
- } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
- ret = PreprocessAction.PRUNE;
- } else {
- ret = PreprocessAction.NONE;
+ if (ret == null) {
+ if (isMatch(qualifiedName, hiveTablesToIgnore)) {
+ ret = PreprocessAction.IGNORE;
+ } else if (isMatch(qualifiedName, hiveTablesToPrune)) {
+ ret = PreprocessAction.PRUNE;
+ } else {
+ ret = PreprocessAction.NONE;
+ }
+
+ hiveTablesCache.put(qualifiedName, ret);
}
+ }
+
+ if (ret != PreprocessAction.IGNORE && (CollectionUtils.isNotEmpty(hiveDummyTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablePrefixesToIgnore))) {
+ String tblName = getHiveTableNameFromQualifiedName(qualifiedName);
- hiveTablesCache.put(qualifiedName, ret);
+ if (tblName != null) {
+ for (String dummyTblName : hiveDummyTablesToIgnore) {
+ if (StringUtils.equalsIgnoreCase(tblName, dummyTblName)) {
+ ret = PreprocessAction.IGNORE;
+
+ break;
+ }
+ }
+
+ if (ret != PreprocessAction.IGNORE) {
+ for (String tableNamePrefix : hiveTablePrefixesToIgnore) {
+ if (StringUtils.startsWithIgnoreCase(tblName, tableNamePrefix)) {
+ ret = PreprocessAction.IGNORE;
+
+ break;
+ }
+ }
+ }
+ }
+
+ if (ret != PreprocessAction.IGNORE && CollectionUtils.isNotEmpty(hiveDummyDatabasesToIgnore)) {
+ String dbName = getHiveDbNameFromQualifiedName(qualifiedName);
+
+ if (dbName != null) {
+ for (String dummyDbName : hiveDummyDatabasesToIgnore) {
+ if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) {
+ ret = PreprocessAction.IGNORE;
+
+ break;
+ }
+ }
+ }
+ }
}
}
@@ -321,6 +388,36 @@ public class PreprocessorContext {
}
}
+ public String getHiveTableNameFromQualifiedName(String qualifiedName) {
+ String ret = null;
+ int idxStart = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME) + 1;
+
+ if (idxStart != 0 && qualifiedName.length() > idxStart) {
+ int idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME, idxStart);
+
+ if (idxEnd != -1) {
+ ret = qualifiedName.substring(idxStart, idxEnd);
+ }
+ }
+
+ return ret;
+ }
+
+ public String getHiveDbNameFromQualifiedName(String qualifiedName) {
+ String ret = null;
+ int idxEnd = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME); // db.table@cluster, db.table.column@cluster
+
+ if (idxEnd == -1) {
+ idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME); // db@cluster
+ }
+
+ if (idxEnd != -1) {
+ ret = qualifiedName.substring(0, idxEnd);
+ }
+
+ return ret;
+ }
+
public String getTypeName(Object obj) {
Object ret = null;