You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/05/20 23:52:47 UTC

[6/6] drill git commit: DRILL-4335: Apache Drill should support network encryption.

DRILL-4335: Apache Drill should support network encryption.

NOTE: This pull request provides support for on-wire encryption using SASL framework. Communication channel covered is:
      1) C++ Drill Client and Drillbit channel.

close apache/drill#809


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d11aba2e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d11aba2e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d11aba2e

Branch: refs/heads/master
Commit: d11aba2e55323bb5a6a9deb5bb09fd87470dcedf
Parents: ce8bbc0
Author: Sorabh Hamirwasia <sh...@maprtech.com>
Authored: Mon Mar 6 00:19:50 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sat May 20 16:17:19 2017 -0700

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |  25 +-
 contrib/native/client/readme.macos              |   6 +-
 .../native/client/src/clientlib/drillClient.cpp |   1 +
 .../client/src/clientlib/drillClientImpl.cpp    | 694 ++++++++++++++++---
 .../client/src/clientlib/drillClientImpl.hpp    |  41 +-
 .../native/client/src/clientlib/rpcMessage.cpp  |   2 +-
 .../native/client/src/clientlib/rpcMessage.hpp  |   1 -
 .../src/clientlib/saslAuthenticatorImpl.cpp     | 119 +++-
 .../src/clientlib/saslAuthenticatorImpl.hpp     |  12 +-
 contrib/native/client/src/clientlib/utils.cpp   |  49 ++
 contrib/native/client/src/clientlib/utils.hpp   |  32 +
 .../native/client/src/include/drill/common.hpp  |   2 +
 contrib/native/client/src/protobuf/User.pb.cc   | 463 ++++++++-----
 contrib/native/client/src/protobuf/User.pb.h    |  71 +-
 14 files changed, 1194 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 5990897..47e55de 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -24,7 +25,7 @@
 #include <boost/algorithm/string/join.hpp>
 #include "drill/drillc.hpp"
 
