You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/05/31 11:25:55 UTC

git commit: Adds SASL authentication in binary protocol v2

Updated Branches:
  refs/heads/trunk 6f5d8a56f -> 401b46bb3


Adds SASL authentication in binary protocol v2

patch by beobal; reviewed by slebresne for CASSANDRA-5545


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/401b46bb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/401b46bb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/401b46bb

Branch: refs/heads/trunk
Commit: 401b46bb3c5c27717d6f64af3aab349817485322
Parents: 6f5d8a5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri May 31 10:59:53 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri May 31 11:25:16 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 doc/native_protocol_v2.spec                        |   67 ++++++++---
 .../cassandra/auth/ISaslAwareAuthenticator.java    |   41 +++++++
 .../cassandra/auth/PasswordAuthenticator.java      |   81 ++++++++++++-
 .../org/apache/cassandra/service/ClientState.java  |    6 +-
 .../apache/cassandra/thrift/CassandraServer.java   |    4 +-
 .../org/apache/cassandra/transport/Client.java     |   51 +++++++-
 .../org/apache/cassandra/transport/Message.java    |   32 +++---
 .../org/apache/cassandra/transport/Server.java     |   14 ++-
 .../cassandra/transport/ServerConnection.java      |   31 ++++-
 .../transport/messages/CredentialsMessage.java     |   10 ++-
 .../transport/messages/SaslChallenge.java          |   62 ++++++++++
 .../cassandra/transport/messages/SaslResponse.java |   94 +++++++++++++++
 13 files changed, 443 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b83b88a..abde4b3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -55,6 +55,7 @@
  * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
  * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
  * Support native link w/o JNA in Java7 (CASSANDRA-3734)
+ * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
 
 1.2.6
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index 20c0a80..92d6ebd 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -15,7 +15,7 @@ Table of Contents
   4. Messages
     4.1. Requests
       4.1.1. STARTUP
-      4.1.2. CREDENTIALS
+      4.1.2. AUTH_RESPONSE
       4.1.3. OPTIONS
       4.1.4. QUERY
       4.1.5. PREPARE
@@ -34,6 +34,7 @@ Table of Contents
         4.2.5.4. Prepared
         4.2.5.5. Schema_change
       4.2.6. EVENT
+      4.2.7. AUTH_CHALLENGE
   5. Compression
   6. Collection types
   7. Error codes
@@ -89,7 +90,7 @@ Table of Contents
   of version is used to define the direction of the message: 0 indicates a
   request, 1 indicates a responses. This can be useful for protocol analyzers to
   distinguish the nature of the packet from the direction which it is moving.
