You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/26 16:06:29 UTC

[nifi] branch master updated: NIFI-7453 In PutKudu creating a new Kudu client when refreshing TGT

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new ca65bba  NIFI-7453 In PutKudu creating a new Kudu client when refreshing TGT
ca65bba is described below

commit ca65bba5d720550aab97fcfc58be46e1b77001d3
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Fri May 15 13:48:54 2020 +0200

    NIFI-7453 In PutKudu creating a new Kudu client when refreshing TGT
    
    NIFI-7453 Also creating a new Kudu client when refreshing the TGT in KerberosPasswordUser. (Previously this was applied to KerberosKeytabUser only.)
    NIFI-7453 Safely closing old Kudu client before creating a new one.
    NIFI-7453 Visibility adjustment.
    
    This closes #4276.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../processors/kudu/AbstractKuduProcessor.java     | 95 +++++++++++++++++-----
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 15 ++--
 .../apache/nifi/processors/kudu/MockPutKudu.java   | 12 +--
 3 files changed, 90 insertions(+), 32 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 7561ee4..c41c8d5 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -44,6 +44,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.krb.KerberosAction;
 import org.apache.nifi.security.krb.KerberosKeytabUser;
@@ -62,6 +64,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class AbstractKuduProcessor extends AbstractProcessor {
 
@@ -120,40 +125,70 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    protected KuduClient kuduClient;
+    private volatile KuduClient kuduClient;
+    private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock();
+    private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
+    private final Lock kuduClientWriteLock = kuduClientReadWriteLock.writeLock();
 
     private volatile KerberosUser kerberosUser;
 
-    public KerberosUser getKerberosUser() {
+    protected abstract void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        kuduClientReadLock.lock();
+        try {
+            onTrigger(context, session, kuduClient);
+        } finally {
+            kuduClientReadLock.unlock();
+        }
+    }
+
+    protected KerberosUser getKerberosUser() {
         return this.kerberosUser;
     }
 
-    public KuduClient getKuduClient() {
-        return this.kuduClient;
+    protected void createKerberosUserAndKuduClient(ProcessContext context) throws LoginException {
+        createKerberosUser(context);
+        createKuduClient(context);
     }
 
-    public void createKuduClient(ProcessContext context) throws LoginException {
-        final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+    protected void createKerberosUser(ProcessContext context) throws LoginException {
         final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
         final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
 
         if (credentialsService != null) {
-            kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab());
+            kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context);
         } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
-            kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword);
+            kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context);
         }
+    }
 
-        if (kerberosUser != null) {
-            final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
-            this.kuduClient = kerberosAction.execute();
-        } else {
-            this.kuduClient = buildClient(kuduMasters, context);
+    protected void createKuduClient(ProcessContext context) {
+        kuduClientWriteLock.lock();
+        try {
+            if (this.kuduClient != null) {
+                try {
+                    this.kuduClient.close();
+                } catch (KuduException e) {
+                    getLogger().error("Couldn't close Kudu client.");
+                }
+            }
+
+            if (kerberosUser != null) {
+                final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(context), getLogger());
+                this.kuduClient = kerberosAction.execute();
+            } else {
+                this.kuduClient = buildClient(context);
+            }
+        } finally {
+            kuduClientWriteLock.unlock();
         }
     }
 
-
-    protected KuduClient buildClient(final String masters, final ProcessContext context) {
+    protected KuduClient buildClient(final ProcessContext context) {
+        final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
         final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
 
@@ -176,14 +211,36 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         }
     }
 
-    protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException {
-        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
+    protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException {
+        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab) {
+            @Override
+            public synchronized boolean checkTGTAndRelogin() throws LoginException {
+                boolean didRelogin = super.checkTGTAndRelogin();
+
+                if (didRelogin) {
+                    createKuduClient(context);
+                }
+
+                return didRelogin;
+            }
+        };
         kerberosUser.login();
         return kerberosUser;
     }
 
-    protected KerberosUser loginKerberosPasswordUser(final String principal, final String password) throws LoginException {
-        final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password);
+    protected KerberosUser loginKerberosPasswordUser(final String principal, final String password, ProcessContext context) throws LoginException {
+        final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password) {
+            @Override
+            public synchronized boolean checkTGTAndRelogin() throws LoginException {
+                boolean didRelogin = super.checkTGTAndRelogin();
+
+                if (didRelogin) {
+                    createKuduClient(context);
+                }
+
+                return didRelogin;
+            }
+        };
         kerberosUser.login();
         return kerberosUser;
     }
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 1893c64..2ac0195 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -260,11 +260,11 @@ public class PutKudu extends AbstractKuduProcessor {
         batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         ffbatch   = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
-        createKuduClient(context);
+        createKerberosUserAndKuduClient(context);
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+    protected void onTrigger(final ProcessContext context, final ProcessSession session, KuduClient kuduClient) throws ProcessException {
         final List<FlowFile> flowFiles = session.get(ffbatch);
         if (flowFiles.isEmpty()) {
             return;
@@ -272,23 +272,22 @@ public class PutKudu extends AbstractKuduProcessor {
 
         final KerberosUser user = getKerberosUser();
         if (user == null) {
-            trigger(context, session, flowFiles);
+            trigger(context, session, flowFiles, kuduClient);
             return;
         }
 
-        final PrivilegedExceptionAction<Void> privelegedAction = () -> {
-            trigger(context, session, flowFiles);
+        final PrivilegedExceptionAction<Void> privilegedAction = () -> {
+            trigger(context, session, flowFiles, kuduClient);
             return null;
         };
 
-        final KerberosAction<Void> action = new KerberosAction<>(user, privelegedAction, getLogger());
+        final KerberosAction<Void> action = new KerberosAction<>(user, privilegedAction, getLogger());
         action.execute();
     }
 
-    private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) throws ProcessException {
+    private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles, KuduClient kuduClient) throws ProcessException {
         final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
 
-        final KuduClient kuduClient = getKuduClient();
         final KuduSession kuduSession = createKuduSession(kuduClient);
 
         final Map<FlowFile, Integer> numRecords = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index e2935b0..52ab00d 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -25,6 +25,8 @@ import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.Upsert;
 import org.apache.kudu.client.Update;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.Record;
 
@@ -83,7 +85,7 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    public KuduClient buildClient(final String masters, ProcessContext context) {
+    public KuduClient buildClient(ProcessContext context) {
         final KuduClient client = mock(KuduClient.class);
 
         try {
@@ -96,7 +98,7 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    public KuduClient getKuduClient() {
+    protected void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient)  throws ProcessException {
         final KuduClient client = mock(KuduClient.class);
 
         try {
@@ -105,7 +107,7 @@ public class MockPutKudu extends PutKudu {
             throw new AssertionError(e);
         }
 
-        return client;
+        super.onTrigger(context, session, client);
     }
 
     public boolean loggedIn() {
@@ -117,12 +119,12 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException {
+    protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException {
         return createMockKerberosUser(principal);
     }
 
     @Override
-    protected KerberosUser loginKerberosPasswordUser(String principal, String password) throws LoginException {
+    protected KerberosUser loginKerberosPasswordUser(String principal, String password, ProcessContext context) throws LoginException {
         return createMockKerberosUser(principal);
     }