You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/30 16:53:24 UTC

kafka git commit: KAFKA-2711; SaslClientAuthenticator no longer needs KerberosNameParser in constructor

Repository: kafka
Updated Branches:
  refs/heads/trunk 622730905 -> d9ae33d4c


KAFKA-2711; SaslClientAuthenticator no longer needs KerberosNameParser in constructor

Also refactor `KerberosNameParser` and `KerberosName` to make the code
clearer and easier to use when `shortName` is not needed.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #390 from ijuma/kafka-2711


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9ae33d4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9ae33d4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9ae33d4

Branch: refs/heads/trunk
Commit: d9ae33d4c0473ef53a9ea560536467333136c0a0
Parents: 6227309
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Oct 30 08:53:16 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Oct 30 08:53:16 2015 -0700

----------------------------------------------------------------------
 .../common/network/SaslChannelBuilder.java      |  13 ++-
 .../authenticator/SaslClientAuthenticator.java  |   7 +-
 .../authenticator/SaslServerAuthenticator.java  |  12 +--
 .../SaslServerCallbackHandler.java              |  12 +--
 .../security/kerberos/BadFormatString.java      |  30 ++++++
 .../common/security/kerberos/KerberosName.java  |  55 +++++-----
 .../security/kerberos/KerberosNameParser.java   | 105 -------------------
 .../common/security/kerberos/KerberosRule.java  |   8 +-
 .../security/kerberos/KerberosShortNamer.java   | 101 ++++++++++++++++++
 .../security/kerberos/KerberosNameTest.java     |  14 +--
 10 files changed, 188 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index d50055a..75e3fca 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -21,7 +21,7 @@ import java.util.Map;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.common.security.kerberos.KerberosNameParser;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.kerberos.LoginManager;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
@@ -45,7 +45,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
     private PrincipalBuilder principalBuilder;
     private SslFactory sslFactory;
     private Map<String, ?> configs;
-    private KerberosNameParser kerberosNameParser;
+    private KerberosShortNamer kerberosShortNamer;
 
     public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol) {
         this.mode = mode;
@@ -66,7 +66,10 @@ public class SaslChannelBuilder implements ChannelBuilder {
             } catch (Exception ke) {
                 defaultRealm = "";
             }
-            kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES));
+
+            List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
+            if (principalToLocalRules != null)
+                kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
 
             if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
                 this.sslFactory = new SslFactory(mode);
@@ -83,10 +86,10 @@ public class SaslChannelBuilder implements ChannelBuilder {
             TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
             Authenticator authenticator;
             if (mode == Mode.SERVER)
-                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosNameParser);
+                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer);
             else
                 authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
-                        socketChannel.socket().getInetAddress().getHostName(), kerberosNameParser);
+                        socketChannel.socket().getInetAddress().getHostName());
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
             return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 654be14..370e729 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -48,7 +48,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.KafkaException;
 
