You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2018/10/25 11:40:46 UTC

[kafka] branch trunk updated: KAFKA-5462: Add configuration to build custom SSL principal name (KIP-371)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32e1da5  KAFKA-5462: Add configuration to build custom SSL principal name (KIP-371)
32e1da5 is described below

commit 32e1da570a8aa2ac87d9b5ab1cda1e3cdb867874
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Thu Oct 25 17:10:14 2018 +0530

    KAFKA-5462: Add configuration to build custom SSL principal name (KIP-371)
    
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: Sriharsha Chintalapani <sr...@apache.org>
    
    Closes #5684 from omkreddy/KAFKA-5462-SSL-Name
---
 .../config/internals/BrokerSecurityConfigs.java    |  13 +-
 .../kafka/common/network/ChannelBuilders.java      |   6 +-
 .../common/network/PlaintextChannelBuilder.java    |   2 +-
 .../kafka/common/network/SslChannelBuilder.java    |  14 +-
 .../DefaultKafkaPrincipalBuilder.java              |  31 +++-
 .../authenticator/SaslServerAuthenticator.java     |   2 +-
 .../common/security/ssl/SslPrincipalMapper.java    | 197 +++++++++++++++++++++
 .../kafka/common/network/ChannelBuildersTest.java  |   4 +-
 .../auth/DefaultKafkaPrincipalBuilderTest.java     |  67 ++++++-
 .../security/ssl/SslPrincipalMapperTest.java       |  85 +++++++++
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 +
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   1 +
 docs/security.html                                 |  31 +++-
 13 files changed, 437 insertions(+), 20 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index a29d806..e3a8a77 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -34,17 +34,28 @@ public class BrokerSecurityConfigs {
     public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
     public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms";
     public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
+    public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
 
     public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
             "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
             "authorization. This config also supports the deprecated PrincipalBuilder interface which was previously " +
             "used for client authentication over SSL. If no principal builder is defined, the default behavior depends " +
-            "on the security protocol in use. For SSL authentication, the principal name will be the distinguished " +
+            "on the security protocol in use. For SSL authentication,  the principal will be derived using the" +
+            " rules defined by <code>" + SSL_PRINCIPAL_MAPPING_RULES_CONFIG + "</code> applied on the distinguished " +
             "name from the client certificate if one is provided; otherwise, if client authentication is not required, " +
             "the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the " +
             "rules defined by <code>" + SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + "</code> if GSSAPI is in use, " +
             "and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.";
 
+    public static final String SSL_PRINCIPAL_MAPPING_RULES_DOC = "A list of rules for mapping from distinguished name" +
+            " from the client certificate to short name. The rules are evaluated in order and the first rule that matches" +
+            " a principal name is used to map it to a short name. Any later rules in the list are ignored. By default," +
+            " distinguished name of the X.500 certificate will be the principal. For more details on the format please" +
+            " see <a href=\"#security_authz\"> security authorization and acls</a>. Note that this configuration is ignored" +
+            " if an extension of KafkaPrincipalBuilder is provided by the <code>" + PRINCIPAL_BUILDER_CLASS_CONFIG + "</code>" +
+           " configuration.";
+    public static final List<String> DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = Collections.singletonList("DEFAULT");
+
     public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " +
             "names to short names (typically operating system usernames). The rules are evaluated in order and the " +
             "first rule that matches a principal name is used to map it to a short name. Any later rules in the list are " +
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 10779b7..b3040f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
 import org.apache.kafka.common.utils.Utils;
 
@@ -163,12 +164,13 @@ public class ChannelBuilders {
     public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                                TransportLayer transportLayer,
                                                                Authenticator authenticator,
-                                                               KerberosShortNamer kerberosShortNamer) {
+                                                               KerberosShortNamer kerberosShortNamer,
+                                                               SslPrincipalMapper sslPrincipalMapper) {
         Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
         final KafkaPrincipalBuilder builder;
 
         if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
-            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
+            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
         } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
             builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
         } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index e397f05..e5dc778 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -70,7 +70,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
 
         private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer, ListenerName listenerName) {
             this.transportLayer = transportLayer;
-            this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null);
+            this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null, null);
             this.listenerName = listenerName;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 86d41d0..ffa8deb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -18,10 +18,12 @@ package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.SslAuthenticationContext;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -33,6 +35,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -44,6 +47,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
     private SslFactory sslFactory;
     private Mode mode;
     private Map<String, ?> configs;
