You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/11/07 08:16:41 UTC

nifi git commit: NIFI-5752: Load balancing fails with wildcard certs

Repository: nifi
Updated Branches:
  refs/heads/master da1f9eaf6 -> 13232c741


NIFI-5752: Load balancing fails with wildcard certs

NIFI-5752: Remove an unnecessary String.format

NIFI-5752: Remove an unnecessary block

This closes #3110.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/13232c74
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/13232c74
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/13232c74

Branch: refs/heads/master
Commit: 13232c74136e8452b3cbd708e535af7a1fc0d1cb
Parents: da1f9ea
Author: Kotaro Terada <ko...@yahoo-corp.jp>
Authored: Thu Oct 25 19:46:06 2018 +0900
Committer: Koji Kawamura <ij...@apache.org>
Committed: Wed Nov 7 17:16:20 2018 +0900

----------------------------------------------------------------------
 .../server/ClusterLoadBalanceAuthorizer.java    | 55 +++++++++++++++++---
 .../clustered/server/LoadBalanceAuthorizer.java | 11 ++--
 .../server/StandardLoadBalanceProtocol.java     | 39 ++------------
 .../queue/clustered/LoadBalancedQueueIT.java    |  4 +-
 .../server/TestStandardLoadBalanceProtocol.java |  2 +-
 5 files changed, 60 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
index f0d51c8..fbd849c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
@@ -17,14 +17,23 @@
 
 package org.apache.nifi.controller.queue.clustered.server;
 
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.security.util.CertificateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -33,19 +42,27 @@ public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
 
     private final ClusterCoordinator clusterCoordinator;
     private final EventReporter eventReporter;
+    private final HostnameVerifier hostnameVerifier;
 
     public ClusterLoadBalanceAuthorizer(final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) {
         this.clusterCoordinator = clusterCoordinator;
         this.eventReporter = eventReporter;
+        this.hostnameVerifier = new DefaultHostnameVerifier();
     }
 
     @Override
-    public String authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
-        if (clientIdentities == null) {
-            logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing");
-            return null;
+    public String authorize(SSLSocket sslSocket) throws NotAuthorizedException, IOException {
+        final SSLSession sslSession = sslSocket.getSession();
+
+        final Set<String> clientIdentities;
+        try {
+            clientIdentities = getCertificateIdentities(sslSession);
+        } catch (final CertificateException e) {
+            throw new IOException("Failed to extract Client Certificate", e);
         }
 
+        logger.debug("Will perform authorization against Client Identities '{}'", clientIdentities);
+
         final Set<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
                 .map(NodeIdentifier::getApiAddress)
                 .collect(Collectors.toSet());
@@ -57,11 +74,35 @@ public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
             }
         }
 
-        final String message = String.format("Authorization failed for Client ID's %s to Load Balance data because none of the ID's are known Cluster Node Identifiers",
-                clientIdentities);
+        // If there are no matches of Client IDs, try to verify it by HostnameVerifier. In this way, we can support wildcard certificates.
+        for (final String nodeId : nodeIds) {
+            if (hostnameVerifier.verify(nodeId, sslSession)) {
+                final String clientId = sslSocket.getInetAddress().getHostName();
+                logger.debug("The request was verified with node '{}'. The hostname derived from the socket is '{}'. Authorizing Client to Load Balance data", nodeId, clientId);
+                return clientId;
+            }
+        }
+
+        final String message = "Authorization failed for Client ID's to Load Balance data because none of the ID's are known Cluster Node Identifiers";
 
         logger.warn(message);
         eventReporter.reportEvent(Severity.WARNING, "Load Balanced Connections", message);
         throw new NotAuthorizedException("Client ID's " + clientIdentities + " are not authorized to Load Balance data");
     }