-  The rest of that byte is the protocol version (1 for the protocol defined in
+  The rest of that byte is the protocol version (2 for the protocol defined in
   this document). In other words, for this version of the protocol, version will
   have one of:
     0x02    Request frame for this protocol version
@@ -151,7 +152,6 @@ Table of Contents
     0x01    STARTUP
     0x02    READY
     0x03    AUTHENTICATE
-    0x04    CREDENTIALS
     0x05    OPTIONS
     0x06    SUPPORTED
     0x07    QUERY
@@ -161,9 +161,13 @@ Table of Contents
     0x0B    REGISTER
     0x0C    EVENT
     0x0D    BATCH
+    0x0E    AUTH_CHALLENGE
+    0x0F    AUTH_RESPONSE
 
   Messages are described in Section 4.
 
+  (Note that there is no 0x04 message in this version of the protocol)
+
 
 2.5. length
 
@@ -226,7 +230,7 @@ Table of Contents
 
   Initialize the connection. The server will respond by either a READY message
   (in which case the connection is ready for queries) or an AUTHENTICATE message
-  (in which case credentials will need to be provided using CREDENTIALS).
+  (in which case credentials will need to be provided using AUTH_RESPONSE).
 
   This must be the first message of the connection, except for OPTIONS that can
   be sent before to find out the options supported by the server. Once the
@@ -241,19 +245,23 @@ Table of Contents
       This is optional, if not specified no compression will be used.
 
 
-4.1.2. CREDENTIALS
+4.1.2. AUTH_RESPONSE
+
+  Answers a server authentication challenge.
 
-  Provides credentials information for the purpose of identification. This
-  message comes as a response to an AUTHENTICATE message from the server, but
-  can be use later in the communication to change the authentication
-  information.
+  Authentication in the protocol is SASL based. The server sends authentication
+  challenges (a bytes token) to which the client answers with this message. Those
+  exchanges continue until the server accepts the authentication by sending a
+  READY message after a client AUTH_RESPONSE. It is however the client that
+  initiates the exchange by sending an initial AUTH_RESPONSE in response to a
+  server AUTHENTICATE request.
 
-  The body is a list of key/value informations. It is a [short] n, followed by n
-  pair of [string]. These key/value pairs are passed as is to the Cassandra
-  IAuthenticator and thus the detail of which informations is needed depends on
-  that authenticator.
+  The body of this message is a single [bytes] token. The details of what this
+  token contains (and when it can be null/empty, if ever) depend on the actual
+  authenticator used.
 
-  The response to a CREDENTIALS is a READY message (or an ERROR message).
+  The response to an AUTH_RESPONSE is either a follow-up AUTH_CHALLENGE message,
+  a READY message or an ERROR message.
 
 
 4.1.3. OPTIONS
@@ -376,9 +384,18 @@ Table of Contents
 
 4.2.3. AUTHENTICATE
 
-  Indicates that the server require authentication. This will be sent following
-  a STARTUP message and must be answered by a CREDENTIALS message from the
-  client to provide authentication informations.
+  Indicates that the server requires authentication, and which authentication
+  mechanism to use.
+
+  The authentication is SASL based and thus consists of a number of server
+  challenges (AUTH_CHALLENGE, Section 4.2.7) followed by client responses
+  (AUTH_RESPONSE, Section 4.1.2). The initial exchange is however bootstrapped
+  by an initial client response. The details of that exchange (including how
+  many challenge-response pairs are required) are specific to the authenticator
+  in use.
+
+  This will be sent following a STARTUP message if authentication is required
+  and must be answered by an AUTH_RESPONSE message from the client.
 
   The body consists of a single [string] indicating the full class name of the
   IAuthenticator in use.
@@ -548,6 +565,18 @@ Table of Contents
   should be enough), otherwise they may experience a connection refusal at
   first.
 
+4.2.7. AUTH_CHALLENGE
+
+  A server authentication challenge (see AUTH_RESPONSE (Section 4.1.2) for more
+  details).
+
+  The body of this message is a single [bytes] token. The details of what this
+  token contains (and when it can be null/empty, if ever) depend on the actual
+  authenticator used.
+
+  Clients are expected to answer the server challenge with an AUTH_RESPONSE
+  message.
+
 
 5. Compression
 
@@ -682,3 +711,7 @@ Table of Contents
     prepared; see Section 4.1.4.
   * A new BATCH message allows to batch a set of queries (prepared or not); see 
     Section 4.1.7.
