You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/05/20 19:32:36 UTC

[nifi] 04/06: NIFI-10010: ListenTCP adds Certificate Subject and Issuer FlowFile attributes

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1cf0f8cf877259e2acedede926a6546a8af69aef
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Tue May 10 18:51:08 2022 +0200

    NIFI-10010: ListenTCP adds Certificate Subject and Issuer FlowFile attributes
    
    This closes #6032
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 ...ByteArrayMessage.java => SslSessionStatus.java} | 29 ++++++-------
 .../event/transport/message/ByteArrayMessage.java  | 13 +++++-
 .../netty/codec/SocketByteArrayMessageDecoder.java | 48 +++++++++++++++++++++-
 .../apache/nifi/processors/standard/ListenTCP.java | 26 +++++++++++-
 .../nifi/processors/standard/TestListenTCP.java    | 19 ++++++---
 5 files changed, 110 insertions(+), 25 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java
similarity index 57%
copy from nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
copy to nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java
index 00744ce06b..011cedb47f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,28 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.event.transport.message;
 
-import org.apache.nifi.event.transport.NetworkEvent;
+package org.apache.nifi.event.transport;
 
-/**
- * Byte Array Message with Sender
- */
-public class ByteArrayMessage implements NetworkEvent {
-    private final byte[] message;
+import javax.security.auth.x500.X500Principal;
 
-    private final String sender;
+public class SslSessionStatus {
+    private final X500Principal subject;
+    private final X500Principal issuer;
 
-    public ByteArrayMessage(final byte[] message, final String sender) {
-        this.message = message;
-        this.sender = sender;
+    public SslSessionStatus(final X500Principal subject, final X500Principal issuer) {
+        this.subject = subject;
+        this.issuer = issuer;
     }
 
-    public byte[] getMessage() {
-        return message;
+    public X500Principal getSubject() {
+        return subject;
     }
 
-    public String getSender() {
-        return sender;
+    public X500Principal getIssuer() {
+        return issuer;
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
index 00744ce06b..9c6e53e42f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.event.transport.message;
 
 import org.apache.nifi.event.transport.NetworkEvent;
+import org.apache.nifi.event.transport.SslSessionStatus;
 
 /**
  * Byte Array Message with Sender
@@ -25,10 +26,16 @@ public class ByteArrayMessage implements NetworkEvent {
     private final byte[] message;
 
     private final String sender;
+    private final SslSessionStatus sslSessionStatus;
 
-    public ByteArrayMessage(final byte[] message, final String sender) {
+    public ByteArrayMessage(final byte[] message, final String sender, final SslSessionStatus sslSessionStatus) {
         this.message = message;
         this.sender = sender;
+        this.sslSessionStatus = sslSessionStatus;
+    }
+
+    public ByteArrayMessage(final byte[] message, final String sender) {
+        this(message, sender, null);
     }
 
     public byte[] getMessage() {
@@ -38,4 +45,8 @@ public class ByteArrayMessage implements NetworkEvent {
     public String getSender() {
         return sender;
     }
+
+    public SslSessionStatus getSslSessionStatus() {
+        return sslSessionStatus;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java
index ab23dff80d..c7818c8b00 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java
@@ -16,17 +16,30 @@
  */
 package org.apache.nifi.event.transport.netty.codec;
 
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.nifi.event.transport.SslSessionStatus;
 import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.auth.x500.X500Principal;
 import java.net.InetSocketAddress;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Message Decoder for bytes received from Socket Channels
  */
 public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[]> {
+    private static final Logger logger = LoggerFactory.getLogger(SocketByteArrayMessageDecoder.class);
+
     /**
      * Decode bytes to Byte Array Message with remote address from Channel.remoteAddress()
      *
@@ -38,7 +51,40 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
     protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
         final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
         final String address = remoteAddress.getHostString();
-        final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+        final SslSessionStatus sslSessionStatus = getSslSessionStatus(channelHandlerContext);
+        final ByteArrayMessage message = new ByteArrayMessage(bytes, address, sslSessionStatus);
+
         decoded.add(message);
     }
+
+    private SslSessionStatus getSslSessionStatus(final ChannelHandlerContext channelHandlerContext) {
+        SslHandler sslHandler = null;
+        for (final Map.Entry<String, ChannelHandler> entry : channelHandlerContext.channel().pipeline()) {
+            final ChannelHandler channelHandler = entry.getValue();
+            if (channelHandler instanceof SslHandler) {
+                sslHandler = (SslHandler) channelHandler;
+                break;
+            }
+        }
+        return sslHandler == null ? null : createSslSessionStatusFromSslHandler(sslHandler);
+    }
+
+    private SslSessionStatus createSslSessionStatusFromSslHandler(final SslHandler sslHandler) {
+        final SSLSession sslSession = sslHandler.engine().getSession();
+        SslSessionStatus sslSessionStatus = null;
+        try {
+            final Certificate[] certificates = sslSession.getPeerCertificates();
+            if (certificates.length > 0) {
+                final X509Certificate certificate = (X509Certificate) certificates[0];
+                final X500Principal subject = certificate.getSubjectX500Principal();
+                final X500Principal issuer = certificate.getIssuerX500Principal();
+                sslSessionStatus = new SslSessionStatus(subject, issuer);
+            }
+        } catch (final SSLPeerUnverifiedException peerUnverifiedException) {
+            logger.debug("Peer Unverified", peerUnverifiedException);
+        }
+
+        return sslSessionStatus;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index fc7695b37a..b30ce88223 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.event.transport.EventException;
 import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.SslSessionStatus;
 import org.apache.nifi.event.transport.configuration.BufferAllocator;
 import org.apache.nifi.event.transport.configuration.TransportProtocol;
 import org.apache.nifi.event.transport.message.ByteArrayMessage;
@@ -74,12 +75,24 @@ import java.util.concurrent.LinkedBlockingQueue;
         "as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can " +
         "be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be " +
         "set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then " +
-        "the Receive Buffer Size must be greater than 100kb.")
+        "the Receive Buffer Size must be greater than 100kb. " +
+        "The processor can be configured to use an SSL Context Service to only allow secure connections. " +
+        "When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's " +
+        "issuer and subject are added to the outgoing FlowFiles as attributes. " +
+        "The processor does not perform authorization based on Distinguished Name values, but since these values " +
+        "are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.")
 @WritesAttributes({
         @WritesAttribute(attribute="tcp.sender", description="The sending host of the messages."),
-        @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received.")
+        @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received."),
+        @WritesAttribute(attribute="client.certificate.issuer.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
+                "Certificate Authority that issued the client's certificate " +
+                "is attached to the FlowFile."),
+        @WritesAttribute(attribute="client.certificate.subject.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
+                "client certificate's owner (subject) is attached to the FlowFile.")
 })
 public class ListenTCP extends AbstractProcessor {
+    private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
+    private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";
 
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
@@ -213,6 +226,7 @@ public class ListenTCP extends AbstractProcessor {
             }
 
             final Map<String,String> attributes = getAttributes(entry.getValue());
+            addClientCertificateAttributes(attributes, events.get(0));
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             getLogger().debug("Transferring {} to success", flowFile);
@@ -291,4 +305,12 @@ public class ListenTCP extends AbstractProcessor {
         }
         return eventBatcher;
     }
+
+    private void addClientCertificateAttributes(final Map<String, String> attributes, final ByteArrayMessage event) {
+        final SslSessionStatus sslSessionStatus = event.getSslSessionStatus();
+        if (sslSessionStatus != null) {
+            attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, sslSessionStatus.getSubject().getName());
+            attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuer().getName());
+        }
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
index 327dc235a1..91549d07a3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
@@ -33,9 +33,9 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.util.ssl.SslContextUtils;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import javax.net.ssl.SSLContext;
@@ -45,6 +45,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class TestListenTCP {
+    private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
+    private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";
     private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
 
     private static final String LOCALHOST = "localhost";
@@ -56,13 +58,13 @@ public class TestListenTCP {
 
     private TestRunner runner;
 
-    @BeforeClass
+    @BeforeAll
     public static void configureServices() throws TlsException {
         keyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
         trustStoreSslContext = SslContextUtils.createTrustStoreSslContext();
     }
 
-    @Before
+    @BeforeEach
     public void setup() {
         runner = TestRunners.newTestRunner(ListenTCP.class);
     }
@@ -122,6 +124,7 @@ public class TestListenTCP {
 
     @Test
     public void testRunClientAuthRequired() throws Exception {
+        final String expectedDistinguishedName = "CN=localhost";
         runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name());
         enableSslContextService(keyStoreSslContext);
 
@@ -137,6 +140,10 @@ public class TestListenTCP {
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
         for (int i = 0; i < mockFlowFiles.size(); i++) {
             mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
+            mockFlowFiles.get(i).assertAttributeExists(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE);
+            mockFlowFiles.get(i).assertAttributeExists(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE);
+            mockFlowFiles.get(i).assertAttributeEquals(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, expectedDistinguishedName);
+            mockFlowFiles.get(i).assertAttributeEquals(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, expectedDistinguishedName);
         }
     }
 
@@ -157,6 +164,8 @@ public class TestListenTCP {
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
         for (int i = 0; i < mockFlowFiles.size(); i++) {
             mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
+            mockFlowFiles.get(i).assertAttributeNotExists(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE);
+            mockFlowFiles.get(i).assertAttributeNotExists(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE);
         }
     }