+    private SslPrincipalMapper sslPrincipalMapper;
 
     /**
      * Constructs a SSL channel builder. ListenerName is provided only
@@ -58,6 +62,10 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
             this.configs = configs;
+            @SuppressWarnings("unchecked")
+            List<String> sslPrincipalMappingRules = (List<String>) configs.get(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG);
+            if (sslPrincipalMappingRules != null)
+                sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules);
             this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
             this.sslFactory.configure(this.configs);
         } catch (Exception e) {
@@ -89,7 +97,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
-            Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName);
+            Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper);
             return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
@@ -154,9 +162,9 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
         private final KafkaPrincipalBuilder principalBuilder;
         private final ListenerName listenerName;
 
-        private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer, ListenerName listenerName) {
+        private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer, ListenerName listenerName, SslPrincipalMapper sslPrincipalMapper) {
             this.transportLayer = transportLayer;
-            this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null);
+            this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null, sslPrincipalMapper);
             this.listenerName = listenerName;
         }
         /**
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
index 30b0a3e..38c303f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
+import javax.security.auth.x500.X500Principal;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Authenticator;
@@ -32,6 +33,8 @@ import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
 import javax.security.sasl.SaslServer;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.security.Principal;
@@ -55,6 +58,7 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
     private final Authenticator authenticator;
     private final TransportLayer transportLayer;
     private final KerberosShortNamer kerberosShortNamer;
+    private final SslPrincipalMapper sslPrincipalMapper;
 
     /**
      * Construct a new instance which wraps an instance of the older {@link org.apache.kafka.common.security.auth.PrincipalBuilder}.
@@ -73,27 +77,31 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
                 requireNonNull(authenticator),
                 requireNonNull(transportLayer),
                 requireNonNull(oldPrincipalBuilder),
-                kerberosShortNamer);
+                kerberosShortNamer,
+                null);
     }
 
     @SuppressWarnings("deprecation")
     private DefaultKafkaPrincipalBuilder(Authenticator authenticator,
                                          TransportLayer transportLayer,
                                          org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder,
-                                         KerberosShortNamer kerberosShortNamer) {
+                                         KerberosShortNamer kerberosShortNamer,
+                                         SslPrincipalMapper sslPrincipalMapper) {
         this.authenticator = authenticator;
         this.transportLayer = transportLayer;
         this.oldPrincipalBuilder = oldPrincipalBuilder;
         this.kerberosShortNamer = kerberosShortNamer;
+        this.sslPrincipalMapper =  sslPrincipalMapper;
     }
 
     /**
      * Construct a new instance.
      *
      * @param kerberosShortNamer Kerberos name rewrite rules or null if none have been configured
+     * @param sslPrincipalMapper SSL Principal mapper or null if none have been configured
      */
-    public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer) {
-        this(null, null, null, kerberosShortNamer);
+    public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) {
+        this(null, null, null, kerberosShortNamer, sslPrincipalMapper);
     }
 
     @Override
@@ -110,7 +118,7 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
                 return convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator));
 
             try {
-                return convertToKafkaPrincipal(sslSession.getPeerPrincipal());
+                return applySslPrincipalMapper(sslSession.getPeerPrincipal());
             } catch (SSLPeerUnverifiedException se) {
                 return KafkaPrincipal.ANONYMOUS;
             }
@@ -136,6 +144,19 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
         }
     }
 
