You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/11/07 08:16:41 UTC
nifi git commit: NIFI-5752: Load balancing fails with wildcard certs
Repository: nifi
Updated Branches:
refs/heads/master da1f9eaf6 -> 13232c741
NIFI-5752: Load balancing fails with wildcard certs
NIFI-5752: Remove an unnecessary String.format
NIFI-5752: Remove an unnecessary block
This closes #3110.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/13232c74
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/13232c74
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/13232c74
Branch: refs/heads/master
Commit: 13232c74136e8452b3cbd708e535af7a1fc0d1cb
Parents: da1f9ea
Author: Kotaro Terada <ko...@yahoo-corp.jp>
Authored: Thu Oct 25 19:46:06 2018 +0900
Committer: Koji Kawamura <ij...@apache.org>
Committed: Wed Nov 7 17:16:20 2018 +0900
----------------------------------------------------------------------
.../server/ClusterLoadBalanceAuthorizer.java | 55 +++++++++++++++++---
.../clustered/server/LoadBalanceAuthorizer.java | 11 ++--
.../server/StandardLoadBalanceProtocol.java | 39 ++------------
.../queue/clustered/LoadBalancedQueueIT.java | 4 +-
.../server/TestStandardLoadBalanceProtocol.java | 2 +-
5 files changed, 60 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
index f0d51c8..fbd849c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
@@ -17,14 +17,23 @@
package org.apache.nifi.controller.queue.clustered.server;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.security.util.CertificateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
import java.util.Set;
import java.util.stream.Collectors;
@@ -33,19 +42,27 @@ public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
private final ClusterCoordinator clusterCoordinator;
private final EventReporter eventReporter;
+ private final HostnameVerifier hostnameVerifier;
public ClusterLoadBalanceAuthorizer(final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) {
this.clusterCoordinator = clusterCoordinator;
this.eventReporter = eventReporter;
+ this.hostnameVerifier = new DefaultHostnameVerifier();
}
@Override
- public String authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
- if (clientIdentities == null) {
- logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing");
- return null;
+ public String authorize(SSLSocket sslSocket) throws NotAuthorizedException, IOException {
+ final SSLSession sslSession = sslSocket.getSession();
+
+ final Set<String> clientIdentities;
+ try {
+ clientIdentities = getCertificateIdentities(sslSession);
+ } catch (final CertificateException e) {
+ throw new IOException("Failed to extract Client Certificate", e);
}
+ logger.debug("Will perform authorization against Client Identities '{}'", clientIdentities);
+
final Set<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
.map(NodeIdentifier::getApiAddress)
.collect(Collectors.toSet());
@@ -57,11 +74,35 @@ public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
}
}
- final String message = String.format("Authorization failed for Client ID's %s to Load Balance data because none of the ID's are known Cluster Node Identifiers",
- clientIdentities);
+ // If there are no matches of Client IDs, try to verify it by HostnameVerifier. In this way, we can support wildcard certificates.
+ for (final String nodeId : nodeIds) {
+ if (hostnameVerifier.verify(nodeId, sslSession)) {
+ final String clientId = sslSocket.getInetAddress().getHostName();
+ logger.debug("The request was verified with node '{}'. The hostname derived from the socket is '{}'. Authorizing Client to Load Balance data", nodeId, clientId);
+ return clientId;
+ }
+ }
+
+ final String message = "Authorization failed for Client ID's to Load Balance data because none of the ID's are known Cluster Node Identifiers";
logger.warn(message);
eventReporter.reportEvent(Severity.WARNING, "Load Balanced Connections", message);
throw new NotAuthorizedException("Client ID's " + clientIdentities + " are not authorized to Load Balance data");
}
+
+ private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException {
+ final Certificate[] certs = sslSession.getPeerCertificates();
+ if (certs == null || certs.length == 0) {
+ throw new SSLPeerUnverifiedException("No certificates found");
+ }
+
+ final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]);
+ cert.checkValidity();
+
+ final Set<String> identities = CertificateUtils.getSubjectAlternativeNames(cert).stream()
+ .map(CertificateUtils::extractUsername)
+ .collect(Collectors.toSet());
+
+ return identities;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
index 3abd328..e8c200b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
@@ -17,16 +17,17 @@
package org.apache.nifi.controller.queue.clustered.server;
-import java.util.Collection;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
public interface LoadBalanceAuthorizer {
/**
- * Checks if any of the given identities is allowed to load balance data. If so, the identity that has been
- * permitted is returned. If not, a NotAuthorizedException is thrown.
+ * Checks if the given SSLSocket (which includes identities) is allowed to load balance data. If so, the identity that has been
+ * permitted or hostname derived from the socket is returned. If not, a NotAuthorizedException is thrown.
*
- * @param clientIdentities the collection of identities to check
+ * @param sslSocket the SSLSocket which includes identities to check
* @return the identity that is authorized, or null if the given collection of identities is null
* @throws NotAuthorizedException if none of the given identities is authorized to load balance data
*/
- String authorize(Collection<String> clientIdentities) throws NotAuthorizedException;
+ String authorize(SSLSocket sslSocket) throws NotAuthorizedException, IOException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index 5c1f8e9..2168f3e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -39,15 +39,12 @@ import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -60,14 +57,10 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -125,20 +118,10 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
String peerDescription = socket.getInetAddress().getHostName();
if (socket instanceof SSLSocket) {
- final SSLSession sslSession = ((SSLSocket) socket).getSession();
+ logger.debug("Connection received from peer {}", peerDescription);
- final Set<String> certIdentities;
- try {
- certIdentities = getCertificateIdentities(sslSession);
- } catch (final CertificateException e) {
- throw new IOException("Failed to extract Client Certificate", e);
- }
-
- logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'",
- peerDescription, certIdentities);
-
- peerDescription = authorizer.authorize(certIdentities);
- logger.debug("Client Identities {} are authorized to load balance data", certIdentities);
+ peerDescription = authorizer.authorize((SSLSocket) socket);
+ logger.debug("Client Identities are authorized to load balance data for peer {}", peerDescription);
}
final int version = negotiateProtocolVersion(in, out, peerDescription);
@@ -155,22 +138,6 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
receiveFlowFiles(in, out, peerDescription, version);
}
- private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException {
- final Certificate[] certs = sslSession.getPeerCertificates();
- if (certs == null || certs.length == 0) {
- throw new SSLPeerUnverifiedException("No certificates found");
- }
-
- final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]);
- cert.checkValidity();
-
- final Set<String> identities = CertificateUtils.getSubjectAlternativeNames(cert).stream()
- .map(CertificateUtils::extractUsername)
- .collect(Collectors.toSet());
-
- return identities;
- }
-
protected int negotiateProtocolVersion(final InputStream in, final OutputStream out, final String peerDescription) throws IOException {
final VersionNegotiator negotiator = new StandardVersionNegotiator(1);
http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
index dd5db47..615ae00 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -101,8 +101,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class LoadBalancedQueueIT {
- private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds == null ? null : nodeIds.iterator().next();
- private final LoadBalanceAuthorizer NEVER_AUTHORIZED = nodeIds -> {
+ private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com";
+ private final LoadBalanceAuthorizer NEVER_AUTHORIZED = (sslSocket) -> {
throw new NotAuthorizedException("Intentional Unit Test Failure - Not Authorized");
};
http://git-wip-us.apache.org/repos/asf/nifi/blob/13232c74/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
index e36ed30..773adac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -79,7 +79,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class TestStandardLoadBalanceProtocol {
- private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds == null ? null : nodeIds.iterator().next();
+ private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com";
private FlowFileRepository flowFileRepo;
private ContentRepository contentRepo;
private ProvenanceRepository provenanceRepo;