-int nOptions=15;
+int nOptions=19;
 
 struct Option{
     char name[32];
@@ -45,7 +46,11 @@ struct Option{
     {"heartbeatFrequency", "Heartbeat frequency (second). Disabled if set to 0.", false},
     {"user", "Username", false},
     {"password", "Password", false},
-    {"saslPluginPath", "Path to where SASL plugins are installed", false}
+    {"saslPluginPath", "Path to where SASL plugins are installed", false},
+    {"service_host", "Service host for Kerberos", false},
+    {"service_name", "Service name for Kerberos", false},
+    {"auth", "Authentication mechanism to use", false},
+    {"sasl_encrypt", "Negotiate for encrypted connection", false}
 };
 
 std::map<std::string, std::string> qsOptionValues;
@@ -295,6 +300,10 @@ int main(int argc, char* argv[]) {
         std::string user=qsOptionValues["user"];
         std::string password=qsOptionValues["password"];
         std::string saslPluginPath=qsOptionValues["saslPluginPath"];
+        std::string sasl_encrypt=qsOptionValues["sasl_encrypt"];
+        std::string serviceHost=qsOptionValues["service_host"];
+        std::string serviceName=qsOptionValues["service_name"];
+        std::string auth=qsOptionValues["auth"];
 
         Drill::QueryType type;
 
@@ -371,6 +380,18 @@ int main(int argc, char* argv[]) {
         if(password.length()>0){
             props.setProperty(USERPROP_PASSWORD, password);
         }
+        if(sasl_encrypt.length()>0){
+            props.setProperty(USERPROP_SASL_ENCRYPT, sasl_encrypt);
+        }
+        if(serviceHost.length()>0){
+            props.setProperty(USERPROP_SERVICE_HOST, serviceHost);
+        }
+        if(serviceName.length()>0){
+            props.setProperty(USERPROP_SERVICE_NAME, serviceName);
+        }
+        if(auth.length()>0){
+            props.setProperty(USERPROP_AUTH_MECHANISM, auth);
+        }
 
         if(client.connect(connectStr.c_str(), &props)!=Drill::CONN_SUCCESS){
             std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/readme.macos
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.macos b/contrib/native/client/readme.macos
index 4785e87..eee017e 100644
--- a/contrib/native/client/readme.macos
+++ b/contrib/native/client/readme.macos
@@ -35,6 +35,8 @@ Install Prerequisites
   or use brew to install 
   $> brew install cmake
 
+2.0) Install cppunit 
+  $> brew install cppunit 
 
 2.1) Install protobuf 2.5.0 (or higher)
   $> brew install protobuf
@@ -54,7 +56,7 @@ Install Prerequisites
 When changes have been introduced to the protocol module, you might need to refresh the protobuf C++ source files too.
   $> cd DRILL_DIR/contrib/native/client
   $> mkdir build
-  $> cd build && cmake3 -G "XCode" -D CMAKE_BUILD_TYPE=Debug ..
+  $> cd build && cmake -G "Xcode" -D CMAKE_BUILD_TYPE=Debug ..
   $> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target fixProtobufs
   $> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target cpProtobufs
 
@@ -64,7 +66,7 @@ Build drill client
 -------------------
   $> cd DRILL_DIR/contrib/native/client
   $> mkdir build
-  $> cd build && cmake3 -G "XCode" -D CMAKE_BUILD_TYPE=Debug ..
+  $> cd build && cmake -G "Xcode" -D CMAKE_BUILD_TYPE=Debug ..
   $> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target ALL_BUILD
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 7000272..8eb909b 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -181,6 +181,7 @@ const std::map<std::string, uint32_t>  DrillUserProperties::USER_PROPERTIES=boos
     ( USERPROP_USESSL,      USERPROP_FLAGS_BOOLEAN|USERPROP_FLAGS_SSLPROP)
     ( USERPROP_FILEPATH,    USERPROP_FLAGS_STRING|USERPROP_FLAGS_SSLPROP|USERPROP_FLAGS_FILEPATH)
     ( USERPROP_FILENAME,    USERPROP_FLAGS_STRING|USERPROP_FLAGS_SSLPROP|USERPROP_FLAGS_FILENAME)
+    ( USERPROP_SASL_ENCRYPT,  USERPROP_FLAGS_STRING)
 ;
 
 bool DrillUserProperties::validate(std::string& err){

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 30a354e..0dee309 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -30,7 +30,6 @@
 #include <boost/lexical_cast.hpp>
 #include <boost/thread.hpp>
 
-
 #include "drill/drillClient.hpp"
 #include "drill/fieldmeta.hpp"
 #include "drill/recordBatch.hpp"
@@ -193,7 +192,7 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
     boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
-    status=sendSync(heartbeatMsg);
+    status=sendSyncCommon(heartbeatMsg);
     status=status==CONN_SUCCESS?status:CONN_DEAD;
     //If the server sends responses to a heartbeat, we need to increment the pending requests counter.
     if(m_pendingRequests++==0){
@@ -233,18 +232,125 @@ void DrillClientImpl::Close() {
     shutdownSocket();
 }
 
+/*
+ * Write exactly bytesToWrite bytes starting at dataPtr to the socket. write_some may accept
+ * fewer bytes than requested or fail with EINTR; both cases are retried until all data is sent.
+ *
+ * Parameters:
+ *      dataPtr      - in param   - Pointer to data bytes to write on socket.
+ *      bytesToWrite - in param   - Length of data bytes to write from dataPtr.
+ *      errorCode    - out param  - Error code set by boost; left untouched when bytesToWrite is 0.
+ */
+void DrillClientImpl::doWriteToSocket(const char* dataPtr, size_t bytesToWrite,
+                                        boost::system::error_code& errorCode) {
+    if(0 == bytesToWrite) {
+        return;
+    }
+
+    // Loop until every byte is written. Any error other than EINTR stops the loop early and is
+    // reported to the caller through errorCode.
+    while(1) {
+        size_t bytesWritten = m_socket.write_some(boost::asio::buffer(dataPtr, bytesToWrite), errorCode);
+
+        // Update the state
+        bytesToWrite -= bytesWritten;
+        dataPtr += bytesWritten;
+        // Retry on EINTR and after a short write without error; break only on a real error
+        if(errorCode && EINTR != errorCode.value()) break;
+
+        // Check if all the data is written then break from loop
+        if(0 == bytesToWrite) break;
+    }
+}
 
-connectionStatus_t DrillClientImpl::sendSync(rpc::OutBoundRpcMessage& msg){
+/*
+ * Common wrapper to take care of sending both plain or encrypted message. It creates a send buffer from an
+ * OutBoundRpcMessage and then calls the send handler pointing to either sendSyncPlain or sendSyncEncrypted
+ *
+ * Return:
+ *  connectionStatus_t  -   CONN_SUCCESS - In case of successful send
+ *                      -   CONN_FAILURE - In case of failure to send
+ */
+connectionStatus_t DrillClientImpl::sendSyncCommon(rpc::OutBoundRpcMessage& msg) {
     encode(m_wbuf, msg);
+    return (this->*m_fpCurrentSendHandler)();
+}
+
+/*
+ * Send handler for sending plain messages over wire
+ *
+ * Return:
+ *  connectionStatus_t  -   CONN_SUCCESS - In case of successful send
+ *                      -   CONN_FAILURE - In case of failure to send
+ */
+connectionStatus_t DrillClientImpl::sendSyncPlain(){
+
     boost::system::error_code ec;
-    size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec);
-    if(!ec && s!=0){
+    doWriteToSocket(reinterpret_cast<char*>(m_wbuf.data()), m_wbuf.size(), ec);
+
+    if(!ec) {
         return CONN_SUCCESS;
-    }else{
+    } else {
         return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, ec.message().c_str()));
     }
 }
 
+/*
+ * Send handler for sending encrypted messages over wire. It encrypts the send buffer using the wrap api provided by
+ * saslAuthenticatorImpl and then transmits the encrypted bytes over wire.
+ *
+ * Return:
+ *  connectionStatus_t  -   CONN_SUCCESS - In case of successful send
+ *                      -   CONN_FAILURE - In case of failure to send
+ */
+connectionStatus_t DrillClientImpl::sendSyncEncrypted() {
+
+    boost::system::error_code ec;
+
+    // Encoded message is encrypted into chunks of size <= WrapSizeLimit. Each encrypted chunk along with
+    // its encrypted length in network order (added by Cyrus-SASL plugin) is sent over wire.
+    const int wrapChunkSize = m_encryptionCtxt.getWrapSizeLimit();
+    int lengthToEncrypt = m_wbuf.size();
+
+    int currentChunkLen = std::min(wrapChunkSize, lengthToEncrypt);
+    uint32_t currentChunkOffset = 0;
+    std::stringstream errorMsg;
+
+    // Encrypt and send each chunk
+    while(lengthToEncrypt != 0) {
+        const char* wrappedChunk = NULL;
+        uint32_t wrappedLen = 0;
+        const int wrapResult = m_saslAuthenticator->wrap(reinterpret_cast<const char*>(m_wbuf.data() + currentChunkOffset),
+                                                   currentChunkLen, &wrappedChunk, wrappedLen);
+        if(SASL_OK != wrapResult) {
+            errorMsg << "Sasl wrap failed while encrypting chunk of length: " << currentChunkLen << " , EncodeError: "
+                     << wrapResult;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::sendSyncEncrypted - " << errorMsg.str()
+                                              << " ,ChunkOffset: " << currentChunkOffset << ", Message Len: " << m_wbuf.size()
+                                              << ", Closing connection.";)
+            return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, errorMsg.str().c_str()));
+        }
+
+        // Send the encrypted chunk.
+        doWriteToSocket(wrappedChunk, wrappedLen, ec);
+
+        if(ec) {
+            errorMsg << "Failure while sending encrypted chunk. Error: " << ec.message().c_str();
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::sendSyncEncrypted - " << errorMsg.str()
+                                              << ", Chunk Length: " << currentChunkLen << ", ChunkOffset:" << currentChunkOffset
+                                              << ", Message Len: " << m_wbuf.size() << ", Closing connection.";)
+            return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, errorMsg.str().c_str()));
+        }
+
+        // Update variables after sending each encrypted chunk
+        lengthToEncrypt -= currentChunkLen;
+        currentChunkOffset += currentChunkLen;
+        currentChunkLen = std::min(wrapChunkSize, lengthToEncrypt);
+    }
+
+    return CONN_SUCCESS;
+}
+
 connectionStatus_t DrillClientImpl::recvHandshake(){
     if(m_rbuf==NULL){
         m_rbuf = Utils::allocateBuffer(MAX_SOCK_RD_BUFSIZE);
@@ -289,7 +395,41 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
     return CONN_SUCCESS;
 }
 
-void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
+/*
+ * Read exactly bytesToRead bytes from the socket into inBuf. read_some may return fewer bytes
+ * than requested or fail with EINTR; both cases are retried until the buffer is filled.
+ *
+ * Parameters:
+ *      inBuf        - out param  - Pointer to buffer to read data into from socket.
+ *      bytesToRead  - in param   - Length of data bytes to read from socket.
+ *      errorCode    - out param  - Error code set by boost; left untouched when bytesToRead is 0.
+ */
+void DrillClientImpl::doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead,
+                                       boost::system::error_code& errorCode) {
+
+    // Check if bytesToRead is zero
+    if(0 == bytesToRead) {
+        return;
+    }
+
+    // Loop until every requested byte has arrived. Any error other than EINTR stops the
+    // loop early and is reported to the caller through errorCode.
+    while(1){
+        size_t dataBytesRead = m_socket.read_some(boost::asio::buffer(inBuf, bytesToRead),
+                                           errorCode);
+        // Update the state
+        bytesToRead -= dataBytesRead;
+        inBuf += dataBytesRead;
+
+        // Retry on EINTR and after a short read without error; break only on a real error
+        if(errorCode && EINTR != errorCode.value()) break;
+
+        // Check if all the data is read then break from loop
+        if(0 == bytesToRead) break;
+    }
+}
+
+void DrillClientImpl::handleHandshake(ByteBuf_t inBuf,
         const boost::system::error_code& err,
         size_t bytes_transferred) {
     boost::system::error_code error=err;
@@ -299,21 +439,23 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
     if(!error){
         rpc::InBoundRpcMessage msg;
         uint32_t length = 0;
-        std::size_t bytes_read = rpc::lengthDecode(m_rbuf, length);
+        std::size_t bytes_read = rpcLengthDecode(m_rbuf, length);
         if(length>0){
-            size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
-            ByteBuf_t b=m_rbuf + LEN_PREFIX_BUFLEN;
-            size_t bytesToRead=length - leftover;
-            while(1){
-                size_t dataBytesRead=m_socket.read_some(
-                        boost::asio::buffer(b, bytesToRead),
-                        error);
-                if(err) break;
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;)
-                if(dataBytesRead==bytesToRead) break;
-                bytesToRead-=dataBytesRead;
-                b+=dataBytesRead;
+            const size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
+            const ByteBuf_t b = m_rbuf + LEN_PREFIX_BUFLEN;
+            const size_t bytesToRead=length - leftover;
+            doReadFromSocket(b, bytesToRead, error);
+
+            // Check if any error happen while reading the message bytes. If yes then return before decoding the Msg
+            if(error) {
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. "
+                                                  << " Failed to read entire handshake message. with error: "
+                                                  << error.message().c_str() << "\n";)
+                handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Failed to read entire handshake message"));
+                return;
             }
+
+            // Decode the bytes into a valid RPC Message
             if (!decode(m_rbuf+bytes_read, length, msg)) {
                 DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. Cannot decode handshake.\n";)
                 handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Cannot decode handshake"));
@@ -340,6 +482,11 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
             this->m_serverAuthMechanisms.push_back(mechanism);
         }
 
+        // Update encryption context based on server response
+        this->m_encryptionCtxt.setEncryptionReqd(b2u.has_encrypted() && b2u.encrypted());
+        if(b2u.has_maxwrappedsize()) {
+            this->m_encryptionCtxt.setMaxWrappedSize(b2u.maxwrappedsize());
+        }
     }else{
         // boost error
         if(error==boost::asio::error::eof){ // Server broke off the connection
@@ -360,7 +507,8 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
         if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
             // The deadline has passed.
             m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: "
+                                              << "Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
             handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
             m_io_service.stop();
             boost::system::error_code ignorederr;
@@ -370,6 +518,33 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
     return;
 }
 
+/*
+ * Checks whether the client has explicitly asked for encrypted connections only. It scans userProperties for the
+ * USERPROP_SASL_ENCRYPT key and returns true if a value equals "true" ignoring case; otherwise (or if NULL) returns false.
+ */
+bool DrillClientImpl::clientNeedsEncryption(const DrillUserProperties* userProperties) {
+    bool needsEncryption = false;
+    // Without any user properties there is nothing to inspect, so encryption is not requested
+    if(!userProperties) {
+        return needsEncryption;
+    }
+
+    // Loop through all the properties looking for USERPROP_SASL_ENCRYPT and its value
+    for (size_t i = 0; i < userProperties->size(); i++) {
+        const std::string key = userProperties->keyAt(i);
+        std::string value = userProperties->valueAt(i);
+
+        if(USERPROP_SASL_ENCRYPT == key) {
+            boost::algorithm::to_lower(value);
+
+            if(0 == value.compare("true")) {
+                needsEncryption = true;
+            }
+        }
+    }
+    return needsEncryption;
+}
+
 connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){
 
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";)
@@ -379,7 +554,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
     u2b.set_rpc_version(DRILL_RPC_VERSION);
     u2b.set_support_listening(true);
     u2b.set_support_timeout(DrillClientConfig::getHeartbeatFrequency() > 0);
-    u2b.set_sasl_support(exec::user::SASL_AUTH);
+    u2b.set_sasl_support(exec::user::SASL_PRIVACY);
 
     // Adding version info
     exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos();
@@ -436,7 +611,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
         uint64_t coordId = this->getNextCoordinationId();
 
         rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
-        sendSync(out_msg);
+        sendSyncCommon(out_msg);
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
     }
 
@@ -479,6 +654,13 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
 }
 
 connectionStatus_t DrillClientImpl::handleAuthentication(const DrillUserProperties *userProperties) {
+
+    // Check if client needs encryption and server is configured for encryption or not before starting handshake
+    if(clientNeedsEncryption(userProperties) && !m_encryptionCtxt.isEncryptionReqd()) {
+        return handleConnError(CONN_AUTH_FAILED, "Client needs encryption but on server side encryption is disabled."
+                                                 " Please check connection parameters or contact administrator?");
+    }
+
     try {
         m_saslAuthenticator = new SaslAuthenticatorImpl(userProperties);
     } catch (std::runtime_error& e) {
@@ -495,26 +677,46 @@ connectionStatus_t DrillClientImpl::handleAuthentication(const DrillUserProperti
         }
     }
 
+    std::stringstream logMsg;
+    logMsg << "DrillClientImpl::handleAuthentication: Authentication failed. [Details: ";
+
     if (SASL_OK == m_saslResultCode) {
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::handleAuthentication: Successfully authenticated!"
-                                          << std::endl;)
+        // Check the negotiated SSF value and change the handlers.
+        if(m_encryptionCtxt.isEncryptionReqd()) {
+            if(SASL_OK != m_saslAuthenticator->verifyAndUpdateSaslProps()) {
+                logMsg << m_encryptionCtxt << "]. Negotiated Parameter is invalid."
+                       << " Error: " << m_saslResultCode;
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
+                return handleConnError(CONN_AUTH_FAILED, logMsg.str().c_str());
+            }
+
+            // Successfully negotiated for encryption related security parameters.
+            // Start using Encrypt and Decrypt handlers.
+            m_fpCurrentSendHandler = &DrillClientImpl::sendSyncEncrypted;
+            m_fpCurrentReadMsgHandler = &DrillClientImpl::readAndDecryptMsg;
+        }
 
-        // in future, negotiated security layers are known here..
+        // Reset the errorMsg stream since this is success case.
+        logMsg.str(std::string());
+        logMsg << "DrillClientImpl::handleAuthentication: Successfully authenticated! [Details: "
+               << m_encryptionCtxt << " ]";
 
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
         m_io_service.reset();
         return CONN_SUCCESS;
     } else {
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::handleAuthentication: Authentication failed: "
-                                          << m_saslResultCode << std::endl;)
+        logMsg << m_encryptionCtxt << ", Error: " << m_saslResultCode;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
+
         // shuts down socket as well