+    private KafkaPrincipal applySslPrincipalMapper(Principal principal) {
+        try {
+            if (!(principal instanceof X500Principal) || principal == KafkaPrincipal.ANONYMOUS) {
+                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName());
+            } else {
+                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, sslPrincipalMapper.getName(principal.getName()));
+            }
+        } catch (IOException e) {
+            throw new KafkaException("Failed to map name for '" + principal.getName() +
+                    "' based on SSL principal mapping rules.", e);
+        }
+    }
+
     private KafkaPrincipal convertToKafkaPrincipal(Principal principal) {
         return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 48a49fe..4db1971 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -153,7 +153,7 @@ public class SaslServerAuthenticator implements Authenticator {
 
         // Note that the old principal builder does not support SASL, so we do not need to pass the
         // authenticator or the transport layer
-        this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser);
+        this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser, null);
     }
 
     private void createSaslServer(String mechanism) throws IOException {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java
new file mode 100644
index 0000000..7ec4a79
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SslPrincipalMapper {
+
+    private static final Pattern RULE_PARSER = Pattern.compile("((DEFAULT)|(RULE:(([^/]*)/([^/]*))/([LU])?))");
+
+    private final List<Rule> rules;
+
+    public SslPrincipalMapper(List<Rule> sslPrincipalMappingRules) {
+        this.rules = sslPrincipalMappingRules;
+    }
+
+    public static SslPrincipalMapper fromRules(List<String> sslPrincipalMappingRules) {
+        List<String> rules = sslPrincipalMappingRules == null ? Collections.singletonList("DEFAULT") : sslPrincipalMappingRules;
+        return new SslPrincipalMapper(parseRules(rules));
+    }
+
+    private static List<Rule> parseRules(List<String> rules) {
+        List<Rule> result = new ArrayList<>();
+        for (String rule : rules) {
+            Matcher matcher = RULE_PARSER.matcher(rule);
+            if (!matcher.lookingAt()) {
+                throw new IllegalArgumentException("Invalid rule: " + rule);
+            }
+            if (rule.length() != matcher.end()) {
+                throw new IllegalArgumentException("Invalid rule: `" + rule + "`, unmatched substring: `" + rule.substring(matcher.end()) + "`");
+            }
+            if (matcher.group(2) != null) {
+                result.add(new Rule());
+            } else {
+                result.add(new Rule(matcher.group(5),
+                                    matcher.group(6),
+                                    "L".equals(matcher.group(7)),
+                                    "U".equals(matcher.group(7))));
+            }
+        }
+        return result;
+    }
+
+    public String getName(String distinguishedName) throws IOException {
+        for (Rule r : rules) {
+            String principalName = r.apply(distinguishedName);
+            if (principalName != null) {
+                return principalName;
+            }
+        }
+        throw new NoMatchingRule("No rules apply to " + distinguishedName + ", rules " + rules);
+    }
+
+    @Override
+    public String toString() {
+        return "SslPrincipalMapper(rules = " + rules + ")";
+    }
+
+    public static class NoMatchingRule extends IOException {
+        NoMatchingRule(String msg) {
+            super(msg);
+        }
+    }
+
+    private static class Rule {
+        private static final Pattern BACK_REFERENCE_PATTERN = Pattern.compile("\\$(\\d+)");
+
+        private final boolean isDefault;
+        private final Pattern pattern;
+        private final String replacement;
+        private final boolean toLowerCase;
+        private final boolean toUpperCase;
+
+        Rule() {
+            isDefault = true;
+            pattern = null;
+            replacement = null;
+            toLowerCase = false;
+            toUpperCase = false;
+        }
+
+        Rule(String pattern, String replacement, boolean toLowerCase, boolean toUpperCase) {
+            isDefault = false;
+            this.pattern = pattern == null ? null : Pattern.compile(pattern);
+            this.replacement = replacement;
+            this.toLowerCase = toLowerCase;
+            this.toUpperCase = toUpperCase;
+        }
+
+        String apply(String distinguishedName) {
+            if (isDefault) {
+                return distinguishedName;
+            }
+
+            String result = null;
+            final Matcher m = pattern.matcher(distinguishedName);
+
+            if (m.matches()) {
+                result = distinguishedName.replaceAll(pattern.pattern(), escapeLiteralBackReferences(replacement, m.groupCount()));
+            }
+
+            if (toLowerCase && result != null) {
+                result = result.toLowerCase(Locale.ENGLISH);
+            } else if (toUpperCase & result != null) {
+                result = result.toUpperCase(Locale.ENGLISH);
+            }
+
+            return result;
+        }
+
+        //If we find a back reference that is not valid, then we will treat it as a literal string. For example, if we have 3 capturing
+        //groups and the Replacement Value has the value is "$1@$4", then we want to treat the $4 as a literal "$4", rather
+        //than attempting to use it as a back reference.
+        //This method was taken from Apache Nifi project : org.apache.nifi.authorization.util.IdentityMappingUtil
+        private String escapeLiteralBackReferences(final String unescaped, final int numCapturingGroups) {
+            if (numCapturingGroups == 0) {
+                return unescaped;
+            }
+
+            String value = unescaped;
+            final Matcher backRefMatcher = BACK_REFERENCE_PATTERN.matcher(value);
+            while (backRefMatcher.find()) {
+                final String backRefNum = backRefMatcher.group(1);
+                if (backRefNum.startsWith("0")) {
+                    continue;
+                }
+                final int originalBackRefIndex = Integer.parseInt(backRefNum);
+                int backRefIndex = originalBackRefIndex;
+
+
+                // if we have a replacement value like $123, and we have less than 123 capturing groups, then
+                // we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups,
+                // then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then
+                // we want to truncate the 1 and get 0.
+                while (backRefIndex > numCapturingGroups && backRefIndex >= 10) {
+                    backRefIndex /= 10;
+                }
+
+                if (backRefIndex > numCapturingGroups) {
+                    final StringBuilder sb = new StringBuilder(value.length() + 1);
+                    final int groupStart = backRefMatcher.start(1);
+
+                    sb.append(value.substring(0, groupStart - 1));
+                    sb.append("\\");
+                    sb.append(value.substring(groupStart - 1));
+                    value = sb.toString();
+                }
+            }
+
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            if (isDefault) {
+                buf.append("DEFAULT");
+            } else {
+                buf.append("RULE:");
+                if (pattern != null) {
+                    buf.append(pattern);
+                }
+                if (replacement != null) {
+                    buf.append("/");
+                    buf.append(replacement);
+                }
+                if (toLowerCase) {
+                    buf.append("/L");
+                } else if (toUpperCase) {
+                    buf.append("/U");
+                }
+            }
+            return buf.toString();
+        }
+
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
index 27daf0f..630cba1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
@@ -45,7 +45,7 @@ public class ChannelBuildersTest {
 
         Map<String, Object> configs = new HashMap<>();
         configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class);
-        KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null);
+        KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null, null);
 
         // test old principal builder is properly configured and delegated to
         assertTrue(OldPrincipalBuilder.configured);
@@ -60,7 +60,7 @@ public class ChannelBuildersTest {
     public void testCreateConfigurableKafkaPrincipalBuilder() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigurableKafkaPrincipalBuilder.class);
-        KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null);
+        KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null, null);
         assertTrue(builder instanceof ConfigurableKafkaPrincipalBuilder);
         assertTrue(((ConfigurableKafkaPrincipalBuilder) builder).configured);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index a05a850..dd5087a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -16,23 +16,28 @@
  */
 package org.apache.kafka.common.security.auth;
 
