You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/26 16:06:29 UTC
[nifi] branch master updated: NIFI-7453 In PutKudu creating a new Kudu client when refreshing TGT
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new ca65bba NIFI-7453 In PutKudu creating a new Kudu client when refreshing TGT
ca65bba is described below
commit ca65bba5d720550aab97fcfc58be46e1b77001d3
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Fri May 15 13:48:54 2020 +0200
NIFI-7453 In PutKudu creating a new Kudu client when refreshing TGT
NIFI-7453 Creating a new Kudu client when refreshing TGT in KerberosPasswordUser as well. (Previously this was applied only to KerberosKeytabUser.)
NIFI-7453 Safely closing old Kudu client before creating a new one.
NIFI-7453 Visibility adjustment.
This closes #4276.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../processors/kudu/AbstractKuduProcessor.java | 95 +++++++++++++++++-----
.../org/apache/nifi/processors/kudu/PutKudu.java | 15 ++--
.../apache/nifi/processors/kudu/MockPutKudu.java | 12 +--
3 files changed, 90 insertions(+), 32 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 7561ee4..c41c8d5 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -44,6 +44,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
@@ -62,6 +64,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class AbstractKuduProcessor extends AbstractProcessor {
@@ -120,40 +125,70 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- protected KuduClient kuduClient;
+ private volatile KuduClient kuduClient;
+ private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock();
+ private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
+ private final Lock kuduClientWriteLock = kuduClientReadWriteLock.writeLock();
private volatile KerberosUser kerberosUser;
- public KerberosUser getKerberosUser() {
+ protected abstract void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException;
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ kuduClientReadLock.lock();
+ try {
+ onTrigger(context, session, kuduClient);
+ } finally {
+ kuduClientReadLock.unlock();
+ }
+ }
+
+ protected KerberosUser getKerberosUser() {
return this.kerberosUser;
}
- public KuduClient getKuduClient() {
- return this.kuduClient;
+ protected void createKerberosUserAndKuduClient(ProcessContext context) throws LoginException {
+ createKerberosUser(context);
+ createKuduClient(context);
}
- public void createKuduClient(ProcessContext context) throws LoginException {
- final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+ protected void createKerberosUser(ProcessContext context) throws LoginException {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
if (credentialsService != null) {
- kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab());
+ kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context);
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
- kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword);
+ kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context);
}
+ }
- if (kerberosUser != null) {
- final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
- this.kuduClient = kerberosAction.execute();
- } else {
- this.kuduClient = buildClient(kuduMasters, context);
+ protected void createKuduClient(ProcessContext context) {
+ kuduClientWriteLock.lock();
+ try {
+ if (this.kuduClient != null) {
+ try {
+ this.kuduClient.close();
+ } catch (KuduException e) {
+ getLogger().error("Couldn't close Kudu client.");
+ }
+ }
+
+ if (kerberosUser != null) {
+ final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(context), getLogger());
+ this.kuduClient = kerberosAction.execute();
+ } else {
+ this.kuduClient = buildClient(context);
+ }
+ } finally {
+ kuduClientWriteLock.unlock();
}
}
-
- protected KuduClient buildClient(final String masters, final ProcessContext context) {
+ protected KuduClient buildClient(final ProcessContext context) {
+ final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@@ -176,14 +211,36 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
}
}
- protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException {
- final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
+ protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException {
+ final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab) {
+ @Override
+ public synchronized boolean checkTGTAndRelogin() throws LoginException {
+ boolean didRelogin = super.checkTGTAndRelogin();
+
+ if (didRelogin) {
+ createKuduClient(context);
+ }
+
+ return didRelogin;
+ }
+ };
kerberosUser.login();
return kerberosUser;
}
- protected KerberosUser loginKerberosPasswordUser(final String principal, final String password) throws LoginException {
- final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password);
+ protected KerberosUser loginKerberosPasswordUser(final String principal, final String password, ProcessContext context) throws LoginException {
+ final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password) {
+ @Override
+ public synchronized boolean checkTGTAndRelogin() throws LoginException {
+ boolean didRelogin = super.checkTGTAndRelogin();
+
+ if (didRelogin) {
+ createKuduClient(context);
+ }
+
+ return didRelogin;
+ }
+ };
kerberosUser.login();
return kerberosUser;
}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 1893c64..2ac0195 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -260,11 +260,11 @@ public class PutKudu extends AbstractKuduProcessor {
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
- createKuduClient(context);
+ createKerberosUserAndKuduClient(context);
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ protected void onTrigger(final ProcessContext context, final ProcessSession session, KuduClient kuduClient) throws ProcessException {
final List<FlowFile> flowFiles = session.get(ffbatch);
if (flowFiles.isEmpty()) {
return;
@@ -272,23 +272,22 @@ public class PutKudu extends AbstractKuduProcessor {
final KerberosUser user = getKerberosUser();
if (user == null) {
- trigger(context, session, flowFiles);
+ trigger(context, session, flowFiles, kuduClient);
return;
}
- final PrivilegedExceptionAction<Void> privelegedAction = () -> {
- trigger(context, session, flowFiles);
+ final PrivilegedExceptionAction<Void> privilegedAction = () -> {
+ trigger(context, session, flowFiles, kuduClient);
return null;
};
- final KerberosAction<Void> action = new KerberosAction<>(user, privelegedAction, getLogger());
+ final KerberosAction<Void> action = new KerberosAction<>(user, privilegedAction, getLogger());
action.execute();
}
- private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) throws ProcessException {
+ private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles, KuduClient kuduClient) throws ProcessException {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
- final KuduClient kuduClient = getKuduClient();
final KuduSession kuduSession = createKuduSession(kuduClient);
final Map<FlowFile, Integer> numRecords = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index e2935b0..52ab00d 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -25,6 +25,8 @@ import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert;
import org.apache.kudu.client.Update;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record;
@@ -83,7 +85,7 @@ public class MockPutKudu extends PutKudu {
}
@Override
- public KuduClient buildClient(final String masters, ProcessContext context) {
+ public KuduClient buildClient(ProcessContext context) {
final KuduClient client = mock(KuduClient.class);
try {
@@ -96,7 +98,7 @@ public class MockPutKudu extends PutKudu {
}
@Override
- public KuduClient getKuduClient() {
+ protected void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException {
final KuduClient client = mock(KuduClient.class);
try {
@@ -105,7 +107,7 @@ public class MockPutKudu extends PutKudu {
throw new AssertionError(e);
}
- return client;
+ super.onTrigger(context, session, client);
}
public boolean loggedIn() {
@@ -117,12 +119,12 @@ public class MockPutKudu extends PutKudu {
}
@Override
- protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException {
+ protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException {
return createMockKerberosUser(principal);
}
@Override
- protected KerberosUser loginKerberosPasswordUser(String principal, String password) throws LoginException {
+ protected KerberosUser loginKerberosPasswordUser(String principal, String password, ProcessContext context) throws LoginException {
return createMockKerberosUser(principal);
}