Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/05/10 16:55:00 UTC

[GitHub] [nifi] pgyori opened a new pull request, #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

pgyori opened a new pull request, #6032:
URL: https://github.com/apache/nifi/pull/6032

   …s to flowfiles as attributes
   
   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   https://issues.apache.org/jira/browse/NIFI-10010
   Implements NIFI-10010. ListenTCP adds the client certificate's Subject DN and Issuer DN values to the outgoing flowfiles as attributes in case of secure connections.
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] pgyori commented on pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
pgyori commented on PR #6032:
URL: https://github.com/apache/nifi/pull/6032#issuecomment-1126222526

   Thank you again, @exceptionfactory .
   I extended the test case in TestListenTCP as you recommended and also added a check for the new attributes not being present when the connection is not secure.
   I added a test class for SocketByteArrayMessageDecoder with test cases.
   Applied your recommendations almost exactly as you requested. Please check my modifications. Thanks!




[GitHub] [nifi] exceptionfactory closed pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
exceptionfactory closed pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…
URL: https://github.com/apache/nifi/pull/6032




[GitHub] [nifi] pgyori commented on a diff in pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
pgyori commented on code in PR #6032:
URL: https://github.com/apache/nifi/pull/6032#discussion_r872569617


##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java:
##########
@@ -38,7 +48,37 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
     protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
         final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
         final String address = remoteAddress.getHostString();
-        final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+        final SslSessionStatus sslSessionStatus = getSslSessionStatus(channelHandlerContext);
+        final ByteArrayMessage message = new ByteArrayMessage(bytes, address, sslSessionStatus);
+
         decoded.add(message);
     }
+
+    private SslSessionStatus getSslSessionStatus(final ChannelHandlerContext channelHandlerContext) {
+        final Iterator<Map.Entry<String, ChannelHandler>> iterator = channelHandlerContext.channel().pipeline().iterator();
+        while (iterator.hasNext()) {
+            final ChannelHandler channelHandler = iterator.next().getValue();
+            if (channelHandler instanceof SslHandler) {
+                return createSslSessionStatusFromSslHandler((SslHandler) channelHandler);
+            }
+        }
+        return null;

Review Comment:
   Thank you! I applied your suggested change.





[GitHub] [nifi] pgyori commented on pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
pgyori commented on PR #6032:
URL: https://github.com/apache/nifi/pull/6032#issuecomment-1126272477

   I understand your concerns about the test for SocketByteArrayMessageDecoder. I'm OK with omitting that commit. Thanks for the quick feedback!




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6032:
URL: https://github.com/apache/nifi/pull/6032#discussion_r869556379


##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.event.transport;
+
+public class SslSessionStatus {
+    private final String subjectDN;
+    private final String issuerDN;

Review Comment:
   Recommend using `X500Principal` instead of `String` and renaming these properties to just `subject` and `issuer` in order to provide better fidelity with the `X509Certificate` source of the information.
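
A minimal sketch of how the value class might look after this change. It assumes a two-argument constructor and getters named `getSubject()` and `getIssuer()`. The null checks are an illustrative assumption and may not match the merged code.

```java
import javax.security.auth.x500.X500Principal;
import java.util.Objects;

// Sketch only: holds the peer certificate's subject and issuer as X500Principal
// values instead of pre-formatted strings.
final class SslSessionStatus {
    private final X500Principal subject;
    private final X500Principal issuer;

    SslSessionStatus(final X500Principal subject, final X500Principal issuer) {
        // Assumption: callers create this object only when a certificate is present
        this.subject = Objects.requireNonNull(subject, "subject");
        this.issuer = Objects.requireNonNull(issuer, "issuer");
    }

    X500Principal getSubject() {
        return subject;
    }

    X500Principal getIssuer() {
        return issuer;
    }
}
```

Keeping the principals as objects leaves the choice of string format to the consumer, which can call `getName()` when it needs an attribute value.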



##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java:
##########
@@ -38,7 +47,38 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
     protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
         final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
         final String address = remoteAddress.getHostString();
-        final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+        ByteArrayMessage message = getMessageWithSslSessionStatus(channelHandlerContext, bytes, address);
+        if (message == null) {
+            message = new ByteArrayMessage(bytes, address);
+        }

Review Comment:
   Rather than returning a `ByteArrayMessage` that could be `null`, recommend restructuring the approach.  Creating a method called `getSslSessionStatus()` that takes the `ChannelHandlerContext` could return an `SslSessionStatus` object, which could be `null`, and then that could be passed to the `ByteArrayMessage` constructor without the need for null-checking.



##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java:
##########
@@ -38,7 +47,38 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
     protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
         final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
         final String address = remoteAddress.getHostString();
-        final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+        ByteArrayMessage message = getMessageWithSslSessionStatus(channelHandlerContext, bytes, address);
+        if (message == null) {
+            message = new ByteArrayMessage(bytes, address);
+        }
         decoded.add(message);
     }
+
+    private ByteArrayMessage getMessageWithSslSessionStatus(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final String address) {
+        Iterator<Map.Entry<String, ChannelHandler>> iterator = channelHandlerContext.channel().pipeline().iterator();
+        while (iterator.hasNext()) {
+            final ChannelHandler channelHandler = iterator.next().getValue();
+            if (channelHandler instanceof SslHandler) {
+                return createMessageWithSslSessionStatus((SslHandler)channelHandler, bytes, address);
+            }
+        }
+        return null;
+    }
+
+    private ByteArrayMessage createMessageWithSslSessionStatus(final SslHandler sslHandler, final byte[] bytes, final String address) {
+        final SSLSession sslSession = sslHandler.engine().getSession();
+        try {
+            final Certificate[] certificates = sslSession.getPeerCertificates();
+            if (certificates.length > 0) {
+                final X509Certificate certificate = (X509Certificate) certificates[0];
+                final String subjectDN = certificate.getSubjectDN().toString();
+                final String issuerDN = certificate.getIssuerDN().toString();

Review Comment:
   Following the comment to change the `SslSessionStatus` property types, recommend replacing `getSubjectDN()` and `getIssuerDN()` with `getSubjectX500Principal()` and `getIssuerX500Principal()` since the DN methods are denigrated according to the Java 8 documentation:
   
   https://docs.oracle.com/javase/8/docs/api/java/security/cert/X509Certificate.html#getIssuerDN--
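
As a rough illustration of why `X500Principal` is the more portable choice, the sketch below shows that its string form is selected explicitly and that equality does not depend on how the name was spelled. The class name, method names, and distinguished names are hypothetical.

```java
import javax.security.auth.x500.X500Principal;

final class PrincipalNames {
    // Returns the name in RFC 2253 form, which is also what getName() uses by default
    static String standardName(final X500Principal principal) {
        return principal.getName(X500Principal.RFC2253);
    }

    // Compares two distinguished names after parsing, so spacing differences do not matter
    static boolean sameName(final String first, final String second) {
        return new X500Principal(first).equals(new X500Principal(second));
    }

    public static void main(final String[] args) {
        final X500Principal subject = new X500Principal("CN=Subject, O=Subject Organization");
        System.out.println(standardName(subject));
        System.out.println(subject.getName(X500Principal.RFC1779));
        System.out.println(subject.getName(X500Principal.CANONICAL));
        System.out.println(sameName("CN=Subject, O=Subject Organization", "CN=Subject,O=Subject Organization"));
    }
}
```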



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java:
##########
@@ -291,4 +305,15 @@ protected String getBatchKey(ByteArrayMessage event) {
         }
         return eventBatcher;
     }
+
+    private void addClientCertificateAttributes(final Map<String, String> attributes, final ByteArrayMessage event) {
+        final SslSessionStatus sslSessionStatus = event.getSslSessionStatus();
+        if (sslSessionStatus != null) {
+            attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, sslSessionStatus.getSubjectDN());
+            attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuerDN());
+        } else {
+            getLogger().debug("Remote Peer [{}] not verified: client certificates not provided",
+                    event.getSender());
+        }

Review Comment:
   Having the debug log seems unnecessary since the absence of the attributes in outgoing FlowFiles indicates the same fact of missing client certificate information.
   ```suggestion
           }
   ```
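
A sketch of the method with the `else` branch dropped, written against plain JDK types so that it stands alone. The attribute names are placeholders, since the values of the `ListenTCP` constants are not shown in this thread, and passing the two principals directly (rather than an `SslSessionStatus`) is a simplification.

```java
import javax.security.auth.x500.X500Principal;
import java.util.Map;

final class ClientCertificateAttributes {
    // Hypothetical attribute names used only for this illustration
    static final String SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
    static final String ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";

    // Adds both attributes when certificate details are available; otherwise leaves the map unchanged
    static void addClientCertificateAttributes(final Map<String, String> attributes,
                                               final X500Principal subject,
                                               final X500Principal issuer) {
        if (subject != null && issuer != null) {
            attributes.put(SUBJECT_DN_ATTRIBUTE, subject.getName());
            attributes.put(ISSUER_DN_ATTRIBUTE, issuer.getName());
        }
    }
}
```

With no logging in the missing-certificate case, the absence of both attributes on the FlowFile is the only signal, which matches the reasoning in the comment above.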





[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6032:
URL: https://github.com/apache/nifi/pull/6032#discussion_r872569040


##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/resources/netty/client_certificate.cer:
##########
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC3jCCAcagAwIBAgIEYn5khzANBgkqhkiG9w0BAQsFADAvMRwwGgYDVQQKDBNJ
+c3N1ZXIgT3JnYW5pemF0aW9uMQ8wDQYDVQQDDAZJc3N1ZXIwIBcNMjIwNTEzMTQw
+MDM5WhgPMzAyMTA1MTMxNDAwMzlaMDExHTAbBgNVBAoMFFN1YmplY3QgT3JnYW5p
+emF0aW9uMRAwDgYDVQQDDAdTdWJqZWN0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
+MIIBCgKCAQEA5I2jZf5rr6nCckKfO/9EZnWkD5AjB2P0P1HJQ3OrjHAmfzZGe6mK
+M2pwn+k/dqswkg6lU2F+Ey+SGnK9Y5AJd35nBN3p+jV+BGsxAjxkhdVgOlBK0GyB
+E5n9dcXiu9DrFgif6PwPjQe4eshXRpASTrI/ZPaIXf6m8Jy6gahThNEFGmDwRyll
+iN/29fglii3v+Ih1eRes2gH6LyRRS1jnVVlbZUvOXK+gqY+YqpAhOAIsdNb3zzJ/
+hGcuIfh4yWQwii9xWzFhinOstHFIOCvdf+DlU5LPS55XYq1rZLzTucwot+f5nDYJ
+rvUWzUuqQVGboQfeyuwkbMyoy8SaIeyCpwIDAQABMA0GCSqGSIb3DQEBCwUAA4IB
+AQA3Qw+00q7+NKy3jzz+CTDyFehMZvDo8cDO0xGm2qB4hjcR2Sn0oHCbKMZGjwOv
+U3Aprqqn9/9+hEVtHj7HmcBM4ocbqAXhdKhKOslPYTt9NRLLyHxrdqPQwQ4AzLGG
+GW4kpQQZa1GCaOyXOuAz+LV+vAuFIyvq7PH98yaCIxXrJ2QxhceGZnTtYKSwRnhj
+uoG3R7dwumXjiXI7JzhgxX2WQH6qSx1it/PsI4JqghK1ML0xtH/vH+pY9FbODHta
+K5FnWnS6B35Qbrei1Irf7LmzZU9C+feZOe0MvtSBrbpcX3BSIHBJgwM179MtwYZM
+sFxJtzcHUGnl0K0fwhrs12h+
+-----END CERTIFICATE-----

Review Comment:
   As mentioned, this file should be removed.



##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/SocketByteArrayMessageDecoderTest.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.nifi.event.transport.SslSessionStatus;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.event.transport.netty.codec.SocketByteArrayMessageDecoder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.auth.x500.X500Principal;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@ExtendWith(MockitoExtension.class)
+public class SocketByteArrayMessageDecoderTest extends SocketByteArrayMessageDecoder {
+
+    private InetSocketAddress inetSocketAddress;
+    private X509Certificate certificate;
+    @Mock
+    private ChannelHandlerContext channelHandlerContext;
+    @Mock
+    private Channel channel;
+    @Mock
+    private ChannelPipeline channelPipeline;
+    @Mock
+    private Iterator<Map.Entry<String, ChannelHandler>> iterator;
+    @Mock
+    private Map.Entry<String, ChannelHandler> pipelineEntry;
+    @Mock
+    private SslHandler sslHandler;
+    @Mock
+    private ChannelOutboundHandlerAdapter nonSslHandler;
+    @Mock
+    private SSLEngine sslEngine;
+    @Mock
+    private SSLSession sslSession;
+
+    @BeforeEach
+    public void init() {
+        inetSocketAddress = new InetSocketAddress("localhost", 21000);
+        Mockito.when(channelHandlerContext.channel()).thenReturn(channel);
+        Mockito.when(channel.remoteAddress()).thenReturn(inetSocketAddress);
+        Mockito.when(channel.pipeline()).thenReturn(channelPipeline);
+        Mockito.when(channelPipeline.iterator()).thenReturn(iterator);
+        Mockito.when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+        Mockito.when(iterator.next()).thenReturn(pipelineEntry);
+    }
+
+    @Test
+    public void testDecodeMessageSsl() throws Exception {
+        String expectedSubjectDN = "CN=Subject,O=Subject Organization";
+        String expectedIssuerDN = "CN=Issuer,O=Issuer Organization";
+
+        initSsl();
+
+        List<Object> decoded = new ArrayList<>(1);
+        decode(channelHandlerContext, "test".getBytes(), decoded);
+
+        X500Principal actualSubject = ((ByteArrayMessage)decoded.get(0)).getSslSessionStatus().getSubject();
+        X500Principal actualIssuer = ((ByteArrayMessage)decoded.get(0)).getSslSessionStatus().getIssuer();
+        assertEquals(expectedSubjectDN, actualSubject.getName());
+        assertEquals(expectedIssuerDN, actualIssuer.getName());
+    }
+
+    @Test
+    public void testDecodeMessageNoSsl() {
+        initNoSslHandler();
+
+        List<Object> decoded = new ArrayList<>(1);
+        decode(channelHandlerContext, "test".getBytes(), decoded);
+
+        SslSessionStatus sslSessionStatus = ((ByteArrayMessage)decoded.get(0)).getSslSessionStatus();
+        assertNull(sslSessionStatus);
+    }
+
+    @Test
+    public void testDecodeMessageNoCertificates() throws Exception {
+        initSslWithNoCertificates();
+
+        List<Object> decoded = new ArrayList<>(1);
+        decode(channelHandlerContext, "test".getBytes(), decoded);
+
+        SslSessionStatus sslSessionStatus = ((ByteArrayMessage)decoded.get(0)).getSslSessionStatus();
+        assertNull(sslSessionStatus);
+    }
+
+    @Test
+    public void testDecodeMessagePeerUnverified() throws Exception {
+        initSslPeerUnverified();
+
+        List<Object> decoded = new ArrayList<>(1);
+        decode(channelHandlerContext, "test".getBytes(), decoded);
+
+        SslSessionStatus sslSessionStatus = ((ByteArrayMessage)decoded.get(0)).getSslSessionStatus();
+        assertNull(sslSessionStatus);
+    }
+
+    private void initSsl() throws Exception {
+        initSslEngine();
+        try (InputStream inStream = new FileInputStream("src/test/resources/netty/client_certificate.cer")) {
+            CertificateFactory cf = CertificateFactory.getInstance("X.509");
+            certificate = (X509Certificate)cf.generateCertificate(inStream);
+        }

Review Comment:
   Static certificate files should not be checked into version control as they will eventually expire and lead to test failures. See `TemporaryKeyStoreBuilder` for an approach to generating certificates for testing.



##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/pom.xml:
##########
@@ -38,5 +38,9 @@
             <version>1.17.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>

Review Comment:
   This JUnit 4 dependency should be removed.



##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/SocketByteArrayMessageDecoderTest.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.nifi.event.transport.SslSessionStatus;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.event.transport.netty.codec.SocketByteArrayMessageDecoder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.auth.x500.X500Principal;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@ExtendWith(MockitoExtension.class)
+public class SocketByteArrayMessageDecoderTest extends SocketByteArrayMessageDecoder {
+
+    private InetSocketAddress inetSocketAddress;
+    private X509Certificate certificate;
+    @Mock
+    private ChannelHandlerContext channelHandlerContext;
+    @Mock
+    private Channel channel;
+    @Mock
+    private ChannelPipeline channelPipeline;
+    @Mock
+    private Iterator<Map.Entry<String, ChannelHandler>> iterator;
+    @Mock
+    private Map.Entry<String, ChannelHandler> pipelineEntry;
+    @Mock
+    private SslHandler sslHandler;
+    @Mock
+    private ChannelOutboundHandlerAdapter nonSslHandler;
+    @Mock
+    private SSLEngine sslEngine;
+    @Mock
+    private SSLSession sslSession;
+
+    @BeforeEach
+    public void init() {
+        inetSocketAddress = new InetSocketAddress("localhost", 21000);
+        Mockito.when(channelHandlerContext.channel()).thenReturn(channel);
+        Mockito.when(channel.remoteAddress()).thenReturn(inetSocketAddress);
+        Mockito.when(channel.pipeline()).thenReturn(channelPipeline);
+        Mockito.when(channelPipeline.iterator()).thenReturn(iterator);
+        Mockito.when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+        Mockito.when(iterator.next()).thenReturn(pipelineEntry);
+    }
+
+    @Test
+    public void testDecodeMessageSsl() throws Exception {
+        String expectedSubjectDN = "CN=Subject,O=Subject Organization";
+        String expectedIssuerDN = "CN=Issuer,O=Issuer Organization";
+
+        initSsl();
+
+        List<Object> decoded = new ArrayList<>(1);
+        decode(channelHandlerContext, "test".getBytes(), decoded);
+
+        X500Principal actualSubject = ((ByteArrayMessage)decoded.get(0)).getSslSessionStatus().getSubject();
+        X500Principal actualIssuer = ((ByteArrayMessage)decoded.get(0)).getSslSessionStatus().getIssuer();
+        assertEquals(expectedSubjectDN, actualSubject.getName());
+        assertEquals(expectedIssuerDN, actualIssuer.getName());
+    }
+
+    @Test
+    public void testDecodeMessageNoSsl() {
+        initNoSslHandler();
+
+        List<Object> decoded = new ArrayList<>(1);
+        decode(channelHandlerContext, "test".getBytes(), decoded);

Review Comment:
   The result of `"test".getBytes()` should be declared once as a static constant, and `getBytes()` should specify the character set explicitly using `StandardCharsets.UTF_8`.
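
One way to read this suggestion, as a small sketch: declare the payload once as a constant and pass the character set explicitly. The class and field names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class MessageFixtures {
    // Explicit character set avoids depending on the platform default encoding
    static final byte[] MESSAGE = "test".getBytes(StandardCharsets.UTF_8);

    // Decodes with the same character set that was used for encoding
    static String decode(final byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Each test method could then pass `MessageFixtures.MESSAGE` to `decode(...)` instead of repeating the literal.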



##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/SocketByteArrayMessageDecoderTest.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.nifi.event.transport.SslSessionStatus;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.event.transport.netty.codec.SocketByteArrayMessageDecoder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.auth.x500.X500Principal;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@ExtendWith(MockitoExtension.class)
+public class SocketByteArrayMessageDecoderTest extends SocketByteArrayMessageDecoder {
+
+    private InetSocketAddress inetSocketAddress;
+    private X509Certificate certificate;
+    @Mock
+    private ChannelHandlerContext channelHandlerContext;
+    @Mock
+    private Channel channel;
+    @Mock
+    private ChannelPipeline channelPipeline;
+    @Mock
+    private Iterator<Map.Entry<String, ChannelHandler>> iterator;
+    @Mock
+    private Map.Entry<String, ChannelHandler> pipelineEntry;
+    @Mock
+    private SslHandler sslHandler;
+    @Mock
+    private ChannelOutboundHandlerAdapter nonSslHandler;
+    @Mock
+    private SSLEngine sslEngine;
+    @Mock
+    private SSLSession sslSession;

Review Comment:
   Although this test is narrowly scoped, the number of mocked components makes it difficult to maintain. It would be better to test this class in conjunction with a functional Netty implementation. With the usage of this class tested through TestListenTCP, it seems better to remove this unit test.





[GitHub] [nifi] pgyori commented on a diff in pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
pgyori commented on code in PR #6032:
URL: https://github.com/apache/nifi/pull/6032#discussion_r872569152


##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java:
##########
@@ -38,7 +48,37 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
     protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
         final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
         final String address = remoteAddress.getHostString();
-        final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+        final SslSessionStatus sslSessionStatus = getSslSessionStatus(channelHandlerContext);
+        final ByteArrayMessage message = new ByteArrayMessage(bytes, address, sslSessionStatus);
+
         decoded.add(message);
     }
+
+    private SslSessionStatus getSslSessionStatus(final ChannelHandlerContext channelHandlerContext) {
+        final Iterator<Map.Entry<String, ChannelHandler>> iterator = channelHandlerContext.channel().pipeline().iterator();
+        while (iterator.hasNext()) {
+            final ChannelHandler channelHandler = iterator.next().getValue();
+            if (channelHandler instanceof SslHandler) {
+                return createSslSessionStatusFromSslHandler((SslHandler) channelHandler);
+            }
+        }
+        return null;
+    }
+
+    private SslSessionStatus createSslSessionStatusFromSslHandler(final SslHandler sslHandler) {
+        final SSLSession sslSession = sslHandler.engine().getSession();
+        try {
+            final Certificate[] certificates = sslSession.getPeerCertificates();
+            if (certificates.length > 0) {
+                final X509Certificate certificate = (X509Certificate) certificates[0];
+                final X500Principal subject = certificate.getSubjectX500Principal();
+                final X500Principal issuer = certificate.getIssuerX500Principal();
+                return new SslSessionStatus(subject, issuer);
+            }
+        } catch (SSLPeerUnverifiedException peerUnverifiedException) {
+            return null;
+        }
+        return null;

Review Comment:
   Unfortunately there is no access to a logger in SocketByteArrayMessageDecoder.
   I refactored the method to have only one return statement, because I agree that it looks cleaner. The catch clause is now empty, which I know is not nice, but I'm not sure we can do anything in that block.
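
A minimal sketch of the single-return shape described above, using only JDK types so it compiles without Netty or NiFi. It is not the code from the commit: the `PeerNames` class, the `peerNames` helper, and the `String[]` return value are hypothetical stand-ins for the method and for `SslSessionStatus`. Note that the result variable cannot be declared `final` once it is assigned inside the `try` block after being initialized.

```java
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;

// Hypothetical helper class, named only for this illustration
final class PeerNames {
    // Returns {subject DN, issuer DN} of the first peer certificate,
    // or null when the peer presented no verified X.509 certificate.
    static String[] peerNames(final SSLSession sslSession) {
        String[] names = null; // single result variable, returned once at the end
        try {
            final Certificate[] certificates = sslSession.getPeerCertificates();
            if (certificates.length > 0 && certificates[0] instanceof X509Certificate) {
                final X509Certificate certificate = (X509Certificate) certificates[0];
                names = new String[] {
                        certificate.getSubjectX500Principal().getName(),
                        certificate.getIssuerX500Principal().getName()
                };
            }
        } catch (final SSLPeerUnverifiedException e) {
            // The peer identity was not verified, for example because no client
            // certificate was presented; names stays null in that case.
        }
        return names;
    }
}
```

Documenting the reason inside the otherwise empty catch block, as in the comment above, is one way to make the intent clear when no logger is available.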





[GitHub] [nifi] pgyori commented on a diff in pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
pgyori commented on code in PR #6032:
URL: https://github.com/apache/nifi/pull/6032#discussion_r871442022


##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java:
##########
@@ -38,7 +47,38 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
     protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
         final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
         final String address = remoteAddress.getHostString();
-        final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+        ByteArrayMessage message = getMessageWithSslSessionStatus(channelHandlerContext, bytes, address);
+        if (message == null) {
+            message = new ByteArrayMessage(bytes, address);
+        }

Review Comment:
   This is a great idea, makes it much cleaner. Thanks!





[GitHub] [nifi] pgyori commented on pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
pgyori commented on PR #6032:
URL: https://github.com/apache/nifi/pull/6032#issuecomment-1125054252

   Thank you for your feedback and detailed guidance, @exceptionfactory! I applied the modifications and pushed a new commit. Please check it.




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6032: NIFI-10010: ListenTCP adds client certificate's Subject and Issuer DN…

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6032:
URL: https://github.com/apache/nifi/pull/6032#discussion_r871461795


##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java:
##########
@@ -38,7 +48,37 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
     protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
         final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
         final String address = remoteAddress.getHostString();
-        final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+        final SslSessionStatus sslSessionStatus = getSslSessionStatus(channelHandlerContext);
+        final ByteArrayMessage message = new ByteArrayMessage(bytes, address, sslSessionStatus);
+
         decoded.add(message);
     }
+
+    private SslSessionStatus getSslSessionStatus(final ChannelHandlerContext channelHandlerContext) {
+        final Iterator<Map.Entry<String, ChannelHandler>> iterator = channelHandlerContext.channel().pipeline().iterator();
+        while (iterator.hasNext()) {
+            final ChannelHandler channelHandler = iterator.next().getValue();
+            if (channelHandler instanceof SslHandler) {
+                return createSslSessionStatusFromSslHandler((SslHandler) channelHandler);
+            }
+        }
+        return null;
+    }
+
+    private SslSessionStatus createSslSessionStatusFromSslHandler(final SslHandler sslHandler) {
+        final SSLSession sslSession = sslHandler.engine().getSession();
+        try {
+            final Certificate[] certificates = sslSession.getPeerCertificates();
+            if (certificates.length > 0) {
+                final X509Certificate certificate = (X509Certificate) certificates[0];
+                final X500Principal subject = certificate.getSubjectX500Principal();
+                final X500Principal issuer = certificate.getIssuerX500Principal();
+                return new SslSessionStatus(subject, issuer);
+            }
+        } catch (SSLPeerUnverifiedException peerUnverifiedException) {
+            return null;
+        }
+        return null;

Review Comment:
   It would be helpful to adjust this method to have a single return statement, and also to log the exception as a warning. The log message would require getting the remote address from the ChannelHandlerContext.
   ```suggestion
           SslSessionStatus sslSessionStatus;
           try {
               final Certificate[] certificates = sslSession.getPeerCertificates();
               if (certificates.length > 0) {
                   final X509Certificate certificate = (X509Certificate) certificates[0];
                   final X500Principal subject = certificate.getSubjectX500Principal();
                   final X500Principal issuer = certificate.getIssuerX500Principal();
                   sslSessionStatus = new SslSessionStatus(subject, issuer);
               } else {
                   sslSessionStatus = null;
               }
           } catch (final SSLPeerUnverifiedException e) {
               logger.warn("Peer Unverified [{}]", channelHandlerContext.channel().remoteAddress(), e);
               sslSessionStatus = null;
           }
           return sslSessionStatus;
   ```



##########
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java:
##########
@@ -38,7 +48,37 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[
     protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) {
         final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
         final String address = remoteAddress.getHostString();
-        final ByteArrayMessage message = new ByteArrayMessage(bytes, address);
+
+        final SslSessionStatus sslSessionStatus = getSslSessionStatus(channelHandlerContext);
+        final ByteArrayMessage message = new ByteArrayMessage(bytes, address, sslSessionStatus);
+
         decoded.add(message);
     }
+
+    private SslSessionStatus getSslSessionStatus(final ChannelHandlerContext channelHandlerContext) {
+        final Iterator<Map.Entry<String, ChannelHandler>> iterator = channelHandlerContext.channel().pipeline().iterator();
+        while (iterator.hasNext()) {
+            final ChannelHandler channelHandler = iterator.next().getValue();
+            if (channelHandler instanceof SslHandler) {
+                return createSslSessionStatusFromSslHandler((SslHandler) channelHandler);
+            }
+        }
+        return null;

Review Comment:
   This could be refactored to use a single return statement. ChannelPipeline also implements `Iterable`, so it could be streamlined.
   ```suggestion
           SslHandler sslHandler = null;
           for (final Map.Entry<String, ChannelHandler> entry : channelHandlerContext.channel().pipeline()) {
               final ChannelHandler channelHandler = entry.getValue();
               if (channelHandler instanceof SslHandler) {
                   sslHandler = (SslHandler) channelHandler;
                   break;
               }
           }
           return sslHandler == null ? null : createSslSessionStatusFromSslHandler(sslHandler);
   ```
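
The same find-then-return-once pattern can be tried without Netty on the classpath. The sketch below is only an illustration: `Handler`, `PlainHandler`, `SecureHandler`, and `SingleReturnSearch` are hypothetical stand-ins for `ChannelHandler`, a non-SSL handler, `SslHandler`, and the decoder, and the entry set of a `LinkedHashMap` plays the role of the iterable `ChannelPipeline`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for Netty's ChannelHandler and SslHandler
interface Handler { }

final class PlainHandler implements Handler { }

final class SecureHandler implements Handler {
    private final String peer;

    SecureHandler(final String peer) {
        this.peer = peer;
    }

    String peer() {
        return peer;
    }
}

final class SingleReturnSearch {
    // Returns the peer of the first SecureHandler, or null when none is present
    static String findPeer(final Iterable<Map.Entry<String, Handler>> pipeline) {
        SecureHandler secureHandler = null;
        for (final Map.Entry<String, Handler> entry : pipeline) {
            final Handler handler = entry.getValue();
            if (handler instanceof SecureHandler) {
                secureHandler = (SecureHandler) handler;
                break; // stop at the first match, as in the suggestion
            }
        }
        // Single return: null when no secure handler was found
        return secureHandler == null ? null : secureHandler.peer();
    }

    public static void main(final String[] args) {
        final Map<String, Handler> handlers = new LinkedHashMap<>();
        handlers.put("decoder", new PlainHandler());
        handlers.put("ssl", new SecureHandler("CN=client"));
        System.out.println(findPeer(handlers.entrySet()));
    }
}
```

The ternary in the final statement needs both branches; returning `null` for the missing-handler case keeps the behavior of the original loop.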