+import javax.security.auth.x500.X500Principal;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
 import org.junit.Test;
 
 import javax.net.ssl.SSLSession;
 import javax.security.sasl.SaslServer;
 import java.net.InetAddress;
 import java.security.Principal;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -63,7 +68,7 @@ public class DefaultKafkaPrincipalBuilderTest {
 
     @Test
     public void testReturnAnonymousPrincipalForPlaintext() throws Exception {
-        try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null)) {
+        try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null)) {
             assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
                     new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
         }
@@ -100,7 +105,7 @@ public class DefaultKafkaPrincipalBuilderTest {
 
         when(session.getPeerPrincipal()).thenReturn(new DummyPrincipal("foo"));
 
-        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
 
         KafkaPrincipal principal = builder.build(
                 new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
@@ -113,13 +118,67 @@ public class DefaultKafkaPrincipalBuilderTest {
     }
 
     @Test
+    public void testPrincipalIfSSLPeerIsNotAuthenticated() throws Exception {
+        SSLSession session = mock(SSLSession.class);
+
+        when(session.getPeerPrincipal()).thenReturn(KafkaPrincipal.ANONYMOUS);
+
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
+
+        KafkaPrincipal principal = builder.build(
+                new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
+        assertEquals(KafkaPrincipal.ANONYMOUS, principal);
+
+        builder.close();
+        verify(session, atLeastOnce()).getPeerPrincipal();
+    }
+
+
+    @Test
+    public void testPrincipalWithSslPrincipalMapper() throws Exception {
+        SSLSession session = mock(SSLSession.class);
+
+        when(session.getPeerPrincipal()).thenReturn(new X500Principal("CN=Duke, OU=ServiceUsers, O=Org, C=US"))
+                                        .thenReturn(new X500Principal("CN=Duke, OU=SME, O=mycp, L=Fulton, ST=MD, C=US"))
+                                        .thenReturn(new X500Principal("CN=duke, OU=JavaSoft, O=Sun Microsystems"))
+                                        .thenReturn(new X500Principal("OU=JavaSoft, O=Sun Microsystems, C=US"));
+
+        List<String> rules = Arrays.asList(
+            "RULE:^CN=(.*),OU=ServiceUsers.*$/$1/L",
+            "RULE:^CN=(.*),OU=(.*),O=(.*),L=(.*),ST=(.*),C=(.*)$/$1@$2/L",
+            "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U",
+            "DEFAULT"
+        );
+
+        SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules);
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, mapper);
+
+        SslAuthenticationContext sslContext = new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name());
+
+        KafkaPrincipal principal = builder.build(sslContext);
+        assertEquals("duke", principal.getName());
+
+        principal = builder.build(sslContext);
+        assertEquals("duke@sme", principal.getName());
+
+        principal = builder.build(sslContext);
+        assertEquals("DUKE", principal.getName());
+
+        principal = builder.build(sslContext);
+        assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", principal.getName());
+
+        builder.close();
+        verify(session, times(4)).getPeerPrincipal();
+    }
+
+    @Test
     public void testPrincipalBuilderScram() throws Exception {
         SaslServer server = mock(SaslServer.class);
 
         when(server.getMechanismName()).thenReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
         when(server.getAuthorizationID()).thenReturn("foo");
 
-        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
 
         KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
                 SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
@@ -141,7 +200,7 @@ public class DefaultKafkaPrincipalBuilderTest {
         when(server.getAuthorizationID()).thenReturn("foo/host@REALM.COM");
         when(kerberosShortNamer.shortName(any())).thenReturn("foo");
 
-        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, null);
 
         KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
                 SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java
new file mode 100644
index 0000000..c647fd0
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SslPrincipalMapperTest {
+
+    @Test
+    public void testValidRules() {
+        testValidRule(Arrays.asList("DEFAULT"));
+        testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/"));
+        testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L", "DEFAULT"));
+        testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/"));
+        testValidRule(Arrays.asList("RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L"));
+        testValidRule(Arrays.asList("RULE:^cn=(.?),ou=(.?),dc=(.?),dc=(.?)$/$1@$2/U"));
+    }
+
+    private void testValidRule(List<String> rules) {
+        SslPrincipalMapper.fromRules(rules);
+    }
+
+    @Test
+    public void testInvalidRules() {
+        testInvalidRule(Arrays.asList("default"));
+        testInvalidRule(Arrays.asList("DEFAUL"));
+        testInvalidRule(Arrays.asList("DEFAULT/L"));
+        testInvalidRule(Arrays.asList("DEFAULT/U"));
+
+        testInvalidRule(Arrays.asList("RULE:CN=(.*?),OU=ServiceUsers.*/$1"));
+        testInvalidRule(Arrays.asList("rule:^CN=(.*?),OU=ServiceUsers.*$/$1/"));
+        testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L/U"));
+        testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/L"));
+        testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/U"));
+        testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/LU"));
+    }
+
+    private void testInvalidRule(List<String> rules) {
+        try {
+            System.out.println(SslPrincipalMapper.fromRules(rules));
+            fail("should have thrown IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+        }
+    }
+
+    @Test
+    public void testSslPrincipalMapper() throws Exception {
+        List<String> rules = Arrays.asList(
+            "RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L",
+            "RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L",
+            "RULE:^cn=(.*?),ou=(.*?),dc=(.*?),dc=(.*?)$/$1@$2/U",
+            "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U",
+            "DEFAULT"
+        );
+
+        SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules);
+
+        assertEquals("duke", mapper.getName("CN=Duke,OU=ServiceUsers,O=Org,C=US"));
+        assertEquals("duke@sme", mapper.getName("CN=Duke,OU=SME,O=mycp,L=Fulton,ST=MD,C=US"));
+        assertEquals("DUKE@SME", mapper.getName("cn=duke,ou=sme,dc=mycp,dc=com"));
+        assertEquals("DUKE", mapper.getName("cN=duke,OU=JavaSoft,O=Sun Microsystems"));
+        assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", mapper.getName("OU=JavaSoft,O=Sun Microsystems,C=US"));
+    }
+
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9edda4e..9bf41a1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -220,6 +220,7 @@ object Defaults {
   val SslClientAuthRequested = "requested"
   val SslClientAuthNone = "none"
   val SslClientAuth = SslClientAuthNone
+  val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
 
   /** ********* Sasl configuration ***********/
   val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
@@ -439,6 +440,7 @@ object KafkaConfig {
   val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
   val SslSecureRandomImplementationProp = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG
   val SslClientAuthProp = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG
+  val SslPrincipalMappingRulesProp = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG
 
   /** ********* SASL Configuration ****************/
   val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol"
@@ -760,6 +762,7 @@ object KafkaConfig {
   val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
   val SslSecureRandomImplementationDoc = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC
   val SslClientAuthDoc = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC
+  val SslPrincipalMappingRulesDoc = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_DOC
 
   /** ********* Sasl Configuration ****************/
   val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI."
@@ -998,6 +1001,7 @@ object KafkaConfig {
       .define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc)
       .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
       .define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)
+      .define(SslPrincipalMappingRulesProp, LIST, Defaults.SslPrincipalMappingRules, LOW, SslPrincipalMappingRulesDoc)
 
       /** ********* Sasl Configuration ****************/
       .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b75c3e7..ba8e54b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -691,6 +691,7 @@ class KafkaConfigTest {
         case KafkaConfig.SslEndpointIdentificationAlgorithmProp => // ignore string
         case KafkaConfig.SslSecureRandomImplementationProp => // ignore string
         case KafkaConfig.SslCipherSuitesProp => // ignore string
+        case KafkaConfig.SslPrincipalMappingRulesProp => // ignore string
 
         //Sasl Configs
         case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore
diff --git a/docs/security.html b/docs/security.html
index 6ff9eba..5f6d0ac 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1020,8 +1020,37 @@
     <pre>allow.everyone.if.no.acl.found=true</pre>
     One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive.
     <pre>super.users=User:Bob;User:Alice</pre>
-    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in server.properties like the following.
+
+    <h5><a id="security_authz_ssl" href="#security_authz_ssl">Customizing SSL User Name</a></h5>
+
+    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting <code>ssl.principal.mapping.rules</code> to a customized rule in server.properties.
+    This config allows a list of rules for mapping X.500 distinguished name to short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
+
+    <br>The format of <code>ssl.principal.mapping.rules</code> is a list where each rule starts with "RULE:" and contains an expression as the following formats. Default rule will return
+    string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name.
+    This also supports lowercase/uppercase options, to force the translated result to be all lower/uppercase case. This is done by adding a "/L" or "/U' to the end of the rule.
+
+    <pre>
+        RULE:pattern/replacement/
+        RULE:pattern/replacement/[LU]
+    </pre>
+
+    Example <code>ssl.principal.mapping.rules</code> values are:
+    <pre>
+        RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
+        RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
+        RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
+        DEFAULT
+    </pre>
+
+    Above rules translate distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser"
+    and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
+
+    <br>For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
     <pre>principal.builder.class=CustomizedPrincipalBuilderClass</pre>
+
+    <h5><a id="security_authz_sasl" href="#security_authz_sasl">Customizing SASL User Name</a></h5>
+
     By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting <code>sasl.kerberos.principal.to.local.rules</code> to a customized rule in server.properties.
     The format of <code>sasl.kerberos.principal.to.local.rules</code> is a list where each rule works in the same way as the auth_to_local in <a href="http://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html">Kerberos configuration file (krb5.conf)</a>. This also support additional lowercase rule, to force the translated result to be all lower case. This is done by adding a "/L" to the end of the rule. check below formats for syntax.
     Each rules starts with RULE: and contains an expression as the following formats. See the kerberos documentation for more details.