+
+    private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException {
+        final Certificate[] certs = sslSession.getPeerCertificates();
+        if (certs == null || certs.length == 0) {
+            throw new SSLPeerUnverifiedException("No certificates found");
+        }
+
+        final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]);
+        cert.checkValidity();
+
+        final Set<String> identities = CertificateUtils.getSubjectAlternativeNames(cert).stream()
+                .map(CertificateUtils::extractUsername)
+                .collect(Collectors.toSet());
+
+        return identities;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
index 3abd328..e8c200b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
@@ -17,16 +17,17 @@
 
 package org.apache.nifi.controller.queue.clustered.server;
 
-import java.util.Collection;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
 
 public interface LoadBalanceAuthorizer {
     /**
-     * Checks if any of the given identities is allowed to load balance data. If so, the identity that has been
-     * permitted is returned. If not, a NotAuthorizedException is thrown.
+     * Checks if the given SSLSocket (which includes identities) is allowed to load balance data. If so, the identity that has been
+     * permitted, or the hostname derived from the socket, is returned. If not, a NotAuthorizedException is thrown.
      *
-     * @param clientIdentities the collection of identities to check
+     * @param sslSocket the SSLSocket which includes identities to check
      * @return the identity that is authorized, or null if the given collection of identities is null
      * @throws NotAuthorizedException if none of the given identities is authorized to load balance data
      */
-    String authorize(Collection<String> clientIdentities) throws NotAuthorizedException;
+    String authorize(SSLSocket sslSocket) throws NotAuthorizedException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index 5c1f8e9..2168f3e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -39,15 +39,12 @@ import org.apache.nifi.provenance.ProvenanceRepository;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.security.util.CertificateUtils;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
 import javax.net.ssl.SSLSocket;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -60,14 +57,10 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -125,20 +118,10 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
 
         String peerDescription = socket.getInetAddress().getHostName();
         if (socket instanceof SSLSocket) {
-            final SSLSession sslSession = ((SSLSocket) socket).getSession();
+            logger.debug("Connection received from peer {}", peerDescription);
 
-            final Set<String> certIdentities;
-            try {
-                certIdentities = getCertificateIdentities(sslSession);
-            } catch (final CertificateException e) {
-                throw new IOException("Failed to extract Client Certificate", e);
-            }
-
-            logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'",
-                peerDescription, certIdentities);
-
-            peerDescription = authorizer.authorize(certIdentities);
-            logger.debug("Client Identities {} are authorized to load balance data", certIdentities);
+            peerDescription = authorizer.authorize((SSLSocket) socket);
+            logger.debug("Client Identities are authorized to load balance data for peer {}", peerDescription);
         }
 
         final int version = negotiateProtocolVersion(in, out, peerDescription);
@@ -155,22 +138,6 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
         receiveFlowFiles(in, out, peerDescription, version);
     }
 
-    private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException {
-        final Certificate[] certs = sslSession.getPeerCertificates();
-        if (certs == null || certs.length == 0) {
-            throw new SSLPeerUnverifiedException("No certificates found");
-        }
-
-        final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]);
-        cert.checkValidity();
-
-        final Set<String> identities = CertificateUtils.getSubjectAlternativeNames(cert).stream()
-                .map(CertificateUtils::extractUsername)
-                .collect(Collectors.toSet());
-
-        return identities;
-    }
-
 
     protected int negotiateProtocolVersion(final InputStream in, final OutputStream out, final String peerDescription) throws IOException {
         final VersionNegotiator negotiator = new StandardVersionNegotiator(1);

http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
index dd5db47..615ae00 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -101,8 +101,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class LoadBalancedQueueIT {
-    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds == null ? null : nodeIds.iterator().next();
-    private final LoadBalanceAuthorizer NEVER_AUTHORIZED = nodeIds -> {
+    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com";
+    private final LoadBalanceAuthorizer NEVER_AUTHORIZED = (sslSocket) -> {
         throw new NotAuthorizedException("Intentional Unit Test Failure - Not Authorized");
     };
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
index e36ed30..773adac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -79,7 +79,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 public class TestStandardLoadBalanceProtocol {
-    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds == null ? null : nodeIds.iterator().next();
+    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com";
     private FlowFileRepository flowFileRepo;
     private ContentRepository contentRepo;
     private ProvenanceRepository provenanceRepo;