You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/05/13 20:48:49 UTC
[nifi] branch main updated: NIFI-10010: ListenTCP adds Certificate Subject and Issuer FlowFile attributes
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3f16a41ca1 NIFI-10010: ListenTCP adds Certificate Subject and Issuer FlowFile attributes
3f16a41ca1 is described below
commit 3f16a41ca13ecfb3d975f47092f16b0e180fe1c7
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Tue May 10 18:51:08 2022 +0200
NIFI-10010: ListenTCP adds Certificate Subject and Issuer FlowFile attributes
This closes #6032
Signed-off-by: David Handermann <ex...@apache.org>
---
...ByteArrayMessage.java => SslSessionStatus.java} | 29 ++++++-------
.../event/transport/message/ByteArrayMessage.java | 13 +++++-
.../netty/codec/SocketByteArrayMessageDecoder.java | 48 +++++++++++++++++++++-
.../apache/nifi/processors/standard/ListenTCP.java | 26 +++++++++++-
.../nifi/processors/standard/TestListenTCP.java | 19 ++++++---
5 files changed, 110 insertions(+), 25 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java
similarity index 57%
copy from nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
copy to nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java
index 00744ce06b..011cedb47f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,28 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.event.transport.message;
-import org.apache.nifi.event.transport.NetworkEvent;
+package org.apache.nifi.event.transport;
-/**
- * Byte Array Message with Sender
- */
-public class ByteArrayMessage implements NetworkEvent {
- private final byte[] message;
+import javax.security.auth.x500.X500Principal;
- private final String sender;
+public class SslSessionStatus {
+ private final X500Principal subject;
+ private final X500Principal issuer;
- public ByteArrayMessage(final byte[] message, final String sender) {
- this.message = message;
- this.sender = sender;
+ public SslSessionStatus(final X500Principal subject, final X500Principal issuer) {
+ this.subject = subject;
+ this.issuer = issuer;
}
- public byte[] getMessage() {
- return message;
+ public X500Principal getSubject() {
+ return subject;
}
- public String getSender() {
- return sender;
+ public X500Principal getIssuer() {
+ return issuer;
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
index 00744ce06b..9c6e53e42f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
@@ -17,6 +17,7 @@
package org.apache.nifi.event.transport.message;
import org.apache.nifi.event.transport.NetworkEvent;
+import org.apache.nifi.event.transport.SslSessionStatus;
/**
* Byte Array Message with Sender
@@ -25,10 +26,16 @@ public class ByteArrayMessage implements NetworkEvent {
private final byte[] message;
private final String sender;
+ private final SslSessionStatus sslSessionStatus;
- public ByteArrayMessage(final byte[] message, final String sender) {
+ public ByteArrayMessage(final byte[] message, final String sender, final SslSessionStatus sslSessionStatus) {
this.message = message;
this.sender = sender;
+ this.sslSessionStatus = sslSessionStatus;
+ }
+
+ public ByteArrayMessage(final byte[] message, final String sender) {
+ this(message, sender, null);
}
public byte[] getMessage() {
@@ -38,4 +45,8 @@ public class ByteArrayMessage implements NetworkEvent {
public String getSender() {
return sender;
}
+
+ public SslSessionStatus getSslSessionStatus() {
+ return sslSessionStatus;
+ }
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java
index ab23dff80d..c7818c8b00 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java
@@ -16,17 +16,30 @@
*/
package org.apache.nifi.event.transport.netty.codec;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.nifi.event.transport.SslSessionStatus;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.auth.x500.X500Principal;
import java.net.InetSocketAddress;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
import java.util.List;
+import java.util.Map;
/**
* Message Decoder for bytes received from Socket Channels
*/
public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[]> {
+ private static final Logger logger = LoggerFactory.getLogger(SocketByteArrayMessageDecoder.class);
+
/**
* Decode bytes to Byte Array Message with remote address from Channel.remoteAddress()
*
@@ -38,7 +51,40 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
final String address = remoteAddress.getHostString();
- final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+ final SslSessionStatus sslSessionStatus = getSslSessionStatus(channelHandlerContext);
+ final ByteArrayMessage message = new ByteArrayMessage(bytes, address, sslSessionStatus);
+
decoded.add(message);
}
+
+ private SslSessionStatus getSslSessionStatus(final ChannelHandlerContext channelHandlerContext) {
+ SslHandler sslHandler = null;
+ for (final Map.Entry<String, ChannelHandler> entry : channelHandlerContext.channel().pipeline()) {
+ final ChannelHandler channelHandler = entry.getValue();
+ if (channelHandler instanceof SslHandler) {
+ sslHandler = (SslHandler) channelHandler;
+ break;
+ }
+ }
+ return sslHandler == null ? null : createSslSessionStatusFromSslHandler(sslHandler);
+ }
+
+ private SslSessionStatus createSslSessionStatusFromSslHandler(final SslHandler sslHandler) {
+ final SSLSession sslSession = sslHandler.engine().getSession();
+ SslSessionStatus sslSessionStatus = null;
+ try {
+ final Certificate[] certificates = sslSession.getPeerCertificates();
+ if (certificates.length > 0) {
+ final X509Certificate certificate = (X509Certificate) certificates[0];
+ final X500Principal subject = certificate.getSubjectX500Principal();
+ final X500Principal issuer = certificate.getIssuerX500Principal();
+ sslSessionStatus = new SslSessionStatus(subject, issuer);
+ }
+ } catch (final SSLPeerUnverifiedException peerUnverifiedException) {
+ logger.debug("Peer Unverified", peerUnverifiedException);
+ }
+
+ return sslSessionStatus;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index fc7695b37a..b30ce88223 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.SslSessionStatus;
import org.apache.nifi.event.transport.configuration.BufferAllocator;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
@@ -74,12 +75,24 @@ import java.util.concurrent.LinkedBlockingQueue;
"as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can " +
"be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be " +
"set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then " +
- "the Receive Buffer Size must be greater than 100kb.")
+ "the Receive Buffer Size must be greater than 100kb. " +
+ "The processor can be configured to use an SSL Context Service to only allow secure connections. " +
+ "When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's " +
+ "issuer and subject are added to the outgoing FlowFiles as attributes. " +
+ "The processor does not perform authorization based on Distinguished Name values, but since these values " +
+ "are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.")
@WritesAttributes({
@WritesAttribute(attribute="tcp.sender", description="The sending host of the messages."),
- @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received.")
+ @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received."),
+ @WritesAttribute(attribute="client.certificate.issuer.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
+ "Certificate Authority that issued the client's certificate " +
+ "is attached to the FlowFile."),
+ @WritesAttribute(attribute="client.certificate.subject.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
+ "client certificate's owner (subject) is attached to the FlowFile.")
})
public class ListenTCP extends AbstractProcessor {
+ private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
+ private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
@@ -213,6 +226,7 @@ public class ListenTCP extends AbstractProcessor {
}
final Map<String,String> attributes = getAttributes(entry.getValue());
+ addClientCertificateAttributes(attributes, events.get(0));
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Transferring {} to success", flowFile);
@@ -291,4 +305,12 @@ public class ListenTCP extends AbstractProcessor {
}
return eventBatcher;
}
+
+ private void addClientCertificateAttributes(final Map<String, String> attributes, final ByteArrayMessage event) {
+ final SslSessionStatus sslSessionStatus = event.getSslSessionStatus();
+ if (sslSessionStatus != null) {
+ attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, sslSessionStatus.getSubject().getName());
+ attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuer().getName());
+ }
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
index 327dc235a1..91549d07a3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
@@ -33,9 +33,9 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
@@ -45,6 +45,8 @@ import java.util.ArrayList;
import java.util.List;
public class TestListenTCP {
+ private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
+ private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";
private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
private static final String LOCALHOST = "localhost";
@@ -56,13 +58,13 @@ public class TestListenTCP {
private TestRunner runner;
- @BeforeClass
+ @BeforeAll
public static void configureServices() throws TlsException {
keyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
trustStoreSslContext = SslContextUtils.createTrustStoreSslContext();
}
- @Before
+ @BeforeEach
public void setup() {
runner = TestRunners.newTestRunner(ListenTCP.class);
}
@@ -122,6 +124,7 @@ public class TestListenTCP {
@Test
public void testRunClientAuthRequired() throws Exception {
+ final String expectedDistinguishedName = "CN=localhost";
runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext);
@@ -137,6 +140,10 @@ public class TestListenTCP {
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) {
mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
+ mockFlowFiles.get(i).assertAttributeExists(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE);
+ mockFlowFiles.get(i).assertAttributeExists(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE);
+ mockFlowFiles.get(i).assertAttributeEquals(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, expectedDistinguishedName);
+ mockFlowFiles.get(i).assertAttributeEquals(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, expectedDistinguishedName);
}
}
@@ -157,6 +164,8 @@ public class TestListenTCP {
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) {
mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
+ mockFlowFiles.get(i).assertAttributeNotExists(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE);
+ mockFlowFiles.get(i).assertAttributeNotExists(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE);
}
}