You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/12/01 16:40:22 UTC

[atlas] branch branch-2.0 updated: ATLAS-3261: added option to authorize notifications using username given in the message

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 76d0276  ATLAS-3261: added option to authorize notifications using username given in the message
76d0276 is described below

commit 76d0276c0f8859661b5af6a10d7c09bda1aee022
Author: arempter <ar...@gmail.com>
AuthorDate: Tue Nov 26 18:50:36 2019 -0800

    ATLAS-3261: added option to authorize notifications using username given in the message
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit 48df6544f92df81dc49820b8cd2900666cb14e0a)
---
 pom.xml                                            |  7 +++
 webapp/pom.xml                                     |  5 ++
 .../notification/NotificationHookConsumer.java     | 57 ++++++++++++++++++++++
 3 files changed, 69 insertions(+)

diff --git a/pom.xml b/pom.xml
index 33fb862..533df28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -698,6 +698,7 @@
         <commons-conf.version>1.10</commons-conf.version>
         <commons-conf2.version>2.2</commons-conf2.version>
         <commons-collections.version>3.2.2</commons-collections.version>
+        <commons-collections4.version>4.4</commons-collections4.version>
         <commons-logging.version>1.1.3</commons-logging.version>
         <commons-lang.version>2.6</commons-lang.version>
         <commons-validator.version>1.6</commons-validator.version>
@@ -1095,6 +1096,12 @@
                 <version>${commons-collections.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-collections4</artifactId>
+                <version>${commons-collections4.version}</version>
+            </dependency>
+
             <!--Javax inject-->
             <dependency>
                 <groupId>javax.inject</groupId>
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 96cfe5a..45b188a 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -229,6 +229,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.googlecode.json-simple</groupId>
             <artifactId>json-simple</artifactId>
         </dependency>
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 41a6c2e..14cae42 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -64,12 +64,19 @@ import org.apache.atlas.web.filters.AuditFilter.AuditLog;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.collections4.map.PassiveExpiringMap;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.core.annotation.Order;
+import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.GrantedAuthority;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.core.userdetails.UserDetails;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
@@ -90,6 +97,7 @@ import java.util.regex.Pattern;
 
 import static org.apache.atlas.model.instance.AtlasObjectId.*;
 import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
+import static org.apache.atlas.web.security.AtlasAbstractAuthenticationProvider.getAuthoritiesFromUGI;
 
 
 /**
@@ -140,6 +148,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES           = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
     public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS          = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
     public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS         = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
+    public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER                         = "atlas.notification.authorize.using.message.user";
+    public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS                    = "atlas.notification.authorize.authn.cache.ttl.seconds";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
 
@@ -167,6 +177,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final boolean                       rdbmsTypesRemoveOwnedRefAttrs;
     private final boolean                       preprocessEnabled;
     private final boolean createShellEntityForNonExistingReference;
+    private final boolean                       authorizeUsingMessageUser;
+    private final Map<String, Authentication>   authnCache;
+
     private final NotificationInterface         notificationInterface;
     private final Configuration                 applicationProperties;
     private       ExecutorService               executors;
@@ -202,6 +215,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         consumerDisabled                              = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
         largeMessageProcessingTimeThresholdMs         = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000);  //  60 sec by default
         createShellEntityForNonExistingReference      = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
+        authorizeUsingMessageUser                     = applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false);
+
+        int authnCacheTtlSeconds = applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300);
+
+        authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ? new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null;
 
         String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
         String[] patternHiveTablesToPrune  = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
@@ -552,6 +570,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             NotificationStat stats          = new NotificationStat();
             AuditLog         auditLog       = null;
 
+            if (authorizeUsingMessageUser) {
+                setCurrentUser(messageUser);
+            }
+
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                 perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
             }
@@ -1225,6 +1247,41 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
     }
 
+    private void setCurrentUser(String userName) {
+        Authentication authentication = getAuthenticationForUser(userName);
+
+        if (LOG.isDebugEnabled()) {
+            if (authentication != null) {
+                LOG.debug("setCurrentUser(): notification processing will be authorized as user '{}'", userName);
+            } else {
+                LOG.debug("setCurrentUser(): Failed to get authentication for user '{}'.", userName);
+            }
+        }
+
+        SecurityContextHolder.getContext().setAuthentication(authentication);
+    }
+
+    private Authentication getAuthenticationForUser(String userName) {
+        Authentication ret = null;
+
+        if (StringUtils.isNotBlank(userName)) {
+            ret = authnCache != null ? authnCache.get(userName) : null;
+
+            if (ret == null) {
+                List<GrantedAuthority> grantedAuths = getAuthoritiesFromUGI(userName);
+                UserDetails            principal    = new User(userName, "", grantedAuths);
+
+                ret = new UsernamePasswordAuthenticationToken(principal, "");
+
+                if (authnCache != null) {
+                    authnCache.put(userName, ret);
+                }
+            }
+        }
+
+        return ret;
+    }
+
     static class FailedCommitOffsetRecorder {
         private Long currentOffset;