You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/02/11 14:46:09 UTC

[nifi] branch master updated: NIFI-5985: Added capability for DBCPConnectionPool to use KerberosCredentialsService. Refactored KerberosAction to return a result from execute(). Removed usage of ProcessContext.yield() from KerberosAction, which should instead be handled by the component using the KerberosCredentialsService. Updated SolrProcessor to yield a flowfile on error, rather than the KerberosAction invoking the yield.

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c8a9b4  NIFI-5985: Added capability for DBCPConnectionPool to use KerberosCredentialsService. Refactored KerberosAction to return a result from execute(). Removed usage of ProcessContext.yield() from KerberosAction, which should instead be handled by the component using the KerberosCredentialsService. Updated SolrProcessor to yield a flowfile on error, rather than the KerberosAction invoking the yield.
8c8a9b4 is described below

commit 8c8a9b4d53584913881d16771c8ace05a6cf6819
Author: Jeff Storck <jt...@gmail.com>
AuthorDate: Wed Jan 30 19:08:29 2019 -0500

    NIFI-5985: Added capability for DBCPConnectionPool to use KerberosCredentialsService.
    Refactored KerberosAction to return a result from execute()
    Removed usage of ProcessContext.yield() from KerberosAction, which should instead be handled by the component using the KerberosCredentialsService.
    Updated SolrProcessor to yield a flowfile on error, rather than the KerberosAction invoking the yield.
    
    NIFI-5985: Updated TestPutSolrContentStream.testUpdateWithKerberosAuth test case to match on PrivilegedExceptionAction instead of PrivilegedAction doAs arguments.
    
    NIFI-5985: Moved kerberosUser logout after closing the datasource in the shutdown method.
    
    NIFI-5985: Removed catching exceptions in DBCPConnectionPool.shutdown
    Exception when closing the datasource is prioritized over an exception when logging out the kerberos principal
    Added GroovyDBCPServiceTest tests to verify prioritizing datasource.close() exception over kerberosUser.logout() exception
    
    This closes #3288.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../apache/nifi/security/krb/KerberosAction.java   | 31 +++++-------
 .../apache/nifi/security/krb/KerberosUserIT.java   |  6 +--
 .../apache/nifi/processors/solr/SolrProcessor.java | 13 +++--
 .../processors/solr/TestPutSolrContentStream.java  | 11 ++--
 .../nifi-dbcp-service/pom.xml                      |  8 ++-
 .../org/apache/nifi/dbcp/DBCPConnectionPool.java   | 55 ++++++++++++++++++--
 .../apache/nifi/dbcp/GroovyDBCPServiceTest.groovy  | 58 ++++++++++++++++++++++
 7 files changed, 146 insertions(+), 36 deletions(-)

diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosAction.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosAction.java
index bd3e1f9..18c752b 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosAction.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosAction.java
@@ -18,45 +18,40 @@ package org.apache.nifi.security.krb;
 
 import org.apache.commons.lang3.Validate;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
 
 import javax.security.auth.login.LoginException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 
 /**
  * Helper class for processors to perform an action as a KerberosUser.
  */
