You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/03/28 18:36:38 UTC

[nifi] branch main updated: NIFI-9838: Added Client Certificate attributes to ListenTCPRecord FlowFiles

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ade47a  NIFI-9838: Added Client Certificate attributes to ListenTCPRecord FlowFiles
6ade47a is described below

commit 6ade47ac4f4fb6a4e0be84d5f2431e0c2f2718eb
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Fri Mar 25 18:36:43 2022 +0100

    NIFI-9838: Added Client Certificate attributes to ListenTCPRecord FlowFiles
    
    This closes #5908
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../listen/SSLSocketChannelRecordReader.java       | 10 ++++-
 .../SocketChannelRecordReaderDispatcher.java       |  2 +-
 .../nifi/processors/standard/ListenTCPRecord.java  | 43 ++++++++++++++++++++--
 .../processors/standard/TestListenTCPRecord.java   | 15 ++++++++
 4 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java
index f393419..39710a0 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java
@@ -24,6 +24,8 @@ import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSession;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,17 +42,20 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {
     private final SSLSocketChannel sslSocketChannel;
     private final RecordReaderFactory readerFactory;
     private final SocketChannelRecordReaderDispatcher dispatcher;
+    private final SSLEngine sslEngine;
 
     private RecordReader recordReader;
 
     public SSLSocketChannelRecordReader(final SocketChannel socketChannel,
                                         final SSLSocketChannel sslSocketChannel,
                                         final RecordReaderFactory readerFactory,
-                                        final SocketChannelRecordReaderDispatcher dispatcher) {
+                                        final SocketChannelRecordReaderDispatcher dispatcher,
+                                        final SSLEngine sslEngine) {
         this.socketChannel = socketChannel;
         this.sslSocketChannel = sslSocketChannel;
         this.readerFactory = readerFactory;
         this.dispatcher = dispatcher;
+        this.sslEngine = sslEngine;
     }
 
     @Override
@@ -87,4 +92,7 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {
         dispatcher.connectionCompleted();
     }
 
+    public SSLSession getSession() {
+        return sslEngine.getSession();
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
index 2c7c93a..48459ce 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
@@ -119,7 +119,7 @@ public class SocketChannelRecordReaderDispatcher implements Runnable, Closeable
                     }
 
                     final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
-                    socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this);
+                    socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this, sslEngine);
                 }
 
                 // queue the SocketChannelRecordReader for processing by the processor
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
index acc936a..bb7a2bf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ServerSocketChannel;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,6 +39,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -62,6 +67,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.record.listen.SSLSocketChannelRecordReader;
 import org.apache.nifi.record.listen.SocketChannelRecordReader;
 import org.apache.nifi.record.listen.SocketChannelRecordReaderDispatcher;
 import org.apache.nifi.security.util.ClientAuth;
@@ -87,14 +93,26 @@ import org.apache.nifi.ssl.SSLContextService;
         "If the read times out, or if any other error is encountered when reading, the connection will be closed, and any records " +
         "read up to that point will be handled according to the configured Read Error Strategy (Discard or Transfer). In cases where " +
         "clients are keeping a connection open, the concurrent tasks for the processor should be adjusted to match the Max Number of " +
-        "TCP Connections allowed, so that there is a task processing each connection.")
+        "TCP Connections allowed, so that there is a task processing each connection. " +
+        "The processor can be configured to use an SSL Context Service to only allow secure connections. " +
+        "When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's " +
+        "issuer and subject are added to the outgoing FlowFiles as attributes. " +
+        "The processor does not perform authorization based on Distinguished Name values, but since these values " +
+        "are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.")
 @WritesAttributes({
         @WritesAttribute(attribute="tcp.sender", description="The host that sent the data."),
         @WritesAttribute(attribute="tcp.port", description="The port that the processor accepted the connection on."),
         @WritesAttribute(attribute="record.count", description="The number of records written to the flow file."),
-        @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")
+        @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file."),
+        @WritesAttribute(attribute="client.certificate.issuer.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
+                                                                               "Certificate Authority that issued the client's certificate " +
+                                                                               "is attached to the FlowFile."),
+        @WritesAttribute(attribute="client.certificate.subject.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
+                                                                                "client certificate's owner (subject) is attached to the FlowFile.")
 })
 public class ListenTCPRecord extends AbstractProcessor {
+    private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
+    private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";
 
     static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
             .name("port")
@@ -199,7 +217,6 @@ public class ListenTCPRecord extends AbstractProcessor {
             .description("Messages received successfully will be sent out this relationship.")
             .build();
 
-
     static final List<PropertyDescriptor> PROPERTIES;
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
@@ -427,6 +444,7 @@ public class ListenTCPRecord extends AbstractProcessor {
                     attributes.put("tcp.sender", sender);
                     attributes.put("tcp.port", String.valueOf(port));
                     attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                    addClientCertificateAttributes(attributes, socketRecordReader);
                     flowFile = session.putAllAttributes(flowFile, attributes);
 
                     final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
@@ -460,4 +478,23 @@ public class ListenTCPRecord extends AbstractProcessor {
     private String getRemoteAddress(final SocketChannelRecordReader socketChannelRecordReader) {
         return socketChannelRecordReader.getRemoteAddress() == null ? "null" : socketChannelRecordReader.getRemoteAddress().toString();
     }
+
+    private void addClientCertificateAttributes(final Map<String, String> attributes, final SocketChannelRecordReader socketRecordReader)
+            throws SSLPeerUnverifiedException {
+        if (socketRecordReader instanceof SSLSocketChannelRecordReader) {
+            SSLSocketChannelRecordReader sslSocketRecordReader = (SSLSocketChannelRecordReader) socketRecordReader;
+            SSLSession sslSession = sslSocketRecordReader.getSession();
+            try {
+                Certificate[] certificates = sslSession.getPeerCertificates();
+                if (certificates.length > 0) {
+                    X509Certificate certificate = (X509Certificate) certificates[0];
+                    attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, certificate.getSubjectDN().toString());
+                    attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, certificate.getIssuerDN().toString());
+                }
+            } catch (SSLPeerUnverifiedException peerUnverifiedException) {
+                getLogger().debug("Remote Peer [{}] not verified: client certificates not provided",
+                        socketRecordReader.getRemoteAddress(), peerUnverifiedException);
+            }
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
index 325b951..731902d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
@@ -173,6 +173,21 @@ public class TestListenTCPRecord {
     }
 
     @Test(timeout = TEST_TIMEOUT)
+    public void testRunSSLClientDNsAddedAsAttributes() throws InitializationException, IOException, InterruptedException {
+        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
+        enableSslContextService(keyStoreSslContext);
+
+        run(1, keyStoreSslContext);
+
+        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
+        Assert.assertEquals(1, mockFlowFiles.size());
+
+        final MockFlowFile flowFile = mockFlowFiles.get(0);
+        flowFile.assertAttributeEquals("client.certificate.subject.dn", "CN=localhost");
+        flowFile.assertAttributeEquals("client.certificate.issuer.dn", "CN=localhost");
+    }
+
+    @Test(timeout = TEST_TIMEOUT)
     public void testRunClientAuthNone() throws InitializationException, IOException, InterruptedException {
         runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
         enableSslContextService(keyStoreSslContext);