You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2019/02/11 23:30:50 UTC
[nifi] branch master updated: NIFI-5984: Enabled Kerberos
Authentication for PutKudu
This is an automated email from the ASF dual-hosted git repository.
jstorck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3d408f2 NIFI-5984: Enabled Kerberos Authentication for PutKudu
3d408f2 is described below
commit 3d408f2b30fbb35fd507577a7032821044a7dcd9
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Jan 30 10:06:04 2019 -0500
NIFI-5984: Enabled Kerberos Authentication for PutKudu
This closes #3279
---
.../nifi-kudu-bundle/nifi-kudu-processors/pom.xml | 27 +++---
.../org/apache/nifi/processors/kudu/PutKudu.java | 103 +++++++++++++++++----
.../apache/nifi/processors/kudu/MockPutKudu.java | 65 ++++++++++++-
.../apache/nifi/processors/kudu/TestPutKudu.java | 61 ++++++++++++
4 files changed, 220 insertions(+), 36 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 5871cc0..8bb100d 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -27,17 +27,16 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-distributed-cache-client-service-api</artifactId>
- <version>1.9.0-SNAPSHOT</version>
- <scope>provided</scope>
+ <artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
+ <artifactId>nifi-processor-utils</artifactId>
+ <version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-processor-utils</artifactId>
+ <artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
@@ -59,10 +58,17 @@
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-hadoop-record-utils</artifactId>
+ <artifactId>nifi-security-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
+
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
@@ -71,10 +77,6 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-schema-registry-service-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
@@ -91,10 +93,5 @@
<version>2.5.4</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>18.0</version>
- </dependency>
</dependencies>
</project>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index b0eb3f9..9c0c503 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -34,6 +34,7 @@ import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -41,21 +42,29 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -65,8 +74,11 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+
@EventDriven
@SupportsBatching
+@RequiresInstanceClassLoading // Because of calls to UserGroupInformation.setConfiguration
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"})
@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
@@ -74,23 +86,31 @@ import java.util.stream.Collectors;
" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu")
public class PutKudu extends AbstractProcessor {
- protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+ protected static final PropertyDescriptor KUDU_MASTERS = new Builder()
.name("Kudu Masters")
.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
- protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ protected static final PropertyDescriptor TABLE_NAME = new Builder()
.name("Table Name")
.description("The name of the Kudu Table to put data into")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
- public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
+ .name("kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials to use for authentication")
+ .required(false)
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .build();
+
+ public static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The service for reading records from incoming flow files.")
@@ -98,7 +118,7 @@ public class PutKudu extends AbstractProcessor {
.required(true)
.build();
- protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+ protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
.name("Skip head line")
.description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
"(e.g. \"Treat First Line as Header\" property of CSVReader)")
@@ -108,7 +128,7 @@ public class PutKudu extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+ protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
.description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows")
.allowableValues(OperationType.values())
@@ -116,7 +136,7 @@ public class PutKudu extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
+ protected static final PropertyDescriptor FLUSH_MODE = new Builder()
.name("Flush Mode")
.description("Set the new flush mode for a kudu session.\n" +
"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
@@ -149,7 +169,7 @@ public class PutKudu extends AbstractProcessor {
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
@@ -171,12 +191,14 @@ public class PutKudu extends AbstractProcessor {
protected KuduClient kuduClient;
protected KuduTable kuduTable;
+ private volatile KerberosUser kerberosUser;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(KUDU_MASTERS);
properties.add(TABLE_NAME);
+ properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SKIP_HEAD_LINE);
properties.add(RECORD_READER);
properties.add(INSERT_OPERATION);
@@ -197,7 +219,7 @@ public class PutKudu extends AbstractProcessor {
@OnScheduled
- public void OnScheduled(final ProcessContext context) throws KuduException {
+ public void onScheduled(final ProcessContext context) throws IOException, LoginException {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
@@ -206,21 +228,48 @@ public class PutKudu extends AbstractProcessor {
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
getLogger().debug("Setting up Kudu connection...");
- kuduClient = createClient(kuduMasters);
+ final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ kuduClient = createClient(kuduMasters, credentialsService);
kuduTable = kuduClient.openTable(tableName);
getLogger().debug("Kudu connection successfully initialized");
}
- protected KuduClient createClient(final String masters) {
+ protected KuduClient createClient(final String masters, final KerberosCredentialsService credentialsService) throws LoginException {
+ if (credentialsService == null) {
+ return buildClient(masters);
+ }
+
+ final String keytab = credentialsService.getKeytab();
+ final String principal = credentialsService.getPrincipal();
+ kerberosUser = loginKerberosUser(principal, keytab);
+
+ final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(masters), getLogger());
+ return kerberosAction.execute();
+ }
+
+ protected KuduClient buildClient(final String masters) {
return new KuduClient.KuduClientBuilder(masters).build();
}
+ protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
+ final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
+ kerberosUser.login();
+ return kerberosUser;
+ }
+
@OnStopped
- public final void closeClient() throws KuduException {
- if (kuduClient != null) {
- getLogger().debug("Closing KuduClient");
- kuduClient.close();
- kuduClient = null;
+ public final void closeClient() throws KuduException, LoginException {
+ try {
+ if (kuduClient != null) {
+ getLogger().debug("Closing KuduClient");
+ kuduClient.close();
+ kuduClient = null;
+ }
+ } finally {
+ if (kerberosUser != null) {
+ kerberosUser.logout();
+ kerberosUser = null;
+ }
}
}
@@ -231,6 +280,22 @@ public class PutKudu extends AbstractProcessor {
return;
}
+ final KerberosUser user = kerberosUser;
+ if (user == null) {
+ trigger(context, session, flowFiles);
+ return;
+ }
+
+ final PrivilegedExceptionAction<Void> privelegedAction = () -> {
+ trigger(context, session, flowFiles);
+ return null;
+ };
+
+ final KerberosAction<Void> action = new KerberosAction<>(user, privelegedAction, getLogger());
+ action.execute();
+ }
+
+ private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) throws ProcessException {
final KuduSession kuduSession = getKuduSession(kuduClient);
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
@@ -353,13 +418,13 @@ public class PutKudu extends AbstractProcessor {
- protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+ protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
Upsert upsert = kuduTable.newUpsert();
this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames);
return upsert;
}
- protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+ protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
Insert insert = kuduTable.newInsert();
this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames);
return insert;
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index f805be7..091f5c3 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -22,8 +22,13 @@ import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Upsert;
+import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record;
+import javax.security.auth.login.LoginException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@@ -36,6 +41,9 @@ public class MockPutKudu extends PutKudu {
private KuduSession session;
private LinkedList<Insert> insertQueue;
+ private boolean loggedIn = false;
+ private boolean loggedOut = false;
+
public MockPutKudu() {
this(mock(KuduSession.class));
}
@@ -61,18 +69,71 @@ public class MockPutKudu extends PutKudu {
}
@Override
- protected KuduClient createClient(final String masters) {
+ protected KuduClient buildClient(final String masters) {
final KuduClient client = mock(KuduClient.class);
try {
when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
} catch (final Exception e) {
-
+ throw new AssertionError(e);
}
return client;
}
+ public boolean loggedIn() {
+ return loggedIn;
+ }
+
+ public boolean loggedOut() {
+ return loggedOut;
+ }
+
+ @Override
+ protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
+ return new KerberosUser() {
+
+ @Override
+ public void login() {
+ loggedIn = true;
+ }
+
+ @Override
+ public void logout() {
+ loggedOut = true;
+ }
+
+ @Override
+ public <T> T doAs(final PrivilegedAction<T> action) throws IllegalStateException {
+ return action.run();
+ }
+
+ @Override
+ public <T> T doAs(final PrivilegedExceptionAction<T> action) throws IllegalStateException, PrivilegedActionException {
+ try {
+ return action.run();
+ } catch (Exception e) {
+ throw new PrivilegedActionException(e);
+ }
+ }
+
+ @Override
+ public boolean checkTGTAndRelogin() {
+ return true;
+ }
+
+ @Override
+ public boolean isLoggedIn() {
+ return loggedIn && !loggedOut;
+ }
+
+ @Override
+ public String getPrincipal() {
+ return principal;
+ }
+ };
+ }
+
@Override
protected KuduSession getKuduSession(KuduClient client) {
return session;
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 51908f2..6fc430c 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -28,8 +28,10 @@ import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowErrorsAndOverflowStatus;
import org.apache.kudu.client.SessionConfiguration.FlushMode;
+import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@@ -70,6 +72,8 @@ import java.util.stream.IntStream;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -150,6 +154,42 @@ public class TestPutKudu {
}
@Test
+ public void testKerberosEnabled() throws InitializationException {
+ createRecordReader(1);
+
+ final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
+ testRunner.addControllerService("kerb", kerberosCredentialsService);
+ testRunner.enableControllerService(kerberosCredentialsService);
+
+ testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
+
+ testRunner.run(1, false);
+
+ final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
+ assertTrue(proc.loggedIn());
+ assertFalse(proc.loggedOut());
+
+ testRunner.run(1, true, false);
+ assertTrue(proc.loggedOut());
+ }
+
+
+ @Test
+ public void testInsecureClient() throws InitializationException {
+ createRecordReader(1);
+
+ testRunner.run(1, false);
+
+ final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
+ assertFalse(proc.loggedIn());
+ assertFalse(proc.loggedOut());
+
+ testRunner.run(1, true, false);
+ assertFalse(proc.loggedOut());
+ }
+
+
+ @Test
public void testInvalidReaderShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException {
createRecordReader(0);
@@ -516,4 +556,25 @@ public class TestPutKudu {
public void testKuduPartialFailuresOnManualFlush() throws Exception {
testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
}
+
+
+ public static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService {
+ private final String keytab;
+ private final String principal;
+
+ public MockKerberosCredentialsService(final String keytab, final String principal) {
+ this.keytab = keytab;
+ this.principal = principal;
+ }
+
+ @Override
+ public String getKeytab() {
+ return keytab;
+ }
+
+ @Override
+ public String getPrincipal() {
+ return principal;
+ }
+ }
}