-import org.apache.kafka.common.security.kerberos.KerberosNameParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +63,6 @@ public class SaslClientAuthenticator implements Authenticator {
     private final String servicePrincipal;
     private final String host;
     private final String node;
-    private final KerberosNameParser kerberosNameParser;
 
     // assigned in `configure`
     private SaslClient saslClient;
@@ -77,12 +75,11 @@ public class SaslClientAuthenticator implements Authenticator {
 
     private SaslState saslState = SaslState.INITIAL;
 
-    public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host, KerberosNameParser kerberosNameParser) throws IOException {
+    public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host) throws IOException {
         this.node = node;
         this.subject = subject;
         this.host = host;
         this.servicePrincipal = servicePrincipal;
-        this.kerberosNameParser = kerberosNameParser;
     }
 
     public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) throws KafkaException {
@@ -91,7 +88,7 @@ public class SaslClientAuthenticator implements Authenticator {
 
             // determine client principal from subject.
             Principal clientPrincipal = subject.getPrincipals().iterator().next();
-            this.clientPrincipalName = kerberosNameParser.parse(clientPrincipal.getName()).toString();
+            this.clientPrincipalName = clientPrincipal.getName();
             this.saslClient = createSaslClient();
         } catch (Exception e) {
             throw new KafkaException("Failed to configure SaslClientAuthenticator", e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 4104668..b4d99d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -35,7 +35,7 @@ import javax.security.sasl.SaslException;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.security.kerberos.KerberosName;
-import org.apache.kafka.common.security.kerberos.KerberosNameParser;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSCredential;
 import org.ietf.jgss.GSSException;
@@ -59,7 +59,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private final SaslServer saslServer;
     private final Subject subject;
     private final String node;
-    private final KerberosNameParser kerberosNameParser;
+    private final KerberosShortNamer kerberosNamer;
 
     // assigned in `configure`
     private TransportLayer transportLayer;
@@ -68,14 +68,14 @@ public class SaslServerAuthenticator implements Authenticator {
     private NetworkReceive netInBuffer;
     private NetworkSend netOutBuffer;
 
-    public SaslServerAuthenticator(String node, final Subject subject, KerberosNameParser kerberosNameParser) throws IOException {
+    public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser) throws IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
         if (subject.getPrincipals().isEmpty())
             throw new IllegalArgumentException("subject must have at least one principal");
         this.node = node;
         this.subject = subject;
-        this.kerberosNameParser = kerberosNameParser;
+        this.kerberosNamer = kerberosNameParser;
         saslServer = createSaslServer();
     }
 
@@ -86,11 +86,11 @@ public class SaslServerAuthenticator implements Authenticator {
     private SaslServer createSaslServer() throws IOException {
         // server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject.
         final SaslServerCallbackHandler saslServerCallbackHandler = new SaslServerCallbackHandler(
-                Configuration.getConfiguration(), kerberosNameParser);
+                Configuration.getConfiguration(), kerberosNamer);
         final Principal servicePrincipal = subject.getPrincipals().iterator().next();
         KerberosName kerberosName;
         try {
-            kerberosName = kerberosNameParser.parse(servicePrincipal.getName());
+            kerberosName = KerberosName.parse(servicePrincipal.getName());
         } catch (IllegalArgumentException e) {
             throw new KafkaException("Principal has name with unexpected format " + servicePrincipal);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
index 8474faf..1de4a2e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.security.authenticator;
 
 import java.io.IOException;
 
-import org.apache.kafka.common.security.kerberos.KerberosNameParser;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import javax.security.auth.callback.Callback;
@@ -36,13 +36,13 @@ import org.apache.kafka.common.security.JaasUtils;
 
 public class SaslServerCallbackHandler implements CallbackHandler {
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
-    private final KerberosNameParser kerberosNameParser;
+    private final KerberosShortNamer kerberosShortNamer;
 
-    public SaslServerCallbackHandler(Configuration configuration, KerberosNameParser kerberosNameParser) throws IOException {
+    public SaslServerCallbackHandler(Configuration configuration, KerberosShortNamer kerberosNameParser) throws IOException {
         AppConfigurationEntry[] configurationEntries = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_SERVER);
         if (configurationEntries == null)
             throw new IOException("Could not find a 'KafkaServer' entry in this configuration: Kafka Server cannot start.");
-        this.kerberosNameParser = kerberosNameParser;
+        this.kerberosShortNamer = kerberosNameParser;
     }
 
     public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
@@ -68,9 +68,9 @@ public class SaslServerCallbackHandler implements CallbackHandler {
                 authorizationID);
         ac.setAuthorized(true);
 
-        KerberosName kerberosName = kerberosNameParser.parse(authenticationID);
+        KerberosName kerberosName = KerberosName.parse(authenticationID);
         try {
-            String userName = kerberosName.shortName();
+            String userName = kerberosShortNamer.shortName(kerberosName);
             LOG.info("Setting authorizedID: {}", userName);
             ac.setAuthorizedID(userName);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/security/kerberos/BadFormatString.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/BadFormatString.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/BadFormatString.java
new file mode 100644
index 0000000..4d49a83
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/BadFormatString.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a parameter reference in a rule's format string cannot be resolved.
+ */
+public class BadFormatString extends IOException {
+    BadFormatString(String message) { super(message); }
+
+    BadFormatString(String message, Throwable cause) { super(message, cause); }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
index 46f0edf..4f8e097 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
@@ -18,11 +18,16 @@
 
 package org.apache.kafka.common.security.kerberos;
 
-import java.io.IOException;
-import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class KerberosName {
 
+    /**
+     * A pattern that matches a Kerberos name with at most 3 components.
+     */
+    private static final Pattern NAME_PARSER = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
+
     /** The first component of the name */
     private final String serviceName;
     /** The second component of the name. It may be null. */
@@ -30,19 +35,31 @@ public class KerberosName {
     /** The realm of the name. */
     private final String realm;
 
-    /* Rules for the translation of the principal name into an operating system name */
-    private final List<KerberosRule> principalToLocalRules;
-
     /**
      * Creates an instance of `KerberosName` with the provided parameters.
      */
-    public KerberosName(String serviceName, String hostName, String realm, List<KerberosRule> principalToLocalRules) {
+    public KerberosName(String serviceName, String hostName, String realm) {
         if (serviceName == null)
             throw new IllegalArgumentException("serviceName must not be null");
         this.serviceName = serviceName;
         this.hostName = hostName;
         this.realm = realm;
-        this.principalToLocalRules = principalToLocalRules;
+    }
+
+    /**
+     * Create a name from the full Kerberos principal name; a name without '@' is kept whole as the service name.
+     */
+    public static KerberosName parse(String principalName) {
+        Matcher match = NAME_PARSER.matcher(principalName);
+        if (!match.matches()) {
+            if (principalName.contains("@")) {
+                throw new IllegalArgumentException("Malformed Kerberos name: " + principalName);
+            } else {
+                return new KerberosName(principalName, null, null);
+            }
+        } else {
+            return new KerberosName(match.group(1), match.group(3), match.group(4));
+        }
+    }
 
     /**
@@ -87,28 +104,4 @@ public class KerberosName {
         return realm;
     }
 
-    /**
-     * Get the translation of the principal name into an operating system
-     * user name.
-     * @return the short name
-     * @throws IOException
-     */
-    public String shortName() throws IOException {
-        String[] params;
-        if (hostName == null) {
-            // if it is already simple, just return it
-            if (realm == null)
-                return serviceName;
-            params = new String[]{realm, serviceName};
-        } else {
-            params = new String[]{realm, serviceName, hostName};
-        }
-        for (KerberosRule r : principalToLocalRules) {
-            String result = r.apply(params);
-            if (result != null)
-                return result;
-        }
-        throw new NoMatchingRule("No rules applied to " + toString());
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
deleted file mode 100644
index eb4e6f1..0000000
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.security.kerberos;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * This class implements parsing and handling of Kerberos principal names. In
- * particular, it splits them apart and translates them down into local
- * operating system names.
- */
-public class KerberosNameParser {
-
-    /**
-     * A pattern that matches a Kerberos name with at most 3 components.
-     */
-    private static final Pattern NAME_PARSER = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
-
-    /**
-     * A pattern for parsing a auth_to_local rule.
-     */
-    private static final Pattern RULE_PARSER = Pattern.compile("((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?(s/([^/]*)/([^/]*)/(g)?)?))");
-
-    /**
-     * The list of translation rules.
-     */
-    private final List<KerberosRule> principalToLocalRules;
-
-    public KerberosNameParser(String defaultRealm, List<String> principalToLocalRules) {
-        List<String> rules = principalToLocalRules == null ? Collections.singletonList("DEFAULT") : principalToLocalRules;
-        this.principalToLocalRules = parseRules(defaultRealm, rules);
-    }
-
-    /**
-     * Create a name from the full Kerberos principal name.
-     */
-    public KerberosName parse(String principalName) {
-        Matcher match = NAME_PARSER.matcher(principalName);
-        if (!match.matches()) {
-            if (principalName.contains("@")) {
-                throw new IllegalArgumentException("Malformed Kerberos name: " + principalName);
-            } else {
-                return new KerberosName(principalName, null, null, principalToLocalRules);
-            }
-        } else {
-            return new KerberosName(match.group(1), match.group(3), match.group(4), principalToLocalRules);
-        }
-    }
-
-    private static List<KerberosRule> parseRules(String defaultRealm, List<String> rules) {
-        List<KerberosRule> result = new ArrayList<>();
-        for (String rule : rules) {
-            Matcher matcher = RULE_PARSER.matcher(rule);
-            if (!matcher.lookingAt()) {
-                throw new IllegalArgumentException("Invalid rule: " + rule);
-            }
-            if (rule.length() != matcher.end())
-                throw new IllegalArgumentException("Invalid rule: `" + rule + "`, unmatched substring: `" + rule.substring(matcher.end()) + "`");
-            if (matcher.group(2) != null) {
-                result.add(new KerberosRule(defaultRealm));
-            } else {
-                result.add(new KerberosRule(defaultRealm,
-                        Integer.parseInt(matcher.group(4)),
-                        matcher.group(5),
-                        matcher.group(7),
-                        matcher.group(9),
-                        matcher.group(10),
-                        "g".equals(matcher.group(11))));
-
-            }
-        }
-        return result;
-    }
-
-    public static class BadFormatString extends IOException {
-        BadFormatString(String msg) {
-            super(msg);
-        }
-        BadFormatString(String msg, Throwable err) {
-            super(msg, err);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java
index c1789db..b366f1b 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java
@@ -108,10 +108,10 @@ class KerberosRule {
      * @param format the string to replace parameters again
      * @param params the list of parameters
      * @return the generated string with the parameter references replaced.
-     * @throws KerberosNameParser.BadFormatString
+     * @throws BadFormatString
      */
     static String replaceParameters(String format,
-                                    String[] params) throws KerberosNameParser.BadFormatString {
+                                    String[] params) throws BadFormatString {
         Matcher match = PARAMETER_PATTERN.matcher(format);
         int start = 0;
         StringBuilder result = new StringBuilder();
@@ -122,13 +122,13 @@ class KerberosRule {
                 try {
                     int num = Integer.parseInt(paramNum);
                     if (num < 0 || num > params.length) {
-                        throw new KerberosNameParser.BadFormatString("index " + num + " from " + format +
+                        throw new BadFormatString("index " + num + " from " + format +
                                 " is outside of the valid range 0 to " +
                                 (params.length - 1));
                     }
                     result.append(params[num]);
                 } catch (NumberFormatException nfe) {
-                    throw new KerberosNameParser.BadFormatString("bad format in username mapping in " +
+                    throw new BadFormatString("bad format in username mapping in " +
                             paramNum, nfe);
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
new file mode 100644
index 0000000..0b8759b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class implements the translation of parsed Kerberos principal names
+ * into local operating system names, by applying rules written in the
+ * auth_to_local syntax.
+ */
+public class KerberosShortNamer {
+
+    /** A pattern for parsing an auth_to_local rule. */
+    private static final Pattern RULE_PARSER = Pattern.compile("((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?(s/([^/]*)/([^/]*)/(g)?)?))");
+
+    /* Rules for the translation of the principal name into an operating system name */
+    private final List<KerberosRule> principalToLocalRules;
+
+    /** Creates a short namer that applies the given, already parsed, rules in list order. */
+    public KerberosShortNamer(List<KerberosRule> principalToLocalRules) {
+        this.principalToLocalRules = principalToLocalRules;
+    }
+
+    /** Parses the rule strings into a short namer; a null list means the single rule DEFAULT. */
+    public static KerberosShortNamer fromUnparsedRules(String defaultRealm, List<String> principalToLocalRules) {
+        List<String> rules = principalToLocalRules == null ? Collections.singletonList("DEFAULT") : principalToLocalRules;
+        return new KerberosShortNamer(parseRules(defaultRealm, rules));
+    }
+
+    /** Parses each rule string, rejecting any rule that RULE_PARSER does not match in full. */
+    private static List<KerberosRule> parseRules(String defaultRealm, List<String> rules) {
+        List<KerberosRule> result = new ArrayList<>();
+        for (String rule : rules) {
+            Matcher matcher = RULE_PARSER.matcher(rule);
+            if (!matcher.lookingAt())
+                throw new IllegalArgumentException("Invalid rule: " + rule);
+            if (rule.length() != matcher.end())
+                throw new IllegalArgumentException("Invalid rule: `" + rule + "`, unmatched substring: `" + rule.substring(matcher.end()) + "`");
+            if (matcher.group(2) != null) {
+                result.add(new KerberosRule(defaultRealm));
+            } else {
+                result.add(new KerberosRule(defaultRealm,
+                        Integer.parseInt(matcher.group(4)),
+                        matcher.group(5),
+                        matcher.group(7),
+                        matcher.group(9),
+                        matcher.group(10),
+                        "g".equals(matcher.group(11))));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get the translation of the principal name into an operating system
+     * user name, using the first rule that returns a non-null result.
+     * @param kerberosName the parsed principal name to translate
+     * @return the short name
+     * @throws IOException if rule application fails; NoMatchingRule when no rule produces a result
+     */
+    public String shortName(KerberosName kerberosName) throws IOException {
+        String[] params;
+        if (kerberosName.hostName() == null) {
+            // if it is already simple, just return it
+            if (kerberosName.realm() == null)
+                return kerberosName.serviceName();
+            params = new String[]{kerberosName.realm(), kerberosName.serviceName()};
+        } else {
+            params = new String[]{kerberosName.realm(), kerberosName.serviceName(), kerberosName.hostName()};
+        }
+        for (KerberosRule r : principalToLocalRules) {
+            String result = r.apply(params);
+            if (result != null)
+                return result;
+        }
+        throw new NoMatchingRule("No rules applied to " + kerberosName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9ae33d4/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
index 9781f6d..01e1a1a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
@@ -36,24 +36,24 @@ public class KerberosNameTest {
             "RULE:[2:$1](App\\..*)s/App\\.(.*)/$1/g",
             "DEFAULT"
         ));
-        KerberosNameParser parser = new KerberosNameParser("REALM.COM", rules);
+        KerberosShortNamer shortNamer = KerberosShortNamer.fromUnparsedRules("REALM.COM", rules);
 
-        KerberosName name = parser.parse("App.service-name/example.com@REALM.COM");
+        KerberosName name = KerberosName.parse("App.service-name/example.com@REALM.COM");
         assertEquals("App.service-name", name.serviceName());
         assertEquals("example.com", name.hostName());
         assertEquals("REALM.COM", name.realm());
-        assertEquals("service-name", name.shortName());
+        assertEquals("service-name", shortNamer.shortName(name));
 
-        name = parser.parse("App.service-name@REALM.COM");
+        name = KerberosName.parse("App.service-name@REALM.COM");
         assertEquals("App.service-name", name.serviceName());
         assertNull(name.hostName());
         assertEquals("REALM.COM", name.realm());
-        assertEquals("service-name", name.shortName());
+        assertEquals("service-name", shortNamer.shortName(name));
 
-        name = parser.parse("user/host@REALM.COM");
+        name = KerberosName.parse("user/host@REALM.COM");
         assertEquals("user", name.serviceName());
         assertEquals("host", name.hostName());
         assertEquals("REALM.COM", name.realm());
-        assertEquals("user", name.shortName());
+        assertEquals("user", shortNamer.shortName(name));
     }
 }