+  * Authentication now uses SASL. Concretely, the CREDENTIALS message has been
+    removed and replaced by a server/client challenge/response exchange (done
+    through the new AUTH_RESPONSE/AUTH_CHALLENGE messages). See Section 4.2.3 for
+    details.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/auth/ISaslAwareAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/ISaslAwareAuthenticator.java b/src/java/org/apache/cassandra/auth/ISaslAwareAuthenticator.java
new file mode 100644
index 0000000..959506f
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/ISaslAwareAuthenticator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.auth;
+
+import org.apache.cassandra.exceptions.AuthenticationException;
+
+public interface ISaslAwareAuthenticator extends IAuthenticator
+{
+    /**
+     * Provide a SaslAuthenticator to be used by the CQL binary protocol server. If
+     * the configured IAuthenticator requires authentication but does not implement this
+     * interface we refuse to start the binary protocol server as it will have no way
+     * of authenticating clients.
+     * @return SaslAuthenticator implementation
+     * (see {@link PasswordAuthenticator.PlainTextSaslAuthenticator})
+     */
+    SaslAuthenticator newAuthenticator();
+
+
+    public interface SaslAuthenticator
+    {
+        public byte[] evaluateResponse(byte[] clientResponse) throws AuthenticationException;
+        public boolean isComplete();
+        public AuthenticatedUser getAuthenticatedUser() throws AuthenticationException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index bcbdd29..12dbdee 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.auth;
 
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -47,7 +50,7 @@ import org.mindrot.jbcrypt.BCrypt;
  * that keeps credentials (usernames and bcrypt-hashed passwords)
  * internally in C* - in system_auth.credentials CQL3 table.
  */
-public class PasswordAuthenticator implements IAuthenticator
+public class PasswordAuthenticator implements ISaslAwareAuthenticator
 {
     private static final Logger logger = LoggerFactory.getLogger(PasswordAuthenticator.class);
 
@@ -196,6 +199,11 @@ public class PasswordAuthenticator implements IAuthenticator
         }
     }
 
+    public SaslAuthenticator newAuthenticator()
+    {
+        return new PlainTextSaslAuthenticator();
+    }
+
     private void setupCredentialsTable()
     {
         if (Schema.instance.getCFMetaData(Auth.AUTH_KS, CREDENTIALS_CF) == null)
@@ -256,4 +264,75 @@ public class PasswordAuthenticator implements IAuthenticator
         else
             return ConsistencyLevel.ONE;
     }
+
+    private class PlainTextSaslAuthenticator implements ISaslAwareAuthenticator.SaslAuthenticator
+    {
+        private static final byte NUL = 0;
+        private final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+        private boolean complete = false;
+        private Map<String, String> credentials;
+
+        @Override
+        public byte[] evaluateResponse(byte[] clientResponse) throws AuthenticationException
+        {
+            credentials = decodeCredentials(clientResponse);
+            complete = true;
+            return null;
+        }
+
+        @Override
+        public boolean isComplete()
+        {
+            return complete;
+        }
+
+        @Override
+        public AuthenticatedUser getAuthenticatedUser() throws AuthenticationException
+        {
+            return authenticate(credentials);
+        }
+
+        /**
+         * SASL PLAIN mechanism specifies that credentials are encoded in a
+         * sequence of UTF-8 bytes, delimited by 0 (US-ASCII NUL).
+         * The form is : {@code authzId<NUL>authnId<NUL>password}
+         * authzId is optional, and in fact we don't care about it here as we'll
+         * set the authzId to match the authnId (that is, there is no concept of
+         * a user being authorized to act on behalf of another).
+         *
+         * @param bytes encoded credentials string sent by the client
+         * @return map containing the username/password pairs in the form an IAuthenticator
+         * would expect
+         * @throws AuthenticationException if the authentication ID or password is missing
+         */
+        private Map<String, String> decodeCredentials(byte[] bytes) throws AuthenticationException
+        {
+            logger.debug("Decoding credentials from client token");
+            byte[] user = null;
+            byte[] pass = null;
+            int end = bytes.length;
+            for (int i = bytes.length - 1 ; i >= 0; i--)
+            {
+                if (bytes[i] == NUL)
+                {
+                    if (pass == null)
+                        pass = Arrays.copyOfRange(bytes, i + 1, end);
+                    else if (user == null)
+                        user = Arrays.copyOfRange(bytes, i + 1, end);
+                    end = i;
+                }
+            }
+
+            if (user == null)
+                throw new AuthenticationException("Authentication ID must not be null");
+            if (pass == null)
+                throw new AuthenticationException("Password must not be null");
+
+            Map<String, String> credentials = new HashMap<String, String>();
+            credentials.put(IAuthenticator.USERNAME_KEY, new String(user, UTF8_CHARSET));
+            credentials.put(IAuthenticator.PASSWORD_KEY, new String(pass, UTF8_CHARSET));
+            return credentials;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index b6769a3..e911a3e 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -115,12 +115,10 @@ public class ClientState
     }
 
     /**
-     * Attempts to login this client with the given credentials map.
+     * Attempts to login the given user.
      */
-    public void login(Map<String, String> credentials) throws AuthenticationException
+    public void login(AuthenticatedUser user) throws AuthenticationException
     {
-        AuthenticatedUser user = DatabaseDescriptor.getAuthenticator().authenticate(credentials);
-
         if (!user.isAnonymous() && !Auth.isExistingUser(user.getName()))
            throw new AuthenticationException(String.format("User %s doesn't exist - create it with CREATE USER query first",
                                                            user.getName()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 870cdbd..878c42b 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.auth.AuthenticatedUser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1439,7 +1440,8 @@ public class CassandraServer implements Cassandra.Iface
     {
         try
         {
-            state().login(auth_request.getCredentials());
+            AuthenticatedUser user = DatabaseDescriptor.getAuthenticator().authenticate(auth_request.getCredentials());
+            state().login(user);
         }
         catch (org.apache.cassandra.exceptions.AuthenticationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index cde8be5..13e1308 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import java.util.Map;
 
 import com.google.common.base.Splitter;
 
+import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -39,8 +41,10 @@ import org.apache.cassandra.transport.messages.OptionsMessage;
 import org.apache.cassandra.transport.messages.PrepareMessage;
 import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.transport.messages.RegisterMessage;
+import org.apache.cassandra.transport.messages.SaslResponse;
 import org.apache.cassandra.transport.messages.StartupMessage;
 import org.apache.cassandra.utils.Hex;
+
 import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 
 public class Client extends SimpleClient
@@ -154,16 +158,20 @@ public class Client extends SimpleClient
         }
         else if (msgType.equals("CREDENTIALS"))
         {
+            System.err.println("[WARN] CREDENTIALS command is deprecated, use AUTHENTICATE instead");
             CredentialsMessage msg = new CredentialsMessage();
-            while (iter.hasNext())
+            msg.credentials.putAll(readCredentials(iter));
+            return msg;
+        }
+        else if (msgType.equals("AUTHENTICATE"))
+        {
+            Map<String, String> credentials = readCredentials(iter);
+            if(!credentials.containsKey(IAuthenticator.USERNAME_KEY) || !credentials.containsKey(IAuthenticator.PASSWORD_KEY))
             {
-                String next = iter.next();
-                String[] kv = next.split("=");
-                if (kv.length != 2)
-                    return null;
-                msg.credentials.put(kv[0], kv[1]);
+                System.err.println("[ERROR] Authentication requires both 'username' and 'password'");
+                return null;
             }
-            return msg;
+            return new SaslResponse(encodeCredentialsForSasl(credentials));
         }
         else if (msgType.equals("REGISTER"))
         {
@@ -181,6 +189,35 @@ public class Client extends SimpleClient
         return null;
     }
 
+    private Map<String, String> readCredentials(Iterator<String> iter)
+    {
+        final Map<String, String> credentials = new HashMap<String, String>();
+        while (iter.hasNext())
+        {
+            String next = iter.next();
+            String[] kv = next.split("=");
+            if (kv.length != 2)
+            {
+                System.err.println("[ERROR] Default authentication requires username & password");
+                return null;
+            }
+            credentials.put(kv[0], kv[1]);
+        }
+        return credentials;
+    }
+
+    private byte[] encodeCredentialsForSasl(Map<String, String> credentials)
+    {
+        byte[] username = credentials.get(IAuthenticator.USERNAME_KEY).getBytes(Charset.forName("UTF-8"));
+        byte[] password = credentials.get(IAuthenticator.PASSWORD_KEY).getBytes(Charset.forName("UTF-8"));
+        byte[] initialResponse = new byte[username.length + password.length + 2];
+        initialResponse[0] = 0;
+        System.arraycopy(username, 0, initialResponse, 1, username.length);
+        initialResponse[username.length + 1] = 0;
+        System.arraycopy(password, 0, initialResponse, username.length + 2, password.length);
+        return initialResponse;
+    }
+
     public static void main(String[] args) throws Exception
     {
         // Print usage if no argument is specified.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index e57da51..9cb20e8 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -57,20 +57,22 @@ public abstract class Message
 
     public enum Type
     {
-        ERROR        (0,  Direction.RESPONSE, ErrorMessage.codec),
-        STARTUP      (1,  Direction.REQUEST,  StartupMessage.codec),
-        READY        (2,  Direction.RESPONSE, ReadyMessage.codec),
-        AUTHENTICATE (3,  Direction.RESPONSE, AuthenticateMessage.codec),
-        CREDENTIALS  (4,  Direction.REQUEST,  CredentialsMessage.codec),
-        OPTIONS      (5,  Direction.REQUEST,  OptionsMessage.codec),
-        SUPPORTED    (6,  Direction.RESPONSE, SupportedMessage.codec),
-        QUERY        (7,  Direction.REQUEST,  QueryMessage.codec),
-        RESULT       (8,  Direction.RESPONSE, ResultMessage.codec),
-        PREPARE      (9,  Direction.REQUEST,  PrepareMessage.codec),
-        EXECUTE      (10, Direction.REQUEST,  ExecuteMessage.codec),
-        REGISTER     (11, Direction.REQUEST,  RegisterMessage.codec),
-        EVENT        (12, Direction.RESPONSE, EventMessage.codec),
-        BATCH        (13, Direction.REQUEST,  BatchMessage.codec);
+        ERROR          (0,  Direction.RESPONSE, ErrorMessage.codec),
+        STARTUP        (1,  Direction.REQUEST,  StartupMessage.codec),
+        READY          (2,  Direction.RESPONSE, ReadyMessage.codec),
+        AUTHENTICATE   (3,  Direction.RESPONSE, AuthenticateMessage.codec),
+        CREDENTIALS    (4,  Direction.REQUEST,  CredentialsMessage.codec),
+        OPTIONS        (5,  Direction.REQUEST,  OptionsMessage.codec),
+        SUPPORTED      (6,  Direction.RESPONSE, SupportedMessage.codec),
+        QUERY          (7,  Direction.REQUEST,  QueryMessage.codec),
+        RESULT         (8,  Direction.RESPONSE, ResultMessage.codec),
+        PREPARE        (9,  Direction.REQUEST,  PrepareMessage.codec),
+        EXECUTE        (10, Direction.REQUEST,  ExecuteMessage.codec),
+        REGISTER       (11, Direction.REQUEST,  RegisterMessage.codec),
+        EVENT          (12, Direction.RESPONSE, EventMessage.codec),
+        BATCH          (13, Direction.REQUEST,  BatchMessage.codec),
+        SASL_CHALLENGE (14, Direction.RESPONSE, SaslChallenge.codec),
+        SASL_RESPONSE  (15, Direction.REQUEST,  SaslResponse.codec);
 
         public final int opcode;
         public final Direction direction;
@@ -295,7 +297,7 @@ public abstract class Message
             {
                 assert request.connection() instanceof ServerConnection;
                 ServerConnection connection = (ServerConnection)request.connection();
-                connection.validateNewMessage(request.type);
+                connection.validateNewMessage(request.type, request.getVersion());
 
                 logger.debug("Received: {}, v={}", request, request.getVersion());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 51a90e8..ef97e0b 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -30,6 +30,8 @@ import javax.net.ssl.SSLEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.auth.ISaslAwareAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.security.SSLFactory;
@@ -112,6 +114,16 @@ public class Server implements CassandraDaemon.Server
 
     private void run()
     {
+        // Check that a SaslAuthenticator can be provided by the configured
+        // IAuthenticator. If not, don't start the server.
+        IAuthenticator authenticator = DatabaseDescriptor.getAuthenticator();
+        if (authenticator.requireAuthentication() && !(authenticator instanceof ISaslAwareAuthenticator))
+        {
+            logger.error("Not starting native transport as the configured IAuthenticator is not capable of SASL authentication");
+            isRunning.compareAndSet(true, false);
+            return;
+        }
+
         // Configure the server.
         executionHandler = new ExecutionHandler(new RequestThreadPoolExecutor());
         factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
@@ -223,7 +235,7 @@ public class Server implements CassandraDaemon.Server
             pipeline.addLast("dispatcher", dispatcher);
 
             return pipeline;
-      }
+        }
     }
 
     private static class SecurePipelineFactory extends PipelineFactory

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 591a9c2..3e55dcb 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -19,9 +19,13 @@ package org.apache.cassandra.transport;
 
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.auth.ISaslAwareAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 
+import org.apache.cassandra.auth.ISaslAwareAuthenticator.SaslAuthenticator;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class ServerConnection extends Connection
@@ -36,6 +40,7 @@ public class ServerConnection extends Connection
 
     private enum State { UNINITIALIZED, AUTHENTICATION, READY; }
 
+    private volatile SaslAuthenticator saslAuthenticator;
     private final ClientState clientState;
     private volatile State state;
 
@@ -61,7 +66,7 @@ public class ServerConnection extends Connection
         return qState;
     }
 
-    public void validateNewMessage(Message.Type type)
+    public void validateNewMessage(Message.Type type, int version)
     {
         switch (state)
         {
@@ -70,8 +75,9 @@ public class ServerConnection extends Connection
                     throw new ProtocolException(String.format("Unexpected message %s, expecting STARTUP or OPTIONS", type));
                 break;
             case AUTHENTICATION:
-                if (type != Message.Type.CREDENTIALS)
-                    throw new ProtocolException(String.format("Unexpected message %s, needs authentication through CREDENTIALS message", type));
+                // Support both SASL auth from protocol v2 and the older style Credentials auth from v1
+                if (type != Message.Type.SASL_RESPONSE && type != Message.Type.CREDENTIALS)
+                    throw new ProtocolException(String.format("Unexpected message %s, expecting %s", type, version == 1 ? "CREDENTIALS" : "SASL_RESPONSE"));
                 break;
             case READY:
                 if (type == Message.Type.STARTUP)
@@ -96,13 +102,30 @@ public class ServerConnection extends Connection
                 }
                 break;
             case AUTHENTICATION:
-                assert requestType == Message.Type.CREDENTIALS;
+                // Support both SASL auth from protocol v2 and the older style Credentials auth from v1
+                assert requestType == Message.Type.SASL_RESPONSE || requestType == Message.Type.CREDENTIALS;
+
                 if (responseType == Message.Type.READY)
+                {
                     state = State.READY;
+                    // we won't use the authenticator again, null it so that it can be GC'd
+                    saslAuthenticator = null;
+                }
             case READY:
                 break;
             default:
                 throw new AssertionError();
         }
     }
+
+    public SaslAuthenticator getAuthenticator()
+    {
+        if (saslAuthenticator == null)
+        {
+            IAuthenticator authenticator = DatabaseDescriptor.getAuthenticator();
+            assert authenticator instanceof ISaslAwareAuthenticator : "Configured IAuthenticator does not support SASL authentication";
+            saslAuthenticator = ((ISaslAwareAuthenticator)authenticator).newAuthenticator();
+        }
+        return saslAuthenticator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index 512675e..ceff5ba 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.transport.messages;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.ProtocolException;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 
@@ -37,6 +40,10 @@ public class CredentialsMessage extends Message.Request
     {
         public CredentialsMessage decode(ChannelBuffer body, int version)
         {
+            if (version > 1)
+                throw new ProtocolException("Legacy credentials authentication is not supported in " +
+                        "protocol versions > 1. Please use SASL authentication via a SaslResponse message");
+
             CredentialsMessage msg = new CredentialsMessage();
             int count = body.readUnsignedShort();
             for (int i = 0; i < count; i++)
@@ -78,7 +85,8 @@ public class CredentialsMessage extends Message.Request
     {
         try
         {
-            state.getClientState().login(credentials);
+            AuthenticatedUser user = DatabaseDescriptor.getAuthenticator().authenticate(credentials);
+            state.getClientState().login(user);
             return new ReadyMessage();
         }
         catch (AuthenticationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/transport/messages/SaslChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SaslChallenge.java b/src/java/org/apache/cassandra/transport/messages/SaslChallenge.java
new file mode 100644
index 0000000..f95596d
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/SaslChallenge.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * SASL challenge sent from server to client
+ */
+public class SaslChallenge extends Message.Response
+{
+    public static final Message.Codec<SaslChallenge> codec = new Message.Codec<SaslChallenge>()
+    {
+        @Override
+        public SaslChallenge decode(ChannelBuffer body, int version)
+        {
+            return new SaslChallenge(CBUtil.readBytes(body));
+        }
+
+        @Override
+        public ChannelBuffer encode(SaslChallenge challenge)
+        {
+            return CBUtil.bytesToCB(challenge.token);
+        }
+    };
+
+    private byte[] token;
+
+    public SaslChallenge(byte[] token)
+    {
+        super(Message.Type.SASL_CHALLENGE);
+        this.token = token;
+    }
+
+    @Override
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    public byte[] getToken()
+    {
+        return token;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/401b46bb/src/java/org/apache/cassandra/transport/messages/SaslResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SaslResponse.java b/src/java/org/apache/cassandra/transport/messages/SaslResponse.java
new file mode 100644
index 0000000..604788e
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/SaslResponse.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.ISaslAwareAuthenticator.SaslAuthenticator;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.ServerConnection;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * A SASL token message sent from client to server. Some SASL
+ * mechanisms and clients may send an initial token before
+ * receiving a challenge from the server.
+ */
+public class SaslResponse extends Message.Request
+{
+    public static final Message.Codec<SaslResponse> codec = new Message.Codec<SaslResponse>()
+    {
+        @Override
+        public SaslResponse decode(ChannelBuffer body, int version)
+        {
+            if (version == 1)
+                throw new ProtocolException("SASL Authentication is not supported in version 1 of the protocol");
+
+            return new SaslResponse(CBUtil.readBytes(body));
+        }
+
+        @Override
+        public ChannelBuffer encode(SaslResponse response)
+        {
+            return CBUtil.bytesToCB(response.token);
+        }
+    };
+
+    private byte[] token;
+
+    public SaslResponse(byte[] token)
+    {
+        super(Message.Type.SASL_RESPONSE);
+        this.token = token;
+    }
+
+    @Override
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public Response execute(QueryState queryState)
+    {
+        try
+        {
+            SaslAuthenticator authenticator = ((ServerConnection) connection).getAuthenticator();
+            byte[] challenge = authenticator.evaluateResponse(token);
+            if (authenticator.isComplete())
+            {
+                AuthenticatedUser user = authenticator.getAuthenticatedUser();
+                queryState.getClientState().login(user);
+                // authentication is complete, send a ready message to the client
+                return new ReadyMessage();
+            }
+            else
+            {
+                return new SaslChallenge(challenge);
+            }
+        }
+        catch (AuthenticationException e)
+        {
+            return ErrorMessage.fromException(e);
+        }
+    }
+}