You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/20 23:13:43 UTC

[3/4] kafka git commit: KAFKA-1686; Implement SASL/Kerberos

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java
new file mode 100644
index 0000000..c1789db
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosRule.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An encoding of a rule for translating kerberos names.
+ */
+class KerberosRule {
+
+    /**
+     * A pattern that matches a (possibly empty) string without '$' and then
+     * an optional single parameter reference of the form $n.
+     */
+    private static final Pattern PARAMETER_PATTERN = Pattern.compile("([^$]*)(\\$(\\d*))?");
+
+    /**
+     * A pattern that finds '/' or '@', the characters that make a name non-simple.
+     */
+    private static final Pattern NON_SIMPLE_PATTERN = Pattern.compile("[/@]");
+
+    private final String defaultRealm;
+    private final boolean isDefault;        // true for the DEFAULT rule, false for a RULE:[...] rule
+    private final int numOfComponents;      // number of name components (realm excluded) the rule applies to
+    private final String format;            // format string with $n references, expanded by replaceParameters
+    private final Pattern match;            // optional filter the expanded name must fully match
+    private final Pattern fromPattern;      // optional source pattern of the s/from/to/ substitution
+    private final String toPattern;         // replacement text used together with fromPattern
+    private final boolean repeat;           // true if the substitution replaces all matches ('g' flag)
+
+    KerberosRule(String defaultRealm) {
+        this.defaultRealm = defaultRealm;
+        isDefault = true;
+        numOfComponents = 0;
+        format = null;
+        match = null;
+        fromPattern = null;
+        toPattern = null;
+        repeat = false;
+    }
+
+    KerberosRule(String defaultRealm, int numOfComponents, String format, String match, String fromPattern,
+                 String toPattern, boolean repeat) {
+        this.defaultRealm = defaultRealm;
+        isDefault = false;
+        this.numOfComponents = numOfComponents;
+        this.format = format;
+        this.match = match == null ? null : Pattern.compile(match);
+        this.fromPattern =
+                fromPattern == null ? null : Pattern.compile(fromPattern);
+        this.toPattern = toPattern;
+        this.repeat = repeat;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        if (isDefault) {
+            buf.append("DEFAULT");
+        } else {
+            buf.append("RULE:[");
+            buf.append(numOfComponents);
+            buf.append(':');
+            buf.append(format);
+            buf.append(']');
+            if (match != null) {
+                buf.append('(');
+                buf.append(match);
+                buf.append(')');
+            }
+            if (fromPattern != null) {
+                buf.append("s/");
+                buf.append(fromPattern);
+                buf.append('/');
+                buf.append(toPattern);
+                buf.append('/');
+                if (repeat) {
+                    buf.append('g');
+                }
+            }
+        }
+        return buf.toString();
+    }
+
+    /**
+     * Replace the numbered parameters of the form $n, where n is a valid index
+     * into params ($0 is params[0]). Normal text is copied directly and $n is
+     * replaced by the corresponding parameter.
+     * @param format the format string containing the $n parameter references
+     * @param params the list of parameters
+     * @return the generated string with the parameter references replaced.
+     * @throws KerberosNameParser.BadFormatString if n is not a number or is not a valid index into params
+     */
+    static String replaceParameters(String format,
+                                    String[] params) throws KerberosNameParser.BadFormatString {
+        Matcher match = PARAMETER_PATTERN.matcher(format);
+        int start = 0;
+        StringBuilder result = new StringBuilder();
+        while (start < format.length() && match.find(start)) {
+            result.append(match.group(1));
+            String paramNum = match.group(3);
+            if (paramNum != null) {
+                try {
+                    int num = Integer.parseInt(paramNum);
+                    // valid indexes are 0 .. params.length - 1, so params.length itself must be rejected too
+                    if (num < 0 || num >= params.length) {
+                        throw new KerberosNameParser.BadFormatString("index " + num + " from " + format +
+                                " is outside of the valid range 0 to " +
+                                (params.length - 1));
+                    }
+                    result.append(params[num]);
+                } catch (NumberFormatException nfe) {
+                    throw new KerberosNameParser.BadFormatString("bad format in username mapping in " +
+                            paramNum, nfe);
+                }
+            }
+            start = match.end();
+        }
+        return result.toString();
+    }
+
+    /**
+     * Replace the matches of the from pattern in the base string with the value
+     * of the to string.
+     * @param base the string to transform
+     * @param from the pattern to look for in the base string
+     * @param to the string to replace matches of the pattern with
+     * @param repeat whether the substitution should be repeated
+     * @return the base string with the first match (or all matches, if repeat is set) replaced
+     */
+    static String replaceSubstitution(String base, Pattern from, String to,
+                                      boolean repeat) {
+        Matcher match = from.matcher(base);
+        if (repeat) {
+            return match.replaceAll(to);
+        } else {
+            return match.replaceFirst(to);
+        }
+    }
+
+    /**
+     * Try to apply this rule to the given name represented as a parameter
+     * array.
+     * @param params first element is the realm, second and later elements
+     *        are the components of the name "a/b@FOO" -> {"FOO", "a", "b"}
+     * @return the short name if this rule applies or null
+     * @throws IOException throws if something is wrong with the rules
+     */
+    String apply(String[] params) throws IOException {
+        String result = null;
+        if (isDefault) {
+            if (defaultRealm.equals(params[0])) {
+                result = params[1];
+            }
+        } else if (params.length - 1 == numOfComponents) {
+            String base = replaceParameters(format, params);
+            if (match == null || match.matcher(base).matches()) {
+                if (fromPattern == null) {
+                    result = base;
+                } else {
+                    result = replaceSubstitution(base, fromPattern, toPattern,  repeat);
+                }
+            }
+        }
+        if (result != null && NON_SIMPLE_PATTERN.matcher(result).find()) {
+            throw new NoMatchingRule("Non-simple name " + result + " after auth_to_local rule " + this);
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
new file mode 100644
index 0000000..dd885e5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.Subject;
+
+import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.ClientCallbackHandler;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Shell;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.SystemTime;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.Set;
+import java.util.Map;
+
+/**
+ * This class is responsible for refreshing Kerberos credentials for
+ * logins for both Kafka client and server.
+ */
+public class Login {
+    private static final Logger log = LoggerFactory.getLogger(Login.class);
+
+    private static final Random RNG = new Random();
+
+    private final Thread t;
+    private final boolean isKrbTicket;
+    private final boolean isUsingTicketCache;
+
+    private final String loginContextName;
+    private final String principal;
+    private final Time time = new SystemTime();
+    private final CallbackHandler callbackHandler = new ClientCallbackHandler();
+
+    // LoginThread will sleep until 80% of time from last refresh to
+    // ticket's expiry has been reached, at which time it will wake
+    // and try to renew the ticket.
+    private final double ticketRenewWindowFactor;
+
+    /**
+     * Percentage of random jitter added to the renewal time
+     */
+    private final double ticketRenewJitter;
+
+    // Regardless of ticketRenewWindowFactor setting above and the ticket expiry time,
+    // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
+    // Change the '1' to e.g. 5, to change this to 5 minutes.
+    private final long minTimeBeforeRelogin;
+
+    private final String kinitCmd;
+
+    private volatile Subject subject;
+
+    private LoginContext login;
+    private long lastLogin;
+
+    /**
+     * Login constructor. The constructor starts the thread used
+     * to periodically re-login to the Kerberos Ticket Granting Server.
+     * @param loginContextName
+     *               name of section in JAAS file that will be use to login.
+     *               Passed as first param to javax.security.auth.login.LoginContext().
+     * @param configs configure Login with the given key-value pairs.
+     * @throws javax.security.auth.login.LoginException
+     *               Thrown if authentication fails.
+     */
+    public Login(final String loginContextName, Map<String, ?> configs) throws LoginException {
+        this.loginContextName = loginContextName;
+        this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+        this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
+        this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+        this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD);
+
+        this.lastLogin = currentElapsedTime();
+        login = login(loginContextName);
+        subject = login.getSubject();
+        isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+
+        AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        if (entries.length == 0) {
+            isUsingTicketCache = false;
+            principal = null;
+        } else {
+            // there will only be a single entry
+            AppConfigurationEntry entry = entries[0];
+            if (entry.getOptions().get("useTicketCache") != null) {
+                String val = (String) entry.getOptions().get("useTicketCache");
+                isUsingTicketCache = val.equals("true");
+            } else
+                isUsingTicketCache = false;
+            if (entry.getOptions().get("principal") != null)
+                principal = (String) entry.getOptions().get("principal");
+            else
+                principal = null;
+        }
+
+        if (!isKrbTicket) {
+            log.debug("It is not a Kerberos ticket");
+            t = null;
+            // if no TGT, do not bother with ticket management.
+            return;
+        }
+        log.debug("It is a Kerberos ticket");
+
+        // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the
+        // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
+        // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
+        //  "modprinc -maxlife 3mins <principal>" in kadmin.
+        t = Utils.newThread("kafka-kerberos-refresh-thread", new Runnable() {
+            public void run() {
+                log.info("TGT refresh thread started.");
+                while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
+                    KerberosTicket tgt = getTGT();
+                    long now = currentWallTime();
+                    long nextRefresh;
+                    Date nextRefreshDate;
+                    if (tgt == null) {
+                        nextRefresh = now + minTimeBeforeRelogin;
+                        nextRefreshDate = new Date(nextRefresh);
+                        log.warn("No TGT found: will try again at {}", nextRefreshDate);
+                    } else {
+                        nextRefresh = getRefreshTime(tgt);
+                        long expiry = tgt.getEndTime().getTime();
+                        Date expiryDate = new Date(expiry);
+                        if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() >= expiry) {
+                            log.error("The TGT cannot be renewed beyond the next expiry date: {}." +
+                                    "This process will not be able to authenticate new SASL connections after that " +
+                                    "time (for example, it will not be able to authenticate a new connection with a Kafka " +
+                                    "Broker).  Ask your system administrator to either increase the " +
+                                    "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
+                                    "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
+                                    "expiry cannot be further extended by refreshing, exiting refresh thread now.",
+                                    expiryDate, principal, principal);
+                            return;
+                        }
+                        // determine how long to sleep from looking at ticket's expiry.
+                        // We should not allow the ticket to expire, but we should take into consideration
+                        // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
+                        // would cause ticket expiration.
+                        if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) {
+                            // expiry is before next scheduled refresh).
+                            log.info("Refreshing now because expiry is before next scheduled refresh time.");
+                            nextRefresh = now;
+                        } else {
+                            if (nextRefresh < (now + minTimeBeforeRelogin)) {
+                                // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+                                Date until = new Date(nextRefresh);
+                                Date newUntil = new Date(now + minTimeBeforeRelogin);
+                                log.warn("TGT refresh thread time adjusted from {} to {} since the former is sooner " +
+                                        "than the minimum refresh interval ({} seconds) from now.",
+                                        until, newUntil, minTimeBeforeRelogin / 1000);
+                            }
+                            nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
+                        }
+                        nextRefreshDate = new Date(nextRefresh);
+                        if (nextRefresh > expiry) {
+                            log.error("Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." +
+                                    "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.",
+                                    nextRefreshDate, expiryDate);
+                            return;
+                        }
+                    }
+                    if (now < nextRefresh) {
+                        Date until = new Date(nextRefresh);
+                        log.info("TGT refresh sleeping until: {}", until);
+                        try {
+                            Thread.sleep(nextRefresh - now);
+                        } catch (InterruptedException ie) {
+                            log.warn("TGT renewal thread has been interrupted and will exit.");
+                            return;
+                        }
+                    } else {
+                        log.error("NextRefresh: {} is in the past: exiting refresh thread. Check"
+                                + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+                                + " Manual intervention will be required for this client to successfully authenticate."
+                                + " Exiting refresh thread.", nextRefreshDate);
+                        return;
+                    }
+                    if (isUsingTicketCache) {
+                        String kinitArgs = "-R";
+                        int retry = 1;
+                        while (retry >= 0) {
+                            try {
+                                log.debug("Running ticket cache refresh command: {} {}", kinitCmd, kinitArgs);
+                                Shell.execCommand(kinitCmd, kinitArgs);
+                                break;
+                            } catch (Exception e) {
+                                if (retry > 0) {
+                                    --retry;
+                                    // sleep for 10 seconds
+                                    try {
+                                        Thread.sleep(10 * 1000);
+                                    } catch (InterruptedException ie) {
+                                        log.error("Interrupted while renewing TGT, exiting Login thread");
+                                        return;
+                                    }
+                                } else {
+                                    log.warn("Could not renew TGT due to problem running shell command: '" + kinitCmd
+                                            + " " + kinitArgs + "'" + "; exception was: " + e + ". Exiting refresh thread.", e);
+                                    return;
+                                }
+                            }
+                        }
+                    }
+                    try {
+                        int retry = 1;
+                        while (retry >= 0) {
+                            try {
+                                reLogin();
+                                break;
+                            } catch (LoginException le) {
+                                if (retry > 0) {
+                                    --retry;
+                                    // sleep for 10 seconds.
+                                    try {
+                                        Thread.sleep(10 * 1000);
+                                    } catch (InterruptedException e) {
+                                        log.error("Interrupted during login retry after LoginException:", le);
+                                        throw le;
+                                    }
+                                } else {
+                                    log.error("Could not refresh TGT for principal: " + principal + ".", le);
+                                }
+                            }
+                        }
+                    } catch (LoginException le) {
+                        log.error("Failed to refresh TGT: refresh thread exiting now.", le);
+                        return;
+                    }
+                }
+            }
+        }, false);
+    }
+
+    public void startThreadIfNeeded() {
+        // thread object 't' will be null if a refresh thread is not needed.
+        if (t != null) {
+            t.start();
+        }
+    }
+
+    public void shutdown() {
+        if ((t != null) && (t.isAlive())) {
+            t.interrupt();
+            try {
+                t.join();
+            } catch (InterruptedException e) {
+                log.warn("Error while waiting for Login thread to shutdown: " + e, e);
+            }
+        }
+    }
+
+    public Subject subject() {
+        return subject;
+    }
+
+    private synchronized LoginContext login(final String loginContextName) throws LoginException {
+        String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
+        if (jaasConfigFile == null) {
+            throw new IllegalArgumentException("You must pass " + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + " in secure mode.");
+        }
+
+        AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        if (configEntries == null) {
+            // Forcing a reload of the configuration in case it's been overridden by third-party code.
+            // Without this, our tests fail sometimes depending on the order the tests are executed.
+            // Singletons are bad.
+            Configuration.setConfiguration(null);
+            configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+            if (configEntries == null) {
+                String errorMessage = "Could not find a '" + loginContextName + "' entry in `" + jaasConfigFile + "`.";
+                throw new IllegalArgumentException(errorMessage);
+            } else {
+                log.info("Found `" + loginContextName + "` in JAAS configuration after forced reload.");
+            }
+        }
+
+        LoginContext loginContext = new LoginContext(loginContextName, callbackHandler);
+        loginContext.login();
+        log.info("Successfully logged in.");
+        return loginContext;
+    }
+
+    private long getRefreshTime(KerberosTicket tgt) {
+        long start = tgt.getStartTime().getTime();
+        long expires = tgt.getEndTime().getTime();
+        log.info("TGT valid starting at: {}", tgt.getStartTime());
+        log.info("TGT expires: {}", tgt.getEndTime());
+        long proposedRefresh = start + (long) ((expires - start) *
+                (ticketRenewWindowFactor + (ticketRenewJitter * RNG.nextDouble())));
+
+        if (proposedRefresh > expires)
+            // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
+            return currentWallTime();
+        else
+            return proposedRefresh;
+    }
+
+    private synchronized KerberosTicket getTGT() {
+        Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+        for (KerberosTicket ticket : tickets) {
+            KerberosPrincipal server = ticket.getServer();
+            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+                log.debug("Found TGT {}.", ticket);
+                return ticket;
+            }
+        }
+        return null;
+    }
+
+    private boolean hasSufficientTimeElapsed() {
+        long now = currentElapsedTime();
+        if (now - lastLogin < minTimeBeforeRelogin) {
+            log.warn("Not attempting to re-login since the last re-login was attempted less than {} seconds before.",
+                    minTimeBeforeRelogin / 1000);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Re-login a principal. This method assumes that {@link #login(String)} has happened already.
+     * @throws javax.security.auth.login.LoginException on a failure
+     */
+    private synchronized void reLogin()
+            throws LoginException {
+        if (!isKrbTicket) {
+            return;
+        }
+        if (login == null) {
+            throw new LoginException("Login must be done first");
+        }
+        if (!hasSufficientTimeElapsed()) {
+            return;
+        }
+        log.info("Initiating logout for {}", principal);
+        synchronized (Login.class) {
+            // register most recent relogin attempt
+            lastLogin = currentElapsedTime();
+            //clear up the kerberos state. But the tokens are not cleared! As per
+            //the Java kerberos login module code, only the kerberos credentials
+            //are cleared
+            login.logout();
+            //login and also update the subject field of this instance to
+            //have the new credentials (pass it to the LoginContext constructor)
+            login = new LoginContext(loginContextName, subject);
+            log.info("Initiating re-login for {}", principal);
+            login.login();
+        }
+    }
+
+    private long currentElapsedTime() {
+        return time.nanoseconds() / 1000000;
+    }
+
+    private long currentWallTime() {
+        return time.milliseconds();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
new file mode 100644
index 0000000..18651c8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.LoginType;
+import org.apache.kafka.common.security.JaasUtils;
+
+public class LoginManager {
+
+    private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<LoginType, LoginManager>(LoginType.class);
+
+    private final Login login;
+    private final String serviceName;
+    private final LoginType loginType;
+    private int refCount;   // only read and written while holding the LoginManager.class monitor
+
+    private LoginManager(LoginType loginType, Map<String, ?> configs) throws IOException, LoginException {
+        this.loginType = loginType;
+        String loginContext = loginType.contextName();
+        login = new Login(loginContext, configs);
+        this.serviceName = getServiceName(loginContext, configs);
+        login.startThreadIfNeeded();
+    }
+
+    private static String getServiceName(String loginContext, Map<String, ?> configs) throws IOException {
+        String jaasServiceName = JaasUtils.jaasConfig(loginContext, JaasUtils.SERVICE_NAME);
+        String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
+        if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) {
+            String message = "Conflicting serviceName values found in JAAS and Kafka configs " +
+                "value in JAAS file " + jaasServiceName + ", value in Kafka config " + configServiceName;
+            throw new IllegalArgumentException(message);
+        }
+
+        if (jaasServiceName != null)
+            return jaasServiceName;
+        if (configServiceName != null)
+            return configServiceName;
+
+        throw new IllegalArgumentException("No serviceName defined in either JAAS or Kafka config");
+    }
+
+    /**
+     * Returns an instance of `LoginManager` and increases its reference count.
+     *
+     * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an
+     * existing `LoginManager` for the provided `loginType` if available. However, it expects `configs` to be the
+     * same for every invocation and it will ignore them in the case where it's returning a cached instance.
+     *
+     * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and
+     * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
+     * complicated to do the latter without making the consumer API more complex.
+     *
+     * @param loginType the type of the login context: SERVER for the broker, CLIENT for producers and consumers
+     * @param configs configuration as key/value pairs
+     * @return the cached or newly created `LoginManager` for `loginType`
+     */
+    public static final LoginManager acquireLoginManager(LoginType loginType, Map<String, ?> configs) throws IOException, LoginException {
+        synchronized (LoginManager.class) {
+            LoginManager loginManager = CACHED_INSTANCES.get(loginType);
+            if (loginManager == null) {
+                loginManager = new LoginManager(loginType, configs);
+                CACHED_INSTANCES.put(loginType, loginManager);
+            }
+            return loginManager.acquire();
+        }
+    }
+
+    public Subject subject() {
+        return login.subject();
+    }
+
+    public String serviceName() {
+        return serviceName;
+    }
+
+    private LoginManager acquire() {
+        ++refCount;
+        return this;
+    }
+
+    /**
+     * Decrease the reference count for this instance and release resources if it reaches 0.
+     */
+    public void release() {
+        synchronized (LoginManager.class) {
+            if (refCount == 0)
+                throw new IllegalStateException("release called on LoginManager with refCount == 0");
+            else if (refCount == 1) {
+                CACHED_INSTANCES.remove(loginType);
+                login.shutdown();
+            }
+            --refCount;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/NoMatchingRule.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/NoMatchingRule.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/NoMatchingRule.java
new file mode 100644
index 0000000..6c2d267
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/NoMatchingRule.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} that carries a caller-supplied message. Judging by its name it reports
+ * that no Kerberos name translation rule matched; the code that throws it is not part of this file.
+ */
+public class NoMatchingRule extends IOException {
+    // Package-private constructor: instances can only be created from within this package.
+    NoMatchingRule(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
index b291409..163b8c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
@@ -19,19 +19,19 @@ package org.apache.kafka.common.security.ssl;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.network.Mode;
 
 import javax.net.ssl.*;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
+
 import java.util.List;
 import java.util.Map;
 
-
 public class SSLFactory implements Configurable {
 
-    public enum Mode { CLIENT, SERVER };
     private String protocol;
     private String provider;
     private String kmfAlgorithm;

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
new file mode 100644
index 0000000..f5db5c3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A base class for running a Unix command.
+ *
+ * <code>Shell</code> can be used to run unix commands like <code>du</code> or
+ * <code>df</code>. Subclasses supply the command line through {@link #execString()}
+ * and consume the command's standard output in {@link #parseExecResult(BufferedReader)}.
+ */
+public abstract class Shell {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Shell.class);
+
+    /** Return an array containing the command name and its parameters */
+    protected abstract String[] execString();
+
+    /** Parse the execution result */
+    protected abstract void parseExecResult(BufferedReader lines) throws IOException;
+
+    private final long timeout;
+
+    private int exitCode;
+    private Process process; // sub process used to execute the command
+
+    /* Whether the command finished executing; replaced with a fresh flag at the start of every run */
+    private volatile AtomicBoolean completed;
+
+    /**
+     * @param timeout Specifies the time in milliseconds, after which the command will be killed. -1 means no timeout.
+     */
+    public Shell(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /** get the exit code
+     * @return the exit code of the process
+     */
+    public int exitCode() {
+        return exitCode;
+    }
+
+    /** get the current sub-process executing the given command
+     * @return process executing the command
+     */
+    public Process process() {
+        return process;
+    }
+
+    /**
+     * Reset the stored exit code and run the command.
+     *
+     * @throws IOException if the command cannot be run or exits with a non-zero code
+     */
+    protected void run() throws IOException {
+        exitCode = 0; // reset for next run
+        runCommand();
+    }
+
+    /**
+     * Start the command, hand its standard output to {@link #parseExecResult(BufferedReader)} while a
+     * helper thread collects standard error, then wait for the process to exit.
+     *
+     * @throws ExitCodeException if the process exits with a non-zero code; the message is the collected standard error
+     * @throws IOException if starting the process or reading its output fails, or if the calling thread is
+     *                     interrupted while waiting for the process
+     */
+    private void runCommand() throws IOException {
+        ProcessBuilder builder = new ProcessBuilder(execString());
+        Timer timeoutTimer = null;
+        completed = new AtomicBoolean(false);
+
+        process = builder.start();
+        if (timeout > -1) {
+            timeoutTimer = new Timer();
+            //One time scheduling.
+            timeoutTimer.schedule(new ShellTimeoutTimerTask(this), timeout);
+        }
+        final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+        BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+        final StringBuffer errMsg = new StringBuffer();
+
+        // read error and input streams as this would free up the buffers
+        // free the error stream buffer
+        Thread errThread = Utils.newThread("kafka-shell-thread", new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    String line = errReader.readLine();
+                    while ((line != null) && !Thread.currentThread().isInterrupted()) {
+                        errMsg.append(line);
+                        errMsg.append(System.getProperty("line.separator"));
+                        line = errReader.readLine();
+                    }
+                } catch (IOException ioe) {
+                    LOG.warn("Error reading the error stream", ioe);
+                }
+            }
+        }, false);
+        errThread.start();
+
+        try {
+            parseExecResult(inReader); // parse the output
+            // clear the input stream buffer: discard whatever parseExecResult left unread.
+            // The first read must happen before the loop test, otherwise nothing is ever drained.
+            String line = inReader.readLine();
+            while (line != null) {
+                line = inReader.readLine();
+            }
+            // wait for the process to finish and check the exit code
+            exitCode = process.waitFor();
+            try {
+                // make sure that the error thread exits
+                errThread.join();
+            } catch (InterruptedException ie) {
+                LOG.warn("Interrupted while reading the error stream", ie);
+                // keep the interrupt visible to the caller instead of swallowing it
+                Thread.currentThread().interrupt();
+            }
+            completed.set(true);
+            //the timeout thread handling
+            //taken care in finally block
+            if (exitCode != 0) {
+                throw new ExitCodeException(exitCode, errMsg.toString());
+            }
+        } catch (InterruptedException ie) {
+            // restore the interrupt status and keep the original exception as the cause
+            Thread.currentThread().interrupt();
+            throw new IOException(ie.toString(), ie);
+        } finally {
+            if (timeoutTimer != null)
+                timeoutTimer.cancel();
+
+            // close the input stream
+            try {
+                inReader.close();
+            } catch (IOException ioe) {
+                LOG.warn("Error while closing the input stream", ioe);
+            }
+            // the run did not finish normally, so ask the error reader thread to stop
+            if (!completed.get())
+                errThread.interrupt();
+
+            try {
+                errReader.close();
+            } catch (IOException ioe) {
+                LOG.warn("Error while closing the error stream", ioe);
+            }
+
+            process.destroy();
+        }
+    }
+
+    /**
+     * This is an IOException with exit code added.
+     */
+    @SuppressWarnings("serial")
+    public static class ExitCodeException extends IOException {
+        int exitCode;
+
+        public ExitCodeException(int exitCode, String message) {
+            super(message);
+            this.exitCode = exitCode;
+        }
+
+        public int getExitCode() {
+            return exitCode;
+        }
+    }
+
+    /**
+     * A simple shell command executor.
+     *
+     * <code>ShellCommandExecutor</code> should be used in cases where the output
+     * of the command needs no explicit parsing and where the command, working
+     * directory and the environment remains unchanged. The output of the command
+     * is stored as-is and is expected to be small.
+     */
+    public static class ShellCommandExecutor extends Shell {
+
+        private final String[] command;
+        private StringBuffer output;
+
+        /**
+         * Create a new instance of the ShellCommandExecutor to execute a command.
+         *
+         * @param execString The command to execute with arguments
+         * @param timeout Specifies the time in milliseconds, after which the
+         *                command will be killed. -1 means no timeout.
+         */
+        public ShellCommandExecutor(String[] execString, long timeout) {
+            super(timeout);
+            // copy so that later changes to the caller's array do not affect this instance
+            command = execString.clone();
+        }
+
+        /** Execute the shell command. */
+        public void execute() throws IOException {
+            this.run();
+        }
+
+        @Override
+        protected String[] execString() {
+            return command;
+        }
+
+        /** Read the whole output of the command into memory. */
+        @Override
+        protected void parseExecResult(BufferedReader reader) throws IOException {
+            output = new StringBuffer();
+            char[] buf = new char[512];
+            int nRead;
+            while ((nRead = reader.read(buf, 0, buf.length)) > 0) {
+                output.append(buf, 0, nRead);
+            }
+        }
+
+        /** Get the output of the shell command; empty if nothing has been captured yet. */
+        public String output() {
+            return (output == null) ? "" : output.toString();
+        }
+
+        /**
+         * Returns the commands of this instance.
+         * Arguments with spaces in are presented with quotes round; other
+         * arguments are presented raw
+         *
+         * @return a string representation of the object.
+         */
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            String[] args = execString();
+            for (String s : args) {
+                if (s.indexOf(' ') >= 0) {
+                    builder.append('"').append(s).append('"');
+                } else {
+                    builder.append(s);
+                }
+                builder.append(' ');
+            }
+            return builder.toString();
+        }
+    }
+
+    /**
+     * Static method to execute a shell command.
+     * Covers most of the simple cases without requiring the user to extend
+     * the <code>Shell</code> class.
+     * @param cmd shell command to execute.
+     * @return the output of the executed command.
+     * @throws IOException if the command cannot be run or exits with a non-zero code
+     */
+    public static String execCommand(String ... cmd) throws IOException {
+        return execCommand(cmd, -1);
+    }
+
+    /**
+     * Static method to execute a shell command.
+     * Covers most of the simple cases without requiring the user to extend
+     * the <code>Shell</code> class.
+     * @param cmd shell command to execute.
+     * @param timeout time in milliseconds after which script should be killed. -1 means no timeout.
+     * @return the output of the executed command.
+     * @throws IOException if the command cannot be run or exits with a non-zero code
+     */
+    public static String execCommand(String[] cmd, long timeout) throws IOException {
+        ShellCommandExecutor exec = new ShellCommandExecutor(cmd, timeout);
+        exec.execute();
+        return exec.output();
+    }
+
+    /**
+     * Timer which is used to timeout scripts spawned off by shell.
+     */
+    private static class ShellTimeoutTimerTask extends TimerTask {
+
+        private final Shell shell;
+
+        public ShellTimeoutTimerTask(Shell shell) {
+            this.shell = shell;
+        }
+
+        @Override
+        public void run() {
+            Process p = shell.process();
+            // check for a missing process before touching it rather than relying on a caught NPE
+            if (p == null)
+                return;
+            try {
+                p.exitValue();
+            } catch (IllegalThreadStateException e) {
+                //Process has not terminated.
+                //So check if it has completed
+                //if not just destroy it.
+                if (!shell.completed.get()) {
+                    p.destroy();
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/utils/Time.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Time.java b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
index 66c44de..b2fad7f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Time.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
@@ -24,16 +24,16 @@ public interface Time {
     /**
      * The current time in milliseconds
      */
-    public long milliseconds();
+    long milliseconds();
 
     /**
      * The current time in nanoseconds
      */
-    public long nanoseconds();
+    long nanoseconds();
 
     /**
      * Sleep for the given number of milliseconds
      */
-    public void sleep(long ms);
+    void sleep(long ms);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
index f13c21a..4a6d304 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -45,7 +45,7 @@ class EchoServer extends Thread {
         this.protocol =  configs.containsKey("security.protocol") ?
             SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT;
         if (protocol == SecurityProtocol.SSL) {
-            this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER);
+            this.sslFactory = new SSLFactory(Mode.SERVER);
             this.sslFactory.configure(configs);
             SSLContext sslContext = this.sslFactory.sslContext();
             this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
index c60053f..6475ff0 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
@@ -22,7 +22,6 @@ import java.net.InetSocketAddress;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.security.ssl.SSLFactory;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestSSLUtils;
 import org.junit.After;
@@ -40,15 +39,15 @@ public class SSLSelectorTest extends SelectorTest {
     public void setup() throws Exception {
         File trustStoreFile = File.createTempFile("truststore", ".jks");
 
-        Map<String, Object> sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.SERVER, trustStoreFile, "server");
+        Map<String, Object> sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server");
         sslServerConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
         this.server = new EchoServer(sslServerConfigs);
         this.server.start();
         this.time = new MockTime();
-        Map<String, Object> sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, SSLFactory.Mode.SERVER, trustStoreFile, "client");
+        Map<String, Object> sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, Mode.SERVER, trustStoreFile, "client");
         sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
 
-        this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
+        this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
         this.metrics = new Metrics();
         this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
index 6993f52..987f4bb 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
@@ -73,7 +73,7 @@ public class SSLTransportLayerTest {
         sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
         sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
 
-        this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
+        this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
     }
@@ -270,7 +270,7 @@ public class SSLTransportLayerTest {
      */
     @Test
     public void testInvalidTruststorePassword() throws Exception {
-        SSLChannelBuilder channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
+        SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
         try {
             sslClientConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
             channelBuilder.configure(sslClientConfigs);
@@ -285,7 +285,7 @@ public class SSLTransportLayerTest {
      */
     @Test
     public void testInvalidKeystorePassword() throws Exception {
-        SSLChannelBuilder channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
+        SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
         try {
             sslClientConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
             channelBuilder.configure(sslClientConfigs);
@@ -437,7 +437,7 @@ public class SSLTransportLayerTest {
 
     private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) {
         
-        this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT) {
+        this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT) {
 
             @Override
             protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
@@ -461,7 +461,7 @@ public class SSLTransportLayerTest {
         
         CertStores(boolean server) throws Exception {
             String name = server ? "server" : "client";
-            SSLFactory.Mode mode = server ? SSLFactory.Mode.SERVER : SSLFactory.Mode.CLIENT;
+            Mode mode = server ? Mode.SERVER : Mode.CLIENT;
             File truststoreFile = File.createTempFile(name + "TS", ".jks");
             sslConfig = TestSSLUtils.createSSLConfig(!server, true, mode, truststoreFile, name);
             sslConfig.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
@@ -549,7 +549,7 @@ public class SSLTransportLayerTest {
         private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
 
         public SSLEchoServer(Map<String, ?> configs, String serverHost) throws Exception {
-            this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER);
+            this.sslFactory = new SSLFactory(Mode.SERVER);
             this.sslFactory.configure(configs);
             serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(false);
@@ -557,7 +557,7 @@ public class SSLTransportLayerTest {
             this.port = serverSocketChannel.socket().getLocalPort();
             this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
             this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
-            SSLChannelBuilder channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.SERVER);
+            SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.SERVER);
             channelBuilder.configure(sslServerConfigs);
             this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
             setName("echoserver");

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
new file mode 100644
index 0000000..9781f6d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class KerberosNameTest {
+
+    /**
+     * Parses principals with and without a host component and checks every parsed part together
+     * with the short name expected from the configured rules.
+     */
+    @Test
+    public void testParse() throws IOException {
+        List<String> rules = new ArrayList<>(Arrays.asList(
+            "RULE:[1:$1](App\\..*)s/App\\.(.*)/$1/g",
+            "RULE:[2:$1](App\\..*)s/App\\.(.*)/$1/g",
+            "DEFAULT"
+        ));
+        KerberosNameParser parser = new KerberosNameParser("REALM.COM", rules);
+
+        // service/host@REALM principal whose service name starts with "App."
+        KerberosName withHost = parser.parse("App.service-name/example.com@REALM.COM");
+        assertEquals("App.service-name", withHost.serviceName());
+        assertEquals("example.com", withHost.hostName());
+        assertEquals("REALM.COM", withHost.realm());
+        assertEquals("service-name", withHost.shortName());
+
+        // same service name without a host component: the host is expected to be null
+        KerberosName withoutHost = parser.parse("App.service-name@REALM.COM");
+        assertEquals("App.service-name", withoutHost.serviceName());
+        assertNull(withoutHost.hostName());
+        assertEquals("REALM.COM", withoutHost.realm());
+        assertEquals("service-name", withoutHost.shortName());
+
+        // principal without the "App." prefix: the short name is expected to equal the service name
+        KerberosName plain = parser.parse("user/host@REALM.COM");
+        assertEquals("user", plain.serviceName());
+        assertEquals("host", plain.hostName());
+        assertEquals("REALM.COM", plain.realm());
+        assertEquals("user", plain.shortName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
index 0aec666..e90ec2b 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
@@ -18,6 +18,7 @@ import java.io.File;
 import java.util.Map;
 
 import org.apache.kafka.test.TestSSLUtils;
+import org.apache.kafka.common.network.Mode;
 
 import org.junit.Test;
 import static org.junit.Assert.assertNotNull;
@@ -35,8 +36,8 @@ public class SSLFactoryTest {
     @Test
     public void testSSLFactoryConfiguration() throws Exception {
         File trustStoreFile = File.createTempFile("truststore", ".jks");
-        Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.SERVER, trustStoreFile, "server");
-        SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER);
+        Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+        SSLFactory sslFactory = new SSLFactory(Mode.SERVER);
         sslFactory.configure(serverSSLConfig);
         //host and port are hints
         SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
@@ -49,8 +50,8 @@ public class SSLFactoryTest {
     @Test
     public void testClientMode() throws Exception {
         File trustStoreFile = File.createTempFile("truststore", ".jks");
-        Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.CLIENT, trustStoreFile, "client");
-        SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT);
+        Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.CLIENT, trustStoreFile, "client");
+        SSLFactory sslFactory = new SSLFactory(Mode.CLIENT);
         sslFactory.configure(clientSSLConfig);
         //host and port are hints
         SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index eb7fcf0..387e48f 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
index c01cf37..b231692 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.clients.CommonClientConfigs;
 
 import java.io.File;
@@ -177,13 +177,13 @@ public class TestSSLUtils {
         return certs;
     }
 
-    public static Map<String, Object> createSSLConfig(SSLFactory.Mode mode, File keyStoreFile, String password, String keyPassword,
+    public static Map<String, Object> createSSLConfig(Mode mode, File keyStoreFile, String password, String keyPassword,
                                                       File trustStoreFile, String trustStorePassword) {
         Map<String, Object> sslConfigs = new HashMap<String, Object>();
         sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol
         sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
 
-        if (mode == SSLFactory.Mode.SERVER || (mode == SSLFactory.Mode.CLIENT && keyStoreFile != null)) {
+        if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) {
             sslConfigs.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
             sslConfigs.put(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
             sslConfigs.put(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
@@ -203,13 +203,13 @@ public class TestSSLUtils {
         return sslConfigs;
     }
 
-    public static  Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, SSLFactory.Mode mode, File trustStoreFile, String certAlias)
+    public static  Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias)
         throws IOException, GeneralSecurityException {
         Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
         File keyStoreFile;
         String password;
 
-        if (mode == SSLFactory.Mode.SERVER)
+        if (mode == Mode.SERVER)
             password = "ServerPassword";
         else
             password = "ClientPassword";

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c2076a2..3756822 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -21,10 +21,9 @@ import kafka.utils._
 import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.{TopicPartition, Node}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{Selectable, ChannelBuilders, Selector, NetworkReceive}
+import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode}
 import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys}
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.utils.Time
 import collection.mutable.HashMap
 import kafka.cluster.Broker
@@ -97,7 +96,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         "controller-channel",
         Map("broker-id" -> broker.id.toString).asJava,
         false,
-        ChannelBuilders.create(config.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, config.channelConfigs)
+        ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.channelConfigs)
       )
       new NetworkClient(
         selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 41a3705..1066fbe 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -33,9 +33,8 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{Selector => KSelector, ChannelBuilders, InvalidReceiveException}
+import org.apache.kafka.common.network.{Selector => KSelector, LoginType, Mode, ChannelBuilders, InvalidReceiveException}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -378,7 +377,7 @@ private[kafka] class Processor(val id: Int,
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
-  private val channelBuilder = ChannelBuilders.create(protocol, SSLFactory.Mode.SERVER, channelConfigs)
+  private val channelBuilder = ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs)
   private val metricTags = new util.HashMap[String, String]()
   metricTags.put("networkProcessor", id.toString)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 913d49b..194ee9c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,6 +26,8 @@ import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SSLConfigs}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
@@ -33,7 +35,6 @@ import org.apache.kafka.common.security.auth.PrincipalBuilder
 
 import scala.collection.{Map, immutable}
 
-
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
   val ZkSessionTimeoutMs = 6000
@@ -168,6 +169,14 @@ object Defaults {
   val SSLClientAuthNone = "none"
   val SSLClientAuth = SSLClientAuthNone
   val SSLCipherSuites = ""
+
+  /** ********* SASL Configuration ***********/
+  val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD
+  val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
+  val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
+  val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
+  val AuthToLocal = SaslConfigs.DEFAULT_AUTH_TO_LOCAL
+
 }
 
 object KafkaConfig {
@@ -316,6 +325,13 @@ object KafkaConfig {
   val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
   val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
 
+  /** ********* SASL Configuration ****************/
+  val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME
+  val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD
+  val SaslKerberosTicketRenewWindowFactorProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
+  val SaslKerberosTicketRenewJitterProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER
+  val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN
+  val AuthToLocalProp = SaslConfigs.AUTH_TO_LOCAL
 
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
@@ -487,6 +503,14 @@ object KafkaConfig {
   val SSLEndpointIdentificationAlgorithmDoc = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
   val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC
 
+  /** ********* SASL Configuration ****************/
+  val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC
+  val SaslKerberosKinitCmdDoc = SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC
+  val SaslKerberosTicketRenewWindowFactorDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC
+  val SaslKerberosTicketRenewJitterDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC
+  val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
+  val AuthToLocalDoc = SaslConfigs.AUTH_TO_LOCAL_DOC
+
   private val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -642,6 +666,15 @@ object KafkaConfig {
       .define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc)
       .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc)
       .define(SSLCipherSuitesProp, LIST, Defaults.SSLCipherSuites, MEDIUM, SSLCipherSuitesDoc)
+
+      /** ********* SASL Configuration ****************/
+      .define(SaslKerberosServiceNameProp, STRING, MEDIUM, SaslKerberosServiceNameDoc, false)
+      .define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc)
+      .define(SaslKerberosTicketRenewWindowFactorProp, DOUBLE, Defaults.SaslKerberosTicketRenewWindowFactor, MEDIUM, SaslKerberosTicketRenewWindowFactorDoc)
+      .define(SaslKerberosTicketRenewJitterProp, DOUBLE, Defaults.SaslKerberosTicketRenewJitter, MEDIUM, SaslKerberosTicketRenewJitterDoc)
+      .define(SaslKerberosMinTimeBeforeReloginProp, LONG, Defaults.SaslKerberosMinTimeBeforeRelogin, MEDIUM, SaslKerberosMinTimeBeforeReloginDoc)
+      .define(AuthToLocalProp, LIST, Defaults.AuthToLocal, MEDIUM, AuthToLocalDoc)
+
   }
 
   def configNames() = {
@@ -802,6 +835,14 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp)
   val sslCipher = getList(KafkaConfig.SSLCipherSuitesProp)
 
+  /** ********* SASL Configuration **************/
+  val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp)
+  val saslKerberosKinitCmd = getString(KafkaConfig.SaslKerberosKinitCmdProp)
+  val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp)
+  val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp)
+  val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp)
+  val authToLocal = getList(KafkaConfig.AuthToLocalProp)
+
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
   val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
@@ -823,7 +864,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
     val millis: java.lang.Long =
       Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
         Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
-          case Some(mins) =>  millisInMinute * mins
+          case Some(mins) => millisInMinute * mins
           case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
         })
 
@@ -927,20 +968,30 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   def channelConfigs: java.util.Map[String, Object] = {
     val channelConfigs = new java.util.HashMap[String, Object]()
     import kafka.server.KafkaConfig._
-    channelConfigs.put(PrincipalBuilderClassProp, Class.forName(principalBuilderClass))
-    channelConfigs.put(SSLProtocolProp, sslProtocol)
-    channelConfigs.put(SSLEnabledProtocolsProp, sslEnabledProtocols)
-    channelConfigs.put(SSLKeystoreTypeProp, sslKeystoreType)
-    channelConfigs.put(SSLKeystoreLocationProp, sslKeystoreLocation)
-    channelConfigs.put(SSLKeystorePasswordProp, sslKeystorePassword)
-    channelConfigs.put(SSLKeyPasswordProp, sslKeyPassword)
-    channelConfigs.put(SSLTruststoreTypeProp, sslTruststoreType)
-    channelConfigs.put(SSLTruststoreLocationProp, sslTruststoreLocation)
-    channelConfigs.put(SSLTruststorePasswordProp, sslTruststorePassword)
-    channelConfigs.put(SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm)
-    channelConfigs.put(SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm)
-    channelConfigs.put(SSLClientAuthProp, sslClientAuth)
-    channelConfigs.put(SSLCipherSuitesProp, sslCipher)
+    Seq(
+      (PrincipalBuilderClassProp, Class.forName(principalBuilderClass)),
+      (SSLProtocolProp, sslProtocol),
+      (SSLEnabledProtocolsProp, sslEnabledProtocols),
+      (SSLKeystoreTypeProp, sslKeystoreType),
+      (SSLKeystoreLocationProp, sslKeystoreLocation),
+      (SSLKeystorePasswordProp, sslKeystorePassword),
+      (SSLKeyPasswordProp, sslKeyPassword),
+      (SSLTruststoreTypeProp, sslTruststoreType),
+      (SSLTruststoreLocationProp, sslTruststoreLocation),
+      (SSLTruststorePasswordProp, sslTruststorePassword),
+      (SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm),
+      (SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm),
+      (SSLClientAuthProp, sslClientAuth),
+      (SSLCipherSuitesProp, sslCipher),
+      (SaslKerberosServiceNameProp, saslKerberosServiceName),
+      (SaslKerberosKinitCmdProp, saslKerberosKinitCmd),
+      (SaslKerberosTicketRenewWindowFactorProp, saslKerberosTicketRenewWindowFactor),
+      (SaslKerberosTicketRenewJitterProp, saslKerberosTicketRenewJitter),
+      (SaslKerberosMinTimeBeforeReloginProp, saslKerberosMinTimeBeforeRelogin),
+      (AuthToLocalProp, authToLocal)
+    ).foreach { case (key, value) =>
+      if (value != null) channelConfigs.put(key, value)
+    }
     channelConfigs
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f50c266..510957b 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -21,7 +21,7 @@ import java.net.{SocketTimeoutException}
 import java.util
 
 import kafka.admin._
-import kafka.api.{KAFKA_090, ApiVersion}
+import kafka.api.KAFKA_090
 import kafka.log.LogConfig
 import kafka.log.CleanerConfig
 import kafka.log.LogManager
@@ -34,17 +34,16 @@ import kafka.utils._
 import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest, NetworkClient}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkReceive, Selector}
+import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode}
 import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend}
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.utils.AppInfoParser
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
-import org.I0Itec.zkclient.{ZkClient, ZkConnection}
+import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.{EndPoint, Broker}
 import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
@@ -308,7 +307,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           "kafka-server-controlled-shutdown",
           Map.empty.asJava,
           false,
-          ChannelBuilders.create(config.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, config.channelConfigs)
+          ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.channelConfigs)
         )
         new NetworkClient(
           selector,
@@ -491,7 +490,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         if (config.interBrokerProtocolVersion.onOrAfter(KAFKA_090))
           networkClientControlledShutdown(config.controlledShutdownMaxRetries.intValue)
         else blockingChannelControlledShutdown(config.controlledShutdownMaxRetries.intValue)
-      
+
       if (!shutdownSucceeded)
         warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 0a17fd0..5aa817d 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -27,7 +27,7 @@ import kafka.api.KAFKA_090
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
 import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse}
-import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkReceive, Selector}
+import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode}
 import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest}
 import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -74,7 +74,7 @@ class ReplicaFetcherThread(name: String,
       "replica-fetcher",
       Map("broker-id" -> sourceBroker.id.toString).asJava,
       false,
-      ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, brokerConfig.channelConfigs)
+      ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, brokerConfig.channelConfigs)
     )
     new NetworkClient(
       selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/resources/kafka_jaas.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/kafka_jaas.conf b/core/src/test/resources/kafka_jaas.conf
new file mode 100644
index 0000000..b097e26
--- /dev/null
+++ b/core/src/test/resources/kafka_jaas.conf
@@ -0,0 +1,29 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+KafkaClient {
+	com.sun.security.auth.module.Krb5LoginModule required debug=true
+	useKeyTab=true
+	storeKey=true
+	serviceName="kafka"
+	keyTab="$keytab-location"
+	principal="client@EXAMPLE.COM";
+};
+
+KafkaServer {
+	com.sun.security.auth.module.Krb5LoginModule required debug=true
+	useKeyTab=true
+	storeKey=true
+	serviceName="kafka"
+	keyTab="$keytab-location"
+	principal="kafka/localhost@EXAMPLE.COM";
+};