-public class KerberosAction {
+public class KerberosAction<T> {
 
     private final KerberosUser kerberosUser;
-    private final PrivilegedAction action;
-    private final ProcessContext context;
+    private final PrivilegedExceptionAction<T> action;
     private final ComponentLog logger;
 
     public KerberosAction(final KerberosUser kerberosUser,
-                          final PrivilegedAction action,
-                          final ProcessContext context,
+                          final PrivilegedExceptionAction<T> action,
                           final ComponentLog logger) {
         this.kerberosUser = kerberosUser;
         this.action = action;
-        this.context = context;
         this.logger = logger;
         Validate.notNull(this.kerberosUser);
         Validate.notNull(this.action);
-        Validate.notNull(this.context);
         Validate.notNull(this.logger);
     }
 
-    public void execute() {
+    public T execute() {
+        T result;
         // lazily login the first time the processor executes
         if (!kerberosUser.isLoggedIn()) {
             try {
                 kerberosUser.login();
                 logger.info("Successful login for {}", new Object[]{kerberosUser.getPrincipal()});
             } catch (LoginException e) {
-                // make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
-                context.yield();
                 throw new ProcessException("Login failed due to: " + e.getMessage(), e);
             }
         }
@@ -65,14 +60,12 @@ public class KerberosAction {
         try {
             kerberosUser.checkTGTAndRelogin();
         } catch (LoginException e) {
-            // make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
-            context.yield();
             throw new ProcessException("Relogin check failed due to: " + e.getMessage(), e);
         }
 
         // attempt to execute the action, if an exception is caught attempt to logout/login and retry
         try {
-            kerberosUser.doAs(action);
+            result = kerberosUser.doAs(action);
         } catch (SecurityException se) {
             logger.info("Privileged action failed, attempting relogin and retrying...");
             logger.debug("", se);
@@ -80,13 +73,15 @@ public class KerberosAction {
             try {
                 kerberosUser.logout();
                 kerberosUser.login();
-                kerberosUser.doAs(action);
+                result = kerberosUser.doAs(action);
             } catch (Exception e) {
-                // make sure to yield so the processor doesn't keep retrying the rolled back flow files immediately
-                context.yield();
                 throw new ProcessException("Retrying privileged action failed due to: " + e.getMessage(), e);
             }
+        } catch (PrivilegedActionException e) {
+            throw new ProcessException("Privileged action failed due to: " + e.getMessage(), e.getException());
         }
+
+        return result;
     }
 
 }
diff --git a/nifi-commons/nifi-security-utils/src/test/java/org/apache/nifi/security/krb/KerberosUserIT.java b/nifi-commons/nifi-security-utils/src/test/java/org/apache/nifi/security/krb/KerberosUserIT.java
index e636008..5a390d6 100644
--- a/nifi-commons/nifi-security-utils/src/test/java/org/apache/nifi/security/krb/KerberosUserIT.java
+++ b/nifi-commons/nifi-security-utils/src/test/java/org/apache/nifi/security/krb/KerberosUserIT.java
@@ -29,7 +29,7 @@ import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.auth.login.LoginException;
 import java.io.File;
 import java.security.Principal;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -174,7 +174,7 @@ public class KerberosUserIT {
         final KerberosUser user1 = new KerberosKeytabUser(principal1.getName(), principal1KeytabFile.getAbsolutePath());
 
         final AtomicReference<String> resultHolder = new AtomicReference<>(null);
-        final PrivilegedAction privilegedAction = () -> {
+        final PrivilegedExceptionAction<Void> privilegedAction = () -> {
             resultHolder.set("SUCCESS");
             return null;
         };
@@ -183,7 +183,7 @@ public class KerberosUserIT {
         final ComponentLog logger = Mockito.mock(ComponentLog.class);
 
         // create the action to test and execute it
-        final KerberosAction kerberosAction = new KerberosAction(user1, privilegedAction, context, logger);
+        final KerberosAction kerberosAction = new KerberosAction<>(user1, privilegedAction, logger);
         kerberosAction.execute();
 
         // if the result holder has the string success then we know the action executed
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
index 4a193b6..79f397f 100755
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
@@ -36,7 +36,7 @@ import org.apache.solr.client.solrj.SolrClient;
 
 import javax.security.auth.login.LoginException;
 import java.io.IOException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -113,14 +113,19 @@ public abstract class SolrProcessor extends AbstractProcessor {
             doOnTrigger(context, session);
         } else {
             // wrap doOnTrigger in a privileged action
-            final PrivilegedAction action = () -> {
+            final PrivilegedExceptionAction<Void> action = () -> {
                 doOnTrigger(context, session);
                 return null;
             };
 
             // execute the privileged action as the given keytab user
-            final KerberosAction kerberosAction = new KerberosAction(kerberosUser, action, context, getLogger());
-            kerberosAction.execute();
+            final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, action, getLogger());
+            try {
+                kerberosAction.execute();
+            } catch (ProcessException e) {
+                context.yield();
+                throw e;
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
index e3c3b79..d41f89a 100755
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java
@@ -44,7 +44,8 @@ import javax.net.ssl.SSLContext;
 import javax.security.auth.login.LoginException;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
@@ -459,7 +460,7 @@ public class TestPutSolrContentStream {
     }
 
     @Test
-    public void testUpdateWithKerberosAuth() throws IOException, InitializationException, LoginException {
+    public void testUpdateWithKerberosAuth() throws IOException, InitializationException, LoginException, PrivilegedActionException {
         final String principal = "nifi@FOO.COM";
         final String keytab = "src/test/resources/foo.keytab";
 
@@ -467,8 +468,8 @@ public class TestPutSolrContentStream {
         final KerberosKeytabUser kerberosUser = Mockito.mock(KerberosKeytabUser.class);
         when(kerberosUser.getPrincipal()).thenReturn(principal);
         when(kerberosUser.getKeytabFile()).thenReturn(keytab);
-        when(kerberosUser.doAs(any(PrivilegedAction.class))).thenAnswer((invocation -> {
-                    final PrivilegedAction action = (PrivilegedAction) invocation.getArguments()[0];
+        when(kerberosUser.doAs(any(PrivilegedExceptionAction.class))).thenAnswer((invocation -> {
+                    final PrivilegedExceptionAction action = (PrivilegedExceptionAction) invocation.getArguments()[0];
                     action.run();
                     return null;
                 })
@@ -502,7 +503,7 @@ public class TestPutSolrContentStream {
         // Verify that during the update the user was logged in, TGT was checked, and the action was executed
         verify(kerberosUser, times(1)).login();
         verify(kerberosUser, times(1)).checkTGTAndRelogin();
-        verify(kerberosUser, times(1)).doAs(any(PrivilegedAction.class));
+        verify(kerberosUser, times(1)).doAs(any(PrivilegedExceptionAction.class));
     }
 
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
index 82d5b8d..0640582 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
@@ -44,6 +44,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <version>1.9.0-SNAPSHOT</version>
             <scope>test</scope>
@@ -57,7 +63,7 @@
             <groupId>org.apache.derby</groupId>
             <artifactId>derby</artifactId>
             <version>10.11.1.1</version>
-        </dependency>        
+        </dependency>
         <dependency>
             <groupId>org.apache.derby</groupId>
             <artifactId>derbynet</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
index d6f83f2..6dd24d1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
@@ -32,12 +32,16 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
 
+import javax.security.auth.login.LoginException;
 import java.net.MalformedURLException;
 import java.sql.Connection;
 import java.sql.Driver;
@@ -263,6 +267,14 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-credentials-service")
+            .displayName("Kerberos Credentials Service")
+            .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosCredentialsService.class)
+            .required(false)
+            .build();
+
     private static final List<PropertyDescriptor> properties;
 
     static {
@@ -270,6 +282,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
         props.add(DATABASE_URL);
         props.add(DB_DRIVERNAME);
         props.add(DB_DRIVER_LOCATION);
+        props.add(KERBEROS_CREDENTIALS_SERVICE);
         props.add(DB_USER);
         props.add(DB_PASSWORD);
         props.add(MAX_WAIT_TIME);
@@ -286,6 +299,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
     }
 
     private volatile BasicDataSource dataSource;
+    private volatile KerberosKeytabUser kerberosUser;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -333,6 +347,16 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
         final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD));
         final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME));
         final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME));
+        final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        if (kerberosCredentialsService != null) {
+            kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
+            try {
+                kerberosUser.login();
+            } catch (LoginException e) {
+                throw new InitializationException("Unable to authenticate Kerberos principal", e);
+            }
+        }
 
         dataSource = new BasicDataSource();
         dataSource.setDriverClassName(drv);
@@ -410,20 +434,41 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
 
     /**
      * Shutdown pool, close all open connections.
+     * If a principal is authenticated with a KDC, that principal is logged out.
+     *
+     * If a {@link LoginException} occurs while attempting to log out the {@link org.apache.nifi.security.krb.KerberosUser},
+     * an attempt will still be made to shut down the pool and close open connections.
+     *
+     * @throws SQLException if there is an error while closing open connections
+     * @throws LoginException if there is an error during the principal log out, and will only be thrown if there was
+     * no exception while closing open connections
      */
     @OnDisabled
-    public void shutdown() {
+    public void shutdown() throws SQLException, LoginException {
         try {
-            dataSource.close();
-        } catch (final SQLException e) {
-            throw new ProcessException(e);
+            if (kerberosUser != null) {
+                kerberosUser.logout();
+            }
+        } finally {
+            kerberosUser = null;
+            try {
+                dataSource.close();
+            } finally {
+                dataSource = null;
+            }
         }
     }
 
     @Override
     public Connection getConnection() throws ProcessException {
         try {
-            final Connection con = dataSource.getConnection();
+            final Connection con;
+            if (kerberosUser != null) {
+                KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
+                con = kerberosAction.execute();
+            } else {
+                con = dataSource.getConnection();
+            }
             return con;
         } catch (final SQLException e) {
             throw new ProcessException(e);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/dbcp/GroovyDBCPServiceTest.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/dbcp/GroovyDBCPServiceTest.groovy
index 9da4b22..cb267db 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/dbcp/GroovyDBCPServiceTest.groovy
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/dbcp/GroovyDBCPServiceTest.groovy
@@ -16,13 +16,16 @@
  */
 package org.apache.nifi.dbcp
 
+import org.apache.commons.dbcp2.BasicDataSource
 import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.security.krb.KerberosKeytabUser
 import org.apache.nifi.util.TestRunner
 import org.apache.nifi.util.TestRunners
 import org.junit.Assert
 import org.junit.BeforeClass
 import org.junit.Test
 
+import javax.security.auth.login.LoginException
 import java.sql.Connection
 import java.sql.SQLException
 
@@ -72,4 +75,59 @@ class GroovyDBCPServiceTest {
             connection.close() // will return connection to pool
         }
     }
+
+    @Test(expected = LoginException)
+    void testDatasourceCloseSuccessWithKerberosUserLogoutException() {
+        final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
+
+        def basicDataSource = [close: { -> }] as BasicDataSource
+        dbcpConnectionPoolService.dataSource = basicDataSource
+        def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
+            @Override
+            void logout() throws LoginException {
+                throw new LoginException("fake logout exception")
+            }
+        }
+        dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+        dbcpConnectionPoolService.shutdown()
+
+    }
+
+    @Test(expected = SQLException)
+    void testDatasourceCloseExceptionWithKerberosUserLogoutSuccess() {
+        final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
+
+        def basicDataSource = [
+                close: { -> throw new SQLException("fake sql exception")
+                }] as BasicDataSource
+        dbcpConnectionPoolService.dataSource = basicDataSource
+        def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
+            @Override
+            void logout() throws LoginException {
+            }
+        }
+        dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+        dbcpConnectionPoolService.shutdown()
+    }
+
+    @Test(expected = SQLException)
+    void testDatasourceCloseExceptionWithKerberosUserLogoutException() {
+        final DBCPConnectionPool dbcpConnectionPoolService = new DBCPConnectionPool()
+
+        def basicDataSource = [
+                close: { -> throw new SQLException("fake sql exception")
+        }] as BasicDataSource
+        dbcpConnectionPoolService.dataSource = basicDataSource
+        def kerberosKeytabUser = new KerberosKeytabUser("bad@PRINCIPAL.COM", "fake.keytab") {
+            @Override
+            void logout() throws LoginException {
+                throw new LoginException("fake logout exception")
+            }
+        }
+        dbcpConnectionPoolService.kerberosUser = kerberosKeytabUser
+
+        dbcpConnectionPoolService.shutdown()
+    }
 }