-        return handleConnError(CONN_AUTH_FAILED, "Authentication failed. Check connection parameters?");
+        logMsg << "]. Check connection parameters?";
+        return handleConnError(CONN_AUTH_FAILED, logMsg.str().c_str());
     }
 }
 
 void DrillClientImpl::initiateAuthentication() {
     exec::shared::SaslMessage response;
-    m_saslResultCode = m_saslAuthenticator->init(m_serverAuthMechanisms, response);
-
+    m_saslResultCode = m_saslAuthenticator->init(m_serverAuthMechanisms, response, &m_encryptionCtxt);
 
     switch (m_saslResultCode) {
         case SASL_CONTINUE:
@@ -539,7 +741,7 @@ void DrillClientImpl::sendSaslResponse(const exec::shared::SaslMessage& response
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     const int32_t coordId = getNextCoordinationId();
     rpc::OutBoundRpcMessage msg(exec::rpc::REQUEST, exec::user::SASL_MESSAGE, coordId, &response);
-    sendSync(msg);
+    sendSyncCommon(msg);
     if (m_pendingRequests++ == 0) {
         getNextResult();
     }
@@ -768,23 +970,23 @@ Handle* DrillClientImpl::sendMsg(boost::function<Handle*(int32_t)> handleFactory
         phandle = handleFactory(coordId);
         this->m_queryHandles[coordId]=phandle;
 
-        connectionStatus_t cStatus=sendSync(out_msg);
+        connectionStatus_t cStatus = sendSyncCommon(out_msg);
         if(cStatus == CONN_SUCCESS){
             bool sendRequest=false;
 
             DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent " << ::exec::user::RpcType_Name(type) << " request. " << "[" << m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
-                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent " << ::exec::user::RpcType_Name(type) <<  " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;)
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent " << ::exec::user::RpcType_Name(type) <<  " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;)
 
-                if(m_pendingRequests++==0){
-                    sendRequest=true;
-                }else{
-                    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) <<  " request to server" << std::endl;)
-                        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
-                }
+            if(m_pendingRequests++==0){
+                sendRequest=true;
+            }else{
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) <<  " request to server" << std::endl;)
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
+            }
             if(sendRequest){
                 DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending " << ::exec::user::RpcType_Name(type) <<  " request. Number of pending requests = "
                         << m_pendingRequests << std::endl;)
-                    getNextResult(); // async wait for results
+                getNextResult(); // async wait for results
             }
         }
 
@@ -854,76 +1056,319 @@ void DrillClientImpl::waitForResults(){
     }
 }
 
