You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/03/28 18:36:38 UTC
[nifi] branch main updated: NIFI-9838: Added Client Certificate attributes to ListenTCPRecord FlowFiles
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6ade47a NIFI-9838: Added Client Certificate attributes to ListenTCPRecord FlowFiles
6ade47a is described below
commit 6ade47ac4f4fb6a4e0be84d5f2431e0c2f2718eb
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Fri Mar 25 18:36:43 2022 +0100
NIFI-9838: Added Client Certificate attributes to ListenTCPRecord FlowFiles
This closes #5908
Signed-off-by: David Handermann <ex...@apache.org>
---
.../listen/SSLSocketChannelRecordReader.java | 10 ++++-
.../SocketChannelRecordReaderDispatcher.java | 2 +-
.../nifi/processors/standard/ListenTCPRecord.java | 43 ++++++++++++++++++++--
.../processors/standard/TestListenTCPRecord.java | 15 ++++++++
4 files changed, 65 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java
index f393419..39710a0 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java
@@ -24,6 +24,8 @@ import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSession;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -40,17 +42,20 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {
private final SSLSocketChannel sslSocketChannel;
private final RecordReaderFactory readerFactory;
private final SocketChannelRecordReaderDispatcher dispatcher;
+ private final SSLEngine sslEngine;
private RecordReader recordReader;
/**
 * Stores the collaborators this reader works with.
 *
 * @param socketChannel the socket channel, kept in the field of the same name
 * @param sslSocketChannel the SSL socket channel, kept in the field of the same name
 * @param readerFactory the record reader factory, kept for later use by this reader
 * @param dispatcher the dispatcher whose connectionCompleted() is invoked elsewhere in this class
 * @param sslEngine the SSL engine whose session is handed out by getSession()
 */
public SSLSocketChannelRecordReader(final SocketChannel socketChannel,
final SSLSocketChannel sslSocketChannel,
final RecordReaderFactory readerFactory,
- final SocketChannelRecordReaderDispatcher dispatcher) {
+ final SocketChannelRecordReaderDispatcher dispatcher,
+ final SSLEngine sslEngine) {
this.socketChannel = socketChannel;
this.sslSocketChannel = sslSocketChannel;
this.readerFactory = readerFactory;
this.dispatcher = dispatcher;
+ this.sslEngine = sslEngine;
}
@Override
@@ -87,4 +92,7 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {
dispatcher.connectionCompleted();
}
/**
 * Returns the SSL session reported by the SSL engine supplied at construction.
 *
 * @return the result of {@code sslEngine.getSession()}
 */
+ public SSLSession getSession() {
+ return sslEngine.getSession();
+ }
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
index 2c7c93a..48459ce 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
@@ -119,7 +119,7 @@ public class SocketChannelRecordReaderDispatcher implements Runnable, Closeable
}
final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
- socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this);
+ socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this, sslEngine);
}
// queue the SocketChannelRecordReader for processing by the processor
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
index acc936a..bb7a2bf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -37,6 +39,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -62,6 +67,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.record.listen.SSLSocketChannelRecordReader;
import org.apache.nifi.record.listen.SocketChannelRecordReader;
import org.apache.nifi.record.listen.SocketChannelRecordReaderDispatcher;
import org.apache.nifi.security.util.ClientAuth;
@@ -87,14 +93,26 @@ import org.apache.nifi.ssl.SSLContextService;
"If the read times out, or if any other error is encountered when reading, the connection will be closed, and any records " +
"read up to that point will be handled according to the configured Read Error Strategy (Discard or Transfer). In cases where " +
"clients are keeping a connection open, the concurrent tasks for the processor should be adjusted to match the Max Number of " +
- "TCP Connections allowed, so that there is a task processing each connection.")
+ "TCP Connections allowed, so that there is a task processing each connection. " +
+ "The processor can be configured to use an SSL Context Service to only allow secure connections. " +
+ "When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's " +
+ "issuer and subject are added to the outgoing FlowFiles as attributes. " +
+ "The processor does not perform authorization based on Distinguished Name values, but since these values " +
+ "are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.")
@WritesAttributes({
@WritesAttribute(attribute="tcp.sender", description="The host that sent the data."),
@WritesAttribute(attribute="tcp.port", description="The port that the processor accepted the connection on."),
@WritesAttribute(attribute="record.count", description="The number of records written to the flow file."),
- @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")
+ @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file."),
+ @WritesAttribute(attribute="client.certificate.issuer.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
+ "Certificate Authority that issued the client's certificate " +
+ "is attached to the FlowFile."),
+ @WritesAttribute(attribute="client.certificate.subject.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
+ "client certificate's owner (subject) is attached to the FlowFile.")
})
public class ListenTCPRecord extends AbstractProcessor {
+ private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
+ private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";
static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("port")
@@ -199,7 +217,6 @@ public class ListenTCPRecord extends AbstractProcessor {
.description("Messages received successfully will be sent out this relationship.")
.build();
-
static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
@@ -427,6 +444,7 @@ public class ListenTCPRecord extends AbstractProcessor {
attributes.put("tcp.sender", sender);
attributes.put("tcp.port", String.valueOf(port));
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ addClientCertificateAttributes(attributes, socketRecordReader);
flowFile = session.putAllAttributes(flowFile, attributes);
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
@@ -460,4 +478,23 @@ public class ListenTCPRecord extends AbstractProcessor {
/**
 * Renders the remote address of the given reader as text.
 *
 * @param socketChannelRecordReader the reader whose remote address is rendered
 * @return the string form of the remote address, or the literal text "null" when the reader reports none
 */
private String getRemoteAddress(final SocketChannelRecordReader socketChannelRecordReader) {
    // No address available: fall back to the literal text rather than a null reference
    if (socketChannelRecordReader.getRemoteAddress() == null) {
        return "null";
    }
    return socketChannelRecordReader.getRemoteAddress().toString();
}
+
+ private void addClientCertificateAttributes(final Map<String, String> attributes, final SocketChannelRecordReader socketRecordReader)
+ throws SSLPeerUnverifiedException {
+ if (socketRecordReader instanceof SSLSocketChannelRecordReader) {
+ SSLSocketChannelRecordReader sslSocketRecordReader = (SSLSocketChannelRecordReader) socketRecordReader;
+ SSLSession sslSession = sslSocketRecordReader.getSession();
+ try {
+ Certificate[] certificates = sslSession.getPeerCertificates();
+ if (certificates.length > 0) {
+ X509Certificate certificate = (X509Certificate) certificates[0];
+ attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, certificate.getSubjectDN().toString());
+ attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, certificate.getIssuerDN().toString());
+ }
+ } catch (SSLPeerUnverifiedException peerUnverifiedException) {
+ getLogger().debug("Remote Peer [{}] not verified: client certificates not provided",
+ socketRecordReader.getRemoteAddress(), peerUnverifiedException);
+ }
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
index 325b951..731902d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
@@ -173,6 +173,21 @@ public class TestListenTCPRecord {
}
@Test(timeout = TEST_TIMEOUT)
+ // Checks that the client certificate DN attributes are written to the FlowFile when client auth is REQUIRED
+ public void testRunSSLClientDNsAddedAsAttributes() throws InitializationException, IOException, InterruptedException {
+     // Expected value for both the subject and the issuer attribute
+     final String expectedDn = "CN=localhost";
+
+     runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
+     enableSslContextService(keyStoreSslContext);
+
+     run(1, keyStoreSslContext);
+
+     final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
+     Assert.assertEquals(1, successFlowFiles.size());
+
+     final MockFlowFile received = successFlowFiles.get(0);
+     received.assertAttributeEquals("client.certificate.subject.dn", expectedDn);
+     received.assertAttributeEquals("client.certificate.issuer.dn", expectedDn);
+ }
+
+ @Test(timeout = TEST_TIMEOUT)
public void testRunClientAuthNone() throws InitializationException, IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
enableSslContextService(keyStoreSslContext);