You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/12/01 16:40:22 UTC
[atlas] branch branch-2.0 updated: ATLAS-3261: added option to
authorize notifications using username given in the message
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 76d0276 ATLAS-3261: added option to authorize notifications using username given in the message
76d0276 is described below
commit 76d0276c0f8859661b5af6a10d7c09bda1aee022
Author: arempter <ar...@gmail.com>
AuthorDate: Tue Nov 26 18:50:36 2019 -0800
ATLAS-3261: added option to authorize notifications using username given in the message
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
(cherry picked from commit 48df6544f92df81dc49820b8cd2900666cb14e0a)
---
pom.xml | 7 +++
webapp/pom.xml | 5 ++
.../notification/NotificationHookConsumer.java | 57 ++++++++++++++++++++++
3 files changed, 69 insertions(+)
diff --git a/pom.xml b/pom.xml
index 33fb862..533df28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -698,6 +698,7 @@
<commons-conf.version>1.10</commons-conf.version>
<commons-conf2.version>2.2</commons-conf2.version>
<commons-collections.version>3.2.2</commons-collections.version>
+ <commons-collections4.version>4.4</commons-collections4.version>
<commons-logging.version>1.1.3</commons-logging.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-validator.version>1.6</commons-validator.version>
@@ -1095,6 +1096,12 @@
<version>${commons-collections.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>${commons-collections4.version}</version>
+ </dependency>
+
<!--Javax inject-->
<dependency>
<groupId>javax.inject</groupId>
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 96cfe5a..45b188a 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -229,6 +229,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 41a6c2e..14cae42 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -64,12 +64,19 @@ import org.apache.atlas.web.filters.AuditFilter.AuditLog;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
+import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.GrantedAuthority;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
@@ -90,6 +97,7 @@ import java.util.regex.Pattern;
import static org.apache.atlas.model.instance.AtlasObjectId.*;
import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
+import static org.apache.atlas.web.security.AtlasAbstractAuthenticationProvider.getAuthoritiesFromUGI;
/**
@@ -140,6 +148,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
+ public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user";
+ public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@@ -167,6 +177,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
private final boolean createShellEntityForNonExistingReference;
+ private final boolean authorizeUsingMessageUser;
+ private final Map<String, Authentication> authnCache;
+
private final NotificationInterface notificationInterface;
private final Configuration applicationProperties;
private ExecutorService executors;
@@ -202,6 +215,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
+ authorizeUsingMessageUser = applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false);
+
+ int authnCacheTtlSeconds = applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300);
+
+ authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ? new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null;
String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
@@ -552,6 +570,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
NotificationStat stats = new NotificationStat();
AuditLog auditLog = null;
+ if (authorizeUsingMessageUser) {
+ setCurrentUser(messageUser);
+ }
+
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
}
@@ -1225,6 +1247,41 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
+ private void setCurrentUser(String userName) {
+ Authentication authentication = getAuthenticationForUser(userName);
+
+ if (LOG.isDebugEnabled()) {
+ if (authentication != null) {
+ LOG.debug("setCurrentUser(): notification processing will be authorized as user '{}'", userName);
+ } else {
+ LOG.debug("setCurrentUser(): Failed to get authentication for user '{}'.", userName);
+ }
+ }
+
+ SecurityContextHolder.getContext().setAuthentication(authentication);
+ }
+
+ private Authentication getAuthenticationForUser(String userName) {
+ Authentication ret = null;
+
+ if (StringUtils.isNotBlank(userName)) {
+ ret = authnCache != null ? authnCache.get(userName) : null;
+
+ if (ret == null) {
+ List<GrantedAuthority> grantedAuths = getAuthoritiesFromUGI(userName);
+ UserDetails principal = new User(userName, "", grantedAuths);
+
+ ret = new UsernamePasswordAuthenticationToken(principal, "");
+
+ if (authnCache != null) {
+ authnCache.put(userName, ret);
+ }
+ }
+ }
+
+ return ret;
+ }
+
static class FailedCommitOffsetRecorder {
private Long currentOffset;