-status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
-        AllocatedBufferPtr* allocatedBuffer,
-        rpc::InBoundRpcMessage& msg){
+/*
+ *  Decode the length of the message from bufWithLenField and then read the entire message from the socket.
+ *  Parameters:
+ *      bufWithLenField            - in  param  - buffer containing the length of the RPC message/encrypted chunk
+ *      bufferWithDataAndLenBytes  - out param  - buffer pointer which points to memory allocated in this function and has the
+ *                                                entire one RPC message / encrypted chunk along with the length of the message.
+ *                                                Memory for this buffer is released by caller.
+ *      lengthFieldLength          - out param  - number of bytes of bufWithLenField which contain the length of the entire RPC
+ *                                                message or encrypted chunk
+ *      lengthDecodeHandler        - in  param  - function pointer with length decoder to use. For encrypted chunk we use
+ *                                                lengthDecode and for plain RPC message we use rpcLengthDecode.
+ *  Return:
+ *      status_t    - QRY_SUCCESS    - In case of success.
+ *                  - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
+ */
+status_t DrillClientImpl::readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes,
+                                                 uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler) {
+
+    uint32_t rmsgLen = 0;
+    boost::system::error_code error;
+    *bufferWithDataAndLenBytes = NULL;
+
+    // Decode the length field
+    lengthFieldLength = (this->*lengthDecodeHandler)(bufWithLenField, rmsgLen);
+
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Length bytes = " << lengthFieldLength << std::endl;)
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Msg Length = " << rmsgLen << std::endl;)
+
+    if(rmsgLen>0) {
+        const size_t leftover = LEN_PREFIX_BUFLEN - lengthFieldLength;
+
+        // Allocate a buffer for reading all the bytes in bufWithLen and length number of bytes.
+        const size_t bufferSizeWithLenBytes = rmsgLen + lengthFieldLength;
+        *bufferWithDataAndLenBytes = new AllocatedBuffer(bufferSizeWithLenBytes);
+
+        if(*bufferWithDataAndLenBytes == NULL) {
+            return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
+        }
+
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readLenBytesFromSocket: Allocated and locked buffer: [ "
+                                          << *bufferWithDataAndLenBytes << ", size = " << bufferSizeWithLenBytes << " ]\n";)
+
+        // Copy the memory of bufWithLen into bufferWithLenBytesSize
+        memcpy((*bufferWithDataAndLenBytes)->m_pBuffer, bufWithLenField, LEN_PREFIX_BUFLEN);
+        const size_t bytesToRead = rmsgLen - leftover;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Copied bufWithLen into bufferWithLenBytes. "
+                                          << "Now reading data (rmsgLen - leftover) : " << bytesToRead
+                                          << std::endl;)
+
+        // Read the entire data left from socket and copy to currentBuffer.
+        const ByteBuf_t b = (*bufferWithDataAndLenBytes)->m_pBuffer + LEN_PREFIX_BUFLEN;
+        doReadFromSocket(b, bytesToRead, error);
+    } else {
+        return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
+    }
+
+    return error ? handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL)
+                 : QRY_SUCCESS;
+}
+
+
+/*
+ *  Function to read entire RPC message from socket and decode it to InboundRpcMessage
+ *  Parameters:
+ *      inBuf           - in param  - Buffer containing the length bytes.
+ *      allocatedBuffer - out param - Buffer containing the length bytes and entire RPC message bytes.
+ *      msg             - out param - Decoded InBoundRpcMessage from the bytes in allocatedBuffer
+ *  Return:
+ *      status_t    - QRY_SUCCESS   - In case of success.
+ *                  - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
+ */
+status_t DrillClientImpl::readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer,
+                                  rpc::InBoundRpcMessage& msg){
 
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;)
-    size_t leftover=0;
-    uint32_t rmsgLen;
-    AllocatedBufferPtr currentBuffer;
-    *allocatedBuffer=NULL;
+                                      <<  reinterpret_cast<int*>(inBuf) << std::endl;)
+    *allocatedBuffer = NULL;
+    {
+        // We need to protect the readLength and read buffer, and the pending requests counter,
+        // but we don't have to keep the lock while we decode the rest of the buffer.
+        boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+        uint32_t lengthFieldSize = 0;
+
+        // Read the message length and extract length size bytes to form InBoundRpcMessage
+        const status_t statusCode = readLenBytesFromSocket(inBuf, allocatedBuffer, lengthFieldSize,
+                                                           &DrillClientImpl::rpcLengthDecode);
+
+        // Check for error conditions
+        if(QRY_SUCCESS != statusCode) {
+            Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+            return statusCode;
+        }
+
+        // Get the message size
+        size_t msgLen = (*allocatedBuffer)->m_bufSize;
+
+        // Read data successfully, now let's try to decode the buffer and form a valid RPC message.
+        // allocatedBuffer also contains the length bytes which is not needed by decodes so skip that part of buffer.
+        // We have it since in case of encryption the unwrap function expects it
+        if (!decode((*allocatedBuffer)->m_pBuffer + lengthFieldSize, msgLen - lengthFieldSize, msg)) {
+            Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+            return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);
+        }
+
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC message with Coordination id: "
+                                          << msg.m_coord_id << std::endl;)
+    }
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
+                                      <<  reinterpret_cast<int*>(inBuf) << std::endl;)
+    Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+    return QRY_SUCCESS;
+}
+
+
+/*
+ *  Read ENCRYPT_LEN_PREFIX_BUFLEN bytes to decode length of one complete encrypted chunk. The length bytes are expected
+ *  to be in network order. It is converted to host order and the value is stored in rmsgLen parameter.
+ *  Parameters:
+ *      inBuf   - in param  - ByteBuf_t containing at least the length bytes.
+ *      rmsgLen - out param - Contain the decoded value of length.
+ *  Return:
+ *      size_t  - length bytes read to decode
+ */
+size_t DrillClientImpl::lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen) {
+    memcpy(&rmsgLen, inBuf, ENCRYPT_LEN_PREFIX_BUFLEN);
+    rmsgLen = ntohl(rmsgLen);
+    return ENCRYPT_LEN_PREFIX_BUFLEN;
+}
+
+/*
+ *  Wrapper which uses RPC message length decoder to get length of one complete RPC message from inBuf.
+ *  Parameters:
+ *      inBuf   - in param  - ByteBuf_t containing at least the length bytes.
+ *      rmsgLen - out param - Contain the decoded value of length.
+ *  Return:
+ *      size_t	- length bytes read to decode
+ */
+size_t DrillClientImpl::rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen) {
+    return rpc::lengthDecode(inBuf, rmsgLen);
+}
+
+
+/*
+ *  Read all the encrypted chunks needed to form a complete RPC message. Read an entire chunk from network, decrypt it
+ *  and put it in a buffer. The same process is repeated until the entire buffer to form a complete RPC message is read.
+ *  Parameters:
+ *      inBuf           - in param  - ByteBuf_t containing at least the length bytes.
+ *      allocatedBuffer - out param - Buffer containing the entire RPC message bytes which is formed by reading all the
+ *                                    required encrypted chunks from network and decrypting each individual chunk. The
+ *                                    buffer memory is released by caller.
+ *      msg             - out param - InBoundRpcMessage formed from bytes in allocatedBuffer
+ *  Return:
+ *      status_t    - QRY_SUCCESS - In case of success.
+ *                  - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
+ */
+status_t DrillClientImpl::readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer,
+                                            rpc::InBoundRpcMessage& msg) {
+
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Read message from buffer "
+                                      << reinterpret_cast<int*>(inBuf) << std::endl;)
+
+    size_t leftover = 0;
+    uint32_t rpcMsgLen = 0;
+    size_t bytes_read = 0;
+    uint32_t writeIndex = 0;
+    size_t bytesToRead = 0;
+
+    *allocatedBuffer = NULL;
+    boost::system::error_code error;
+    std::stringstream errorMsg;
+
     {
         // We need to protect the readLength and read buffer, and the pending requests counter,
         // but we don't have to keep the lock while we decode the rest of the buffer.
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-        std::size_t bytes_read = rpc::lengthDecode(_buf, rmsgLen);
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;)
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;)
-
-        if(rmsgLen>0){
-            leftover = LEN_PREFIX_BUFLEN - bytes_read;
-            // Allocate a buffer
-            currentBuffer=new AllocatedBuffer(rmsgLen);
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ "
-                << currentBuffer << ", size = " << rmsgLen << " ]\n";)
-            if(currentBuffer==NULL){
-                Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
-                return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
+
+        do{
+            AllocatedBufferPtr currentBuffer = NULL;
+            uint32_t lengthFieldSize = 0;
+            const status_t statusCode = readLenBytesFromSocket(inBuf, &currentBuffer, lengthFieldSize,
+                                                               &DrillClientImpl::lengthDecode);
+
+            if(QRY_SUCCESS != statusCode) {
+                Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+
+                // Release the buffer allocated to hold chunk
+                if(currentBuffer != NULL) {
+                    Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
+                    currentBuffer = NULL;
+                }
+                return statusCode;
             }
-            *allocatedBuffer=currentBuffer;
-            if(leftover){
-                memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover);
+
+            // read one chunk successfully. Let's try to decrypt the message
+            const char* unWrappedData = NULL;
+            uint32_t unWrappedLen = 0;
+            const int decryptResult = m_saslAuthenticator->unwrap(reinterpret_cast<const char*>(currentBuffer->m_pBuffer),
+                                                                  currentBuffer->m_bufSize, &unWrappedData, unWrappedLen);
+
+            if(SASL_OK != decryptResult) {
+
+                errorMsg << "Sasl unwrap failed for the buffer of size:" << currentBuffer->m_bufSize << " , Error: "
+                         << decryptResult;
+
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::readAndDecryptMsg: "
+                                                  << errorMsg.str() << std::endl;)
+
+                Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+
+                // Release the buffer allocated to hold chunk
+                Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
+                currentBuffer = NULL;
+                return handleQryError(QRY_COMM_ERROR,
+                                      getMessage(ERR_QRY_COMMERR, errorMsg.str().c_str()), NULL);
             }
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
-                << (rmsgLen - leftover) << std::endl;)
-            ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
-            size_t bytesToRead=rmsgLen - leftover;
-            boost::system::error_code error;
-            while(1){
-                size_t dataBytesRead=this->m_socket.read_some(
-                        boost::asio::buffer(b, bytesToRead),
-                        error);
-                if(error) break;
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;)
-                if(dataBytesRead==bytesToRead) break;
-                bytesToRead-=dataBytesRead;
-                b+=dataBytesRead;
+
+            // Check for case if the unWrappedLen is 0, since Cyrus SASL plugin verifies if the length of wrapped data
+            // is less than the length specified by prepended 4 octets as per RFC 4422/2222. If so it just returns
+            // and waits for more data
+            if(unWrappedLen == 0 || (unWrappedData == NULL)) {
+                errorMsg << "Sasl unwrap failed with mismatch in length of wrapped data and the prepended length value";
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::readAndDecryptMsg: " << errorMsg.str()
+                                                  << std::endl;)
+
+                Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+
+                // Release the buffer allocated to hold chunk
+                Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
+                currentBuffer = NULL;
+                return handleQryError(QRY_COMM_ERROR,
+                                      getMessage(ERR_QRY_COMMERR, errorMsg.str().c_str()), NULL);
             }
 
-            if(!error){
-                // read data successfully
-                if (!decode(currentBuffer->m_pBuffer, rmsgLen, msg)) {
-                    Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
-                    return handleQryError(QRY_COMM_ERROR,
-                            getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Successfully decrypted the buffer"
+                                              << " Sizes - Before Decryption =  " << currentBuffer->m_bufSize
+                                              << " and After Decryption = " << unWrappedLen << std::endl;)
+
+            // Release the buffer allocated to hold chunk
+            Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
+            currentBuffer = NULL;
+
+            bytes_read = 0;
+            if(*allocatedBuffer == NULL) {
+                // This is the first chunk of the RPC message. We will decode the RPC message full length
+                bytes_read = rpcLengthDecode(reinterpret_cast<ByteBuf_t>(const_cast<char*>(unWrappedData)), rpcMsgLen);
+
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Rpc Message Length bytes = "
+                                                  << bytes_read << std::endl;)
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Rpc Message Length = "
+                                                  << rpcMsgLen << std::endl;)
+
+                if(rpcMsgLen == 0) {
+                    Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+                    return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
                 }
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;)
-            }else{
-                Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
-                return handleQryError(QRY_COMM_ERROR,
-                        getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
+                // Allocate a buffer for storing full RPC message. This is released by the caller
+                *allocatedBuffer = new AllocatedBuffer(rpcMsgLen);
+
+                if(*allocatedBuffer == NULL){
+                    Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+                    return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
+                }
+
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg:  Allocated and locked buffer:"
+                                                  << "[ " << *allocatedBuffer << ", size = " << rpcMsgLen << " ]\n";)
+
+                bytesToRead = rpcMsgLen;
             }
-        }else{
-            // got a message with an invalid read length.
-            Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
-            return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
+
+            // Update the leftover bytes that is not copied yet
+            leftover = unWrappedLen - bytes_read;
+
+            // Copy rest of decrypted message to the buffer. We can do this since it is assured that one
+            // entire decrypted chunk is part of the same RPC message.
+            if(leftover) {
+                memcpy((*allocatedBuffer)->m_pBuffer + writeIndex, unWrappedData + bytes_read, leftover);
+            }
+
+            // Update bytes left to read to form full RPC message.
+            bytesToRead -= leftover;
+            writeIndex += leftover;
+
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Left to read unencrypted data"
+                                              << " of length (bytesToRead) : " << bytesToRead << std::endl;)
+
+            if(bytesToRead > 0) {
+                // Read synchronously buffer of size LEN_PREFIX_BUFLEN to get length of next chunk
+                doReadFromSocket(inBuf, LEN_PREFIX_BUFLEN, error);
+
+                if(error) {
+                    Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+                    return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
+                }
+            }
+        }while(bytesToRead > 0); // more chunks to read for entire RPC message
+
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Done decrypting entire RPC message "
+                                          << " of length: " << rpcMsgLen << ". Now starting decode:" << std::endl;)
+
+        // Decode the buffer and form a RPC message
+        if (!decode((*allocatedBuffer)->m_pBuffer, rpcMsgLen, msg)) {
+            Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+            return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR,
+                                  "Cannot decode server message into valid RPC message"), NULL);
         }
+
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC message with Coordination id: "
+                                          << msg.m_coord_id << std::endl;)
     }
-    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;)
-    Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
+
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Free buffer "
+                                      <<  reinterpret_cast<int*>(inBuf) << std::endl;)
+    Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
     return QRY_SUCCESS;
 }
 
@@ -1364,15 +1809,15 @@ status_t DrillClientImpl::processServerMetaResult(AllocatedBufferPtr allocatedBu
     std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
     if(it!=this->m_queryHandles.end()){
         DrillClientServerMetaHandle* pHandle=static_cast<DrillClientServerMetaHandle*>((*it).second);
+        exec::user::GetServerMetaResp* resp = new exec::user::GetServerMetaResp();
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetServerMetaResp result Handle " << msg.m_pbody.size() << std::endl;)
-        exec::user::GetServerMetaResp resp;
-        if (!(resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+        if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
             return handleQryError(QRY_COMM_ERROR, "Cannot decode GetServerMetaResp results", pHandle);
         }
-        if (resp.status() != exec::user::OK) {
-            return handleQryError(QRY_FAILED, resp.error(), pHandle);
+        if (resp->status() != exec::user::OK) {
+            return handleQryError(QRY_FAILED, resp->error(), pHandle);
         }
-        pHandle->notifyListener(&(resp.server_meta()), NULL);
+        pHandle->notifyListener(&(resp->server_meta()), NULL);
         DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetServerMetaResp result " << std::endl;)
     }else{
         return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
@@ -1484,11 +1929,11 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
     return;
 }
 
-void DrillClientImpl::handleRead(ByteBuf_t _buf,
+void DrillClientImpl::handleRead(ByteBuf_t inBuf,
         const boost::system::error_code& error,
         size_t bytes_transferred) {
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;)
+        <<  reinterpret_cast<int*>(inBuf) << std::endl;)
     if(DrillClientConfig::getQueryTimeout() > 0){
         // Cancel the timeout if handleRead is called
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
@@ -1496,7 +1941,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
     }
     if (error) {
         // boost error
-        Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
+        Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
             "Boost Communication Error: " << error.message() << std::endl;)
@@ -1510,7 +1955,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
     AllocatedBufferPtr allocatedBuffer=NULL;
 
-    if(readMsg(_buf, &allocatedBuffer, msg)!=QRY_SUCCESS){
+    if((this->*m_fpCurrentReadMsgHandler)(inBuf, &allocatedBuffer, msg)!=QRY_SUCCESS){
         delete allocatedBuffer;
         if(m_pendingRequests!=0){
             boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
@@ -1655,6 +2100,9 @@ status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& ms
     return QRY_SUCCESS;
 }
 
+/*
+ * Called when there is failure in connect/send.
+ */
 connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, const std::string& msg){
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
     m_pendingRequests=0;
@@ -1669,19 +2117,28 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, c
     return status;
 }
 
+/*
+ * Always called with NULL QueryHandle when there is any error while reading data from socket. Once enough data is read
+ * and a valid RPC message is formed then it can get called with NULL/valid QueryHandle depending on if QueryHandle is found
+ * for the created RPC message.
+ */
 status_t DrillClientImpl::handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle){
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
-    // set query error only if queries are running
+    // Set query error only if queries are running. If valid QueryHandle that means the bytes to form a valid
+    // RPC message was read successfully from socket. So there is no socket/connection issues.
     if(pQueryHandle!=NULL){
         m_pendingRequests--;
         pQueryHandle->signalError(pErr);
-    }else{
+    }else{ // This means error was while reading from socket, hence call broadcastError which eventually closes socket.
         m_pendingRequests=0;
         broadcastError(pErr);
     }
     return status;
 }
 
+/*
+ * Always called with valid QueryHandle when there is any error processing Query related data.
+ */
 status_t DrillClientImpl::handleQryError(status_t status,
         const exec::shared::DrillPBError& e,
         DrillClientQueryHandle* pQueryHandle){
@@ -1766,7 +2223,7 @@ void DrillClientImpl::sendAck(const rpc::InBoundRpcMessage& msg, bool isOk){
     ack.set_ok(isOk);
     rpc::OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    sendSync(ack_msg);
+    sendSyncCommon(ack_msg);
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
 }
 
@@ -1774,7 +2231,7 @@ void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     uint64_t coordId = this->getNextCoordinationId();
     rpc::OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
-    sendSync(cancel_msg);
+    sendSyncCommon(cancel_msg);
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
 }
 
@@ -1783,6 +2240,21 @@ void DrillClientImpl::shutdownSocket(){
     boost::system::error_code ignorederr;
     m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
     m_bIsConnected=false;
+
+    // Delete the saslAuthenticatorImpl instance since connection is broken. It will be recreated on the next
+    // call to connect.
+    if(m_saslAuthenticator != NULL) {
+        delete m_saslAuthenticator;
+        m_saslAuthenticator = NULL;
+    }
+
+    // Reset the SASL states.
+    m_saslDone = false;
+    m_saslResultCode = SASL_OK;
+
+    // Reset the encryption context since connection is invalid
+    m_encryptionCtxt.reset();
+
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
 }
 
@@ -1799,8 +2271,6 @@ struct ServerMetaContext {
 	boost::mutex m_mutex;
 	boost::condition_variable m_cv;
 
-    ServerMetaContext(): m_done(false), m_status(QRY_SUCCESS), m_serverMeta(), m_mutex(), m_cv() {};
-
 	static status_t listener(void* ctx, const exec::user::ServerMeta* serverMeta, DrillClientError* err) {
 		ServerMetaContext* context = static_cast<ServerMetaContext*>(ctx);
 			if (err) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index d37076e..852233f 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -368,16 +368,17 @@ class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnCol
     	DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {}
 };
 
+// Length Decoder Function Pointer definition
+typedef size_t (DrillClientImpl::*lengthDecoder)(const ByteBuf_t, uint32_t&);
 
 class DrillClientImpl : public DrillClientImplBase{
     public:
         DrillClientImpl():
-            m_coordinationId(1),
             m_handshakeVersion(0),
             m_handshakeStatus(exec::user::SUCCESS),
             m_bIsConnected(false),
             m_saslAuthenticator(NULL),
-    		m_saslResultCode(SASL_OK),
+            m_saslResultCode(SASL_OK),
             m_saslDone(false),
             m_pendingRequests(0),
             m_pError(NULL),
@@ -388,9 +389,11 @@ class DrillClientImpl : public DrillClientImplBase{
             m_heartbeatTimer(m_io_service),
             m_rbuf(NULL),
             m_wbuf(MAX_SOCK_RD_BUFSIZE),
-			m_bIsDirectConnection(false)
+            m_bIsDirectConnection(false)
     {
         m_coordinationId=rand()%1729+1;
+        m_fpCurrentReadMsgHandler = &DrillClientImpl::readMsg;
+        m_fpCurrentSendHandler = &DrillClientImpl::sendSyncPlain;
     };
 
         ~DrillClientImpl(){
@@ -477,9 +480,10 @@ class DrillClientImpl : public DrillClientImplBase{
         void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
 
         int32_t getNextCoordinationId(){ return ++m_coordinationId; };
-        // send synchronous messages
-        //connectionStatus_t recvSync(rpc::InBoundRpcMessage& msg);
-        connectionStatus_t sendSync(rpc::OutBoundRpcMessage& msg);
+        // synchronous message send handlers
+        connectionStatus_t sendSyncCommon(rpc::OutBoundRpcMessage& msg);
+        connectionStatus_t sendSyncPlain();
+        connectionStatus_t sendSyncEncrypted();
         // handshake
         connectionStatus_t recvHandshake();
         void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred );
@@ -488,10 +492,16 @@ class DrillClientImpl : public DrillClientImplBase{
         void startMessageListener(); 
         // Query results
         void getNextResult();
-        status_t readMsg(
-                ByteBuf_t _buf,
-                AllocatedBufferPtr* allocatedBuffer,
-                rpc::InBoundRpcMessage& msg);
+        // Read Message Handlers
+        status_t readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+        status_t readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+        status_t readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes,
+                                        uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler);
+        void doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead, boost::system::error_code& errorCode);
+        void doWriteToSocket(const char* dataPtr, size_t bytesToWrite, boost::system::error_code& errorCode);
+        // Length decode handlers
+        size_t lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen);
+        size_t rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen);
         status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
         status_t processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
         status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
@@ -506,7 +516,7 @@ class DrillClientImpl : public DrillClientImplBase{
         status_t processQueryStatusResult( exec::shared::QueryResult* qr,
                 DrillClientQueryResult* pDrillClientQueryResult);
         void handleReadTimeout(const boost::system::error_code & err);
-        void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
+        void handleRead(ByteBuf_t inBuf, const boost::system::error_code & err, size_t bytes_transferred) ;
         status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
         status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
         connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
@@ -540,6 +550,7 @@ class DrillClientImpl : public DrillClientImplBase{
         void finishAuthentication();
 
         void shutdownSocket();
+        bool clientNeedsEncryption(const DrillUserProperties* userProperties);
 
         int32_t m_coordinationId;
         int32_t m_handshakeVersion;
@@ -557,6 +568,14 @@ class DrillClientImpl : public DrillClientImplBase{
         boost::mutex m_saslMutex; // mutex to protect m_saslDone
         boost::condition_variable m_saslCv; // to signal completion of SASL exchange
 
+        // Used for encryption and is set when server notifies in first handshake response.
+        EncryptionContext m_encryptionCtxt;
+
+        // Function pointers for the read and send handlers. By default they refer to the handlers for plain message read/send. When encryption is
+        // enabled, after a successful handshake they refer to the handlers for encrypted message read/send over the wire.
+        status_t (DrillClientImpl::*m_fpCurrentReadMsgHandler)(ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+        connectionStatus_t (DrillClientImpl::*m_fpCurrentSendHandler)();
+
         std::string m_connectStr; 
 
         // 

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/rpcMessage.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.cpp b/contrib/native/client/src/clientlib/rpcMessage.cpp
index 13cd7a8..f64167f 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.cpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.cpp
@@ -47,7 +47,7 @@ std::size_t lengthDecode(const uint8_t* buf, uint32_t& length) {
 
     // read the frame to get the length of the message and then
 
-    CodedInputStream cis(buf, 5); // read 5 bytes at most
+    CodedInputStream cis(buf, LEN_PREFIX_BUFLEN); // read LEN_PREFIX_BUFLEN bytes at most
 
     int startPos(cis.CurrentPosition()); // for debugging
     if (!cis.ReadVarint32(&length)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/rpcMessage.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp b/contrib/native/client/src/clientlib/rpcMessage.hpp
index 15487e9..43bcaeb 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.hpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.hpp
@@ -54,7 +54,6 @@ std::size_t lengthDecode(const uint8_t* buf, uint32_t& length);
 bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg);
 
 bool encode(DataBuf& buf, const OutBoundRpcMessage& msg);
-
 } // namespace rpc
 } // namespace Drill
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp b/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp
index e7e2ba5..c5dc3ac 100644
--- a/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp
+++ b/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp
@@ -32,6 +32,7 @@ static const std::string DEFAULT_SERVICE_NAME = "drill";
 static const std::string KERBEROS_SIMPLE_NAME = "kerberos";
 static const std::string KERBEROS_SASL_NAME = "gssapi";
 static const std::string PLAIN_NAME = "plain";
+static const int PREFERRED_MIN_SSF = 56;
 
 const std::map<std::string, std::string> SaslAuthenticatorImpl::MECHANISM_MAPPING = boost::assign::map_list_of
     (KERBEROS_SIMPLE_NAME, KERBEROS_SASL_NAME)
@@ -42,8 +43,7 @@ boost::mutex SaslAuthenticatorImpl::s_mutex;
 bool SaslAuthenticatorImpl::s_initialized = false;
 
 SaslAuthenticatorImpl::SaslAuthenticatorImpl(const DrillUserProperties* const properties) :
-    m_pUserProperties(properties), m_pConnection(NULL), m_ppwdSecret(NULL) {
-
+    m_pUserProperties(properties), m_pConnection(NULL), m_ppwdSecret(NULL), m_pEncryptCtxt(NULL) {
     if (!s_initialized) {
         boost::lock_guard<boost::mutex> lock(SaslAuthenticatorImpl::s_mutex);
         if (!s_initialized) {
@@ -85,6 +85,9 @@ SaslAuthenticatorImpl::~SaslAuthenticatorImpl() {
         sasl_dispose(&m_pConnection);
     }
     m_pConnection = NULL;
+
+    // Memory is owned by DrillClientImpl object
+    m_pEncryptCtxt = NULL;
 }
 
 typedef int (*sasl_callback_proc_t)(void); // see sasl_callback_ft
@@ -109,8 +112,14 @@ int SaslAuthenticatorImpl::passwordCallback(sasl_conn_t *conn, void *context, in
     return SASL_OK;
 }
 
-int SaslAuthenticatorImpl::init(const std::vector<std::string>& mechanisms, exec::shared::SaslMessage& response) {
-    // find and set parameters
+int SaslAuthenticatorImpl::init(const std::vector<std::string>& mechanisms, exec::shared::SaslMessage& response,
+                                EncryptionContext* const encryptCtxt) {
+
+    // EncryptionContext should not be NULL here.
+    assert(encryptCtxt != NULL);
+    m_pEncryptCtxt = encryptCtxt;
+
+	// find and set parameters
     std::string authMechanismToUse;
     std::string serviceName;
     std::string serviceHost;
@@ -163,6 +172,9 @@ int SaslAuthenticatorImpl::init(const std::vector<std::string>& mechanisms, exec
                                       << saslResult << std::endl;)
     if (saslResult != SASL_OK) return saslResult;
 
+    // set the security properties
+    setSecurityProps();
+
     // initiate; for now, pass in only one mechanism
     const char *out;
     unsigned outlen;
@@ -204,4 +216,103 @@ int SaslAuthenticatorImpl::step(const exec::shared::SaslMessage& challenge, exec
     return saslResult;
 }
 
+/*
+ * Checks the SSF negotiated on the SASL connection against PREFERRED_MIN_SSF and, when it is acceptable,
+ * stores the negotiated SASL_MAXOUTBUF value as the wrap size limit of the encryption context.
+ * Returns SASL_OK on success, SASL_BADPARAM for a too-weak SSF, or the failing sasl_getprop code.
+ */
+int SaslAuthenticatorImpl::verifyAndUpdateSaslProps() {
+    const int* propValue;
+    // Look up the negotiated security strength factor; a failed lookup is handed back to the caller as is.
+    int status = sasl_getprop(m_pConnection, SASL_SSF, reinterpret_cast<const void **>(&propValue));
+    if(status != SASL_OK) {
+        return status;
+    }
+
+    // An SSF below the preferred minimum means a weaker security level was negotiated, so reject it.
+    if(*propValue < PREFERRED_MIN_SSF) {
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "SaslAuthenticatorImpl::verifyAndUpdateSaslProps: "
+                                          << "Negotiated SSF parameter:" << *propValue
+                                          << " is less than Preferred one: " << PREFERRED_MIN_SSF << std::endl;)
+        return SASL_BADPARAM;
+    }
+
+    // Look up the negotiated maximum output buffer size; the same pointer is reused for this property.
+    status = sasl_getprop(m_pConnection, SASL_MAXOUTBUF, reinterpret_cast<const void **>(&propValue));
+    if(status != SASL_OK) {
+        return status;
+    }
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "SaslAuthenticatorImpl::verifyAndUpdateSaslProps: "
+                                      << "Negotiated Raw Wrap Buffer size: " << *propValue << std::endl;)
+    m_pEncryptCtxt->setWrapSizeLimit(*propValue);
+    return status;
+}
+
+/*
+ *  Installs the security properties needed for encryption on the SASL connection so that a
+ *  mechanism with a proper cipher is chosen after handshake. Does nothing when encryption is not required.
+ *
+ *  PREFERRED_MIN_SSF is chosen to be 56 since that is the max_ssf supported by gssapi. We want
+ *  stronger cipher algorithm to be used all the time (preferably AES-256), so leaving MAX_SSF as UINT_MAX
+ */
+void SaslAuthenticatorImpl::setSecurityProps() const{
+    if(!m_pEncryptCtxt->isEncryptionReqd()) return;
+    sasl_security_properties_t secprops;
+    secprops.min_ssf = PREFERRED_MIN_SSF;
+    secprops.max_ssf = UINT_MAX;
+    secprops.maxbufsize = m_pEncryptCtxt->getMaxWrappedSize();
+    secprops.property_names = NULL;
+    secprops.property_values = NULL;
+    // Only specify NOPLAINTEXT for encryption since the mechanism is selected based on name not
+    // the security properties configured here.
+    secprops.security_flags = SASL_SEC_NOPLAINTEXT;
+
+    // This function returns void, so a rejected sasl_setprop call can only be surfaced through the log.
+    const int saslResult = sasl_setprop(m_pConnection, SASL_SEC_PROPS, &secprops);
+    if(saslResult != SASL_OK) {
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "SaslAuthenticatorImpl::setSecurityProps: sasl_setprop failed: " << saslResult << std::endl;)
+    }
+}
+
+/*
+ * Encrypts a buffer by delegating to sasl_encode of the Cyrus-SASL library, which internally calls the
+ * wrap function of the chosen mechanism. The output buffer will have first 4 octets as the length of
+ * encrypted data in network byte order.
+ * Parameters:
+ *      dataToWrap      -   in param    -   pointer to data buffer to encrypt.
+ *      dataToWrapLen   -   in param    -   length of data buffer to encrypt.
+ *      output          -   out param   -   pointer to data buffer with encrypted data. Allocated by Cyrus-SASL
+ *      wrappedLen      -   out param   -   length of data after encryption
+ * Returns the sasl_encode result unchanged:
+ *      SASL_OK         - success (returns input if no layer negotiated)
+ *      SASL_NOTDONE    - security layer negotiation not finished
+ *      SASL_BADPARAM   - inputlen is greater than the SASL_MAXOUTBUF
+ */
+int SaslAuthenticatorImpl::wrap(const char* dataToWrap, const int& dataToWrapLen,
+                                const char** output, uint32_t& wrappedLen) {
+    const int saslResult = sasl_encode(m_pConnection, dataToWrap, dataToWrapLen, output, &wrappedLen);
+    return saslResult;
+}
+
+/*
+ * Decrypts a buffer by delegating to sasl_decode of the Cyrus-SASL library, which internally calls the
+ * unwrap function of the chosen mechanism. The input buffer will have first 4 octets as the length of
+ * encrypted data in network byte order.
+ * Parameters:
+ *      dataToUnWrap      -   in param    -   pointer to data buffer to decrypt.
+ *      dataToUnWrapLen   -   in param    -   length of data buffer to decrypt.
+ *      output            -   out param   -   pointer to data buffer with decrypted data. Allocated by Cyrus-SASL
+ *      unWrappedLen      -   out param   -   length of data after decryption
+ * Returns the sasl_decode result unchanged:
+ *      SASL_OK         - success (returns input if no layer negotiated)
+ *      SASL_NOTDONE    - security layer negotiation not finished
+ *      SASL_BADMAC     - bad message integrity check (as listed for sasl_decode in Cyrus-SASL's sasl.h)
+ */
+int SaslAuthenticatorImpl::unwrap(const char* dataToUnWrap, const int& dataToUnWrapLen,
+                                  const char** output, uint32_t& unWrappedLen) {
+    const int saslResult = sasl_decode(m_pConnection, dataToUnWrap, dataToUnWrapLen, output, &unWrappedLen);
+    return saslResult;
+}
+
+
 } /* namespace Drill */

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp b/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp
index 5e36ee1..53fe4e3 100644
--- a/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp
+++ b/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp
@@ -24,6 +24,7 @@
 #include <vector>
 #include "drill/drillClient.hpp"
 #include "UserBitShared.pb.h"
+#include "utils.hpp"
 
 #include "sasl/sasl.h"
 #include "sasl/saslplug.h"
@@ -38,10 +39,17 @@ public:
 
     ~SaslAuthenticatorImpl();
 
-    int init(const std::vector<std::string>& mechanisms, exec::shared::SaslMessage& response);
+    int init(const std::vector<std::string>& mechanisms, exec::shared::SaslMessage& response,
+             EncryptionContext* const encryptCtxt);
 
     int step(const exec::shared::SaslMessage& challenge, exec::shared::SaslMessage& response) const;
 
+    int verifyAndUpdateSaslProps();
+
+    int wrap(const char* dataToWrap, const int& dataToWrapLen, const char** output, uint32_t& wrappedLen);
+
+    int unwrap(const char* dataToUnWrap, const int& dataToUnWrapLen, const char** output, uint32_t& unWrappedLen);
+
 private:
 
     static const std::map<std::string, std::string> MECHANISM_MAPPING;
@@ -53,11 +61,13 @@ private:
     sasl_conn_t *m_pConnection;
     std::string m_username;
     sasl_secret_t *m_ppwdSecret;
+    EncryptionContext *m_pEncryptCtxt;
 
     static int passwordCallback(sasl_conn_t *conn, void *context, int id, sasl_secret_t **psecret);
 
     static int userNameCallback(void *context, int id, const char **result, unsigned int *len);
 
+    void setSecurityProps() const;
 };
 
 } /* namespace Drill */

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/utils.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp
index d3c8f08..11aa2c2 100644
--- a/contrib/native/client/src/clientlib/utils.cpp
+++ b/contrib/native/client/src/clientlib/utils.cpp
@@ -111,4 +111,53 @@ AllocatedBuffer::~AllocatedBuffer(){
     m_bufSize = 0;
 }
 
+EncryptionContext::EncryptionContext(const bool& encryptionReqd, const int& maxWrappedSize, const int& wrapSizeLimit)
+    : m_bEncryptionReqd(encryptionReqd),   // the three caller-supplied settings are copied as given
+      m_maxWrappedSize(maxWrappedSize),
+      m_wrapSizeLimit(wrapSizeLimit) {
+}
+
+EncryptionContext::EncryptionContext()
+    : m_bEncryptionReqd(false),   // defaults: encryption off, 65536 max wrapped size, no wrap size limit yet
+      m_maxWrappedSize(65536),
+      m_wrapSizeLimit(0) {
+}
+
+void EncryptionContext::setEncryptionReqd(const bool& encryptionReqd) {
+    m_bEncryptionReqd = encryptionReqd;   // overwrite the stored encryption-required flag
+}
+
+void EncryptionContext::setMaxWrappedSize(const int& maxWrappedSize) {
+    m_maxWrappedSize = maxWrappedSize;   // overwrite the stored maximum wrapped size
+}
+
+void EncryptionContext::setWrapSizeLimit(const int& wrapSizeLimit) {
+    m_wrapSizeLimit = wrapSizeLimit;   // overwrite the stored wrap size limit
+}
+
+bool EncryptionContext::isEncryptionReqd() const {
+    return this->m_bEncryptionReqd;   // current encryption-required flag
+}
+
+int EncryptionContext::getMaxWrappedSize() const {
+    return this->m_maxWrappedSize;   // current maximum wrapped size
+}
+
+int EncryptionContext::getWrapSizeLimit() const {
+    return this->m_wrapSizeLimit;   // current wrap size limit
+}
+
+void EncryptionContext::reset() {
+    m_wrapSizeLimit = 0;          // no wrap size limit recorded
+    m_maxWrappedSize = 65536;     // same maximum wrapped size the default constructor uses
+    m_bEncryptionReqd = false;    // encryption switched off again
+}
+
+std::ostream& operator<<(std::ostream &contextStream, const EncryptionContext& context) {
+    const char* const encryptionState = context.isEncryptionReqd() ? "enabled" : "disabled";
+    contextStream << " Encryption: " << encryptionState   // all three settings go out in one chained write
+                  << " ,MaxWrappedSize: " << context.getMaxWrappedSize()
+                  << " ,WrapSizeLimit: " << context.getWrapSizeLimit();
+    return contextStream;
+}
+
 } // namespace 

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/utils.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp
index 4cd8fa5..d30794c 100644
--- a/contrib/native/client/src/clientlib/utils.hpp
+++ b/contrib/native/client/src/clientlib/utils.hpp
@@ -98,6 +98,38 @@ class DECLSPEC_DRILL_CLIENT Utils{
 
 }; // Utils
 
+/*
+ * Encryption related configuration parameters. The members are updated with values received from the server
+ * and also after the SASL handshake is done.
+ */
+class EncryptionContext {
+	// Data members; private because they precede any access specifier in a class.
+	bool m_bEncryptionReqd;   // true when encryption is required
+	int m_maxWrappedSize;     // given to SASL as secprops.maxbufsize by SaslAuthenticatorImpl::setSecurityProps
+	int m_wrapSizeLimit;      // set from the negotiated SASL_MAXOUTBUF by verifyAndUpdateSaslProps
+
+public:
+	EncryptionContext();      // defaults: encryption not required, max wrapped size 65536, wrap size limit 0
+	// Builds a context from explicitly supplied values for all three settings.
+	EncryptionContext(const bool& encryptionReqd, const int& maxWrappedSize, const int& wrapSizeLimit);
+	// Stores whether encryption is required.
+	void setEncryptionReqd(const bool& encryptionReqd);
+	// Stores the maximum wrapped size.
+	void setMaxWrappedSize(const int& maxWrappedSize);
+	// Stores the wrap size limit.
+	void setWrapSizeLimit(const int& wrapSizeLimit);
+	// Returns the stored encryption-required flag.
+	bool isEncryptionReqd() const;
+	// Returns the stored maximum wrapped size.
+	int getMaxWrappedSize() const;
+	// Returns the stored wrap size limit.
+	int getWrapSizeLimit() const;
+	// Puts all three settings back to the values the default constructor assigns.
+	void reset();
+	// Streams the three settings as text; intended for log output.
+	friend std::ostream& operator<<(std::ostream &contextStream, const EncryptionContext& context);
+};
+
 } // namespace Drill
 
 #endif

http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index ed0a1ed..5401c75 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -56,6 +56,7 @@
 
 #define LENGTH_PREFIX_MAX_LENGTH 5
 #define LEN_PREFIX_BUFLEN LENGTH_PREFIX_MAX_LENGTH
+#define ENCRYPT_LEN_PREFIX_BUFLEN 4
 
 #define MAX_CONNECT_STR 4096
 #define MAX_SOCK_RD_BUFSIZE  1024
@@ -169,6 +170,7 @@ typedef enum{
 #define USERPROP_AUTH_MECHANISM "auth"
 #define USERPROP_SERVICE_NAME "service_name"
 #define USERPROP_SERVICE_HOST "service_host"
+#define USERPROP_SASL_ENCRYPT "sasl_encrypt"
 
 // Bitflags to describe user properties
 // Used in DrillUserProperties::USER_PROPERTIES