You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/05/20 23:52:47 UTC
[6/6] drill git commit: DRILL-4335: Apache Drill should support
network encryption.
DRILL-4335: Apache Drill should support network encryption.
NOTE: This pull request provides support for on-wire encryption using SASL framework. Communication channel covered is:
1) C++ Drill Client and Drillbit channel.
close apache/drill#809
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d11aba2e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d11aba2e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d11aba2e
Branch: refs/heads/master
Commit: d11aba2e55323bb5a6a9deb5bb09fd87470dcedf
Parents: ce8bbc0
Author: Sorabh Hamirwasia <sh...@maprtech.com>
Authored: Mon Mar 6 00:19:50 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sat May 20 16:17:19 2017 -0700
----------------------------------------------------------------------
.../native/client/example/querySubmitter.cpp | 25 +-
contrib/native/client/readme.macos | 6 +-
.../native/client/src/clientlib/drillClient.cpp | 1 +
.../client/src/clientlib/drillClientImpl.cpp | 694 ++++++++++++++++---
.../client/src/clientlib/drillClientImpl.hpp | 41 +-
.../native/client/src/clientlib/rpcMessage.cpp | 2 +-
.../native/client/src/clientlib/rpcMessage.hpp | 1 -
.../src/clientlib/saslAuthenticatorImpl.cpp | 119 +++-
.../src/clientlib/saslAuthenticatorImpl.hpp | 12 +-
contrib/native/client/src/clientlib/utils.cpp | 49 ++
contrib/native/client/src/clientlib/utils.hpp | 32 +
.../native/client/src/include/drill/common.hpp | 2 +
contrib/native/client/src/protobuf/User.pb.cc | 463 ++++++++-----
contrib/native/client/src/protobuf/User.pb.h | 71 +-
14 files changed, 1194 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 5990897..47e55de 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -1,3 +1,4 @@
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -24,7 +25,7 @@
#include <boost/algorithm/string/join.hpp>
#include "drill/drillc.hpp"
-int nOptions=15;
+int nOptions=19;
struct Option{
char name[32];
@@ -45,7 +46,11 @@ struct Option{
{"heartbeatFrequency", "Heartbeat frequency (second). Disabled if set to 0.", false},
{"user", "Username", false},
{"password", "Password", false},
- {"saslPluginPath", "Path to where SASL plugins are installed", false}
+ {"saslPluginPath", "Path to where SASL plugins are installed", false},
+ {"service_host", "Service host for Kerberos", false},
+ {"service_name", "Service name for Kerberos", false},
+ {"auth", "Authentication mechanism to use", false},
+ {"sasl_encrypt", "Negotiate for encrypted connection", false}
};
std::map<std::string, std::string> qsOptionValues;
@@ -295,6 +300,10 @@ int main(int argc, char* argv[]) {
std::string user=qsOptionValues["user"];
std::string password=qsOptionValues["password"];
std::string saslPluginPath=qsOptionValues["saslPluginPath"];
+ std::string sasl_encrypt=qsOptionValues["sasl_encrypt"];
+ std::string serviceHost=qsOptionValues["service_host"];
+ std::string serviceName=qsOptionValues["service_name"];
+ std::string auth=qsOptionValues["auth"];
Drill::QueryType type;
@@ -371,6 +380,18 @@ int main(int argc, char* argv[]) {
if(password.length()>0){
props.setProperty(USERPROP_PASSWORD, password);
}
+ if(sasl_encrypt.length()>0){
+ props.setProperty(USERPROP_SASL_ENCRYPT, sasl_encrypt);
+ }
+ if(serviceHost.length()>0){
+ props.setProperty(USERPROP_SERVICE_HOST, serviceHost);
+ }
+ if(serviceName.length()>0){
+ props.setProperty(USERPROP_SERVICE_NAME, serviceName);
+ }
+ if(auth.length()>0){
+ props.setProperty(USERPROP_AUTH_MECHANISM, auth);
+ }
if(client.connect(connectStr.c_str(), &props)!=Drill::CONN_SUCCESS){
std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/readme.macos
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.macos b/contrib/native/client/readme.macos
index 4785e87..eee017e 100644
--- a/contrib/native/client/readme.macos
+++ b/contrib/native/client/readme.macos
@@ -35,6 +35,8 @@ Install Prerequisites
or use brew to install
$> brew install cmake
+2.0) Install cppunit
+ $> brew install cppunit
2.1) Install protobuf 2.5.0 (or higher)
$> brew install protobuf
@@ -54,7 +56,7 @@ Install Prerequisites
When changes have been introduced to the protocol module, you might need to refresh the protobuf C++ source files too.
$> cd DRILL_DIR/contrib/native/client
$> mkdir build
- $> cd build && cmake3 -G "XCode" -D CMAKE_BUILD_TYPE=Debug ..
+ $> cd build && cmake -G "Xcode" -D CMAKE_BUILD_TYPE=Debug ..
$> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target fixProtobufs
$> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target cpProtobufs
@@ -64,7 +66,7 @@ Build drill client
-------------------
$> cd DRILL_DIR/contrib/native/client
$> mkdir build
- $> cd build && cmake3 -G "XCode" -D CMAKE_BUILD_TYPE=Debug ..
+ $> cd build && cmake -G "Xcode" -D CMAKE_BUILD_TYPE=Debug ..
$> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target ALL_BUILD
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 7000272..8eb909b 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -181,6 +181,7 @@ const std::map<std::string, uint32_t> DrillUserProperties::USER_PROPERTIES=boos
( USERPROP_USESSL, USERPROP_FLAGS_BOOLEAN|USERPROP_FLAGS_SSLPROP)
( USERPROP_FILEPATH, USERPROP_FLAGS_STRING|USERPROP_FLAGS_SSLPROP|USERPROP_FLAGS_FILEPATH)
( USERPROP_FILENAME, USERPROP_FLAGS_STRING|USERPROP_FLAGS_SSLPROP|USERPROP_FLAGS_FILENAME)
+ ( USERPROP_SASL_ENCRYPT, USERPROP_FLAGS_STRING)
;
bool DrillUserProperties::validate(std::string& err){
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 30a354e..0dee309 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -30,7 +30,6 @@
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
-
#include "drill/drillClient.hpp"
#include "drill/fieldmeta.hpp"
#include "drill/recordBatch.hpp"
@@ -193,7 +192,7 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
- status=sendSync(heartbeatMsg);
+ status=sendSyncCommon(heartbeatMsg);
status=status==CONN_SUCCESS?status:CONN_DEAD;
//If the server sends responses to a heartbeat, we need to increment the pending requests counter.
if(m_pendingRequests++==0){
@@ -233,18 +232,125 @@ void DrillClientImpl::Close() {
shutdownSocket();
}
+/*
+ * Write bytesToWrite length data bytes pointed by dataPtr. It handles EINTR error
+ * occurred during write_some sys call and does a retry on that.
+ *
+ * Parameters:
+ * dataPtr - in param - Pointer to data bytes to write on socket.
+ * bytesToWrite - in param - Length of data bytes to write from dataPtr.
+ * errorCode - out param - Error code set by boost.
+ */
+void DrillClientImpl::doWriteToSocket(const char* dataPtr, size_t bytesToWrite,
+ boost::system::error_code& errorCode) {
+ if(0 == bytesToWrite) {
+ return;
+ }
+
+ // Write all the bytes to socket. In case of error when all bytes are not successfully written
+ // proper errorCode will be set.
+ while(1) {
+ size_t bytesWritten = m_socket.write_some(boost::asio::buffer(dataPtr, bytesToWrite), errorCode);
+
+ // Update the state
+ bytesToWrite -= bytesWritten;
+ dataPtr += bytesWritten;
+
+ if(EINTR != errorCode.value()) break;
+
+ // Check if all the data is written then break from loop
+ if(0 == bytesToWrite) break;
+ }
+}
-connectionStatus_t DrillClientImpl::sendSync(rpc::OutBoundRpcMessage& msg){
+/*
+ * Common wrapper to take care of sending both plain or encrypted message. It creates a send buffer from an
+ * OutboundRPCMessage and then call the send handler pointing to either sendSyncPlain or sendSyncEncrypted
+ *
+ * Return:
+ * connectionStatus_t - CONN_SUCCESS - In case of successful send
+ * - CONN_FAILURE - In case of failure to send
+ */
+connectionStatus_t DrillClientImpl::sendSyncCommon(rpc::OutBoundRpcMessage& msg) {
encode(m_wbuf, msg);
+ return (this->*m_fpCurrentSendHandler)();
+}
+
+/*
+ * Send handler for sending plain messages over wire
+ *
+ * Return:
+ * connectionStatus_t - CONN_SUCCESS - In case of successful send
+ * - CONN_FAILURE - In case of failure to send
+ */
+connectionStatus_t DrillClientImpl::sendSyncPlain(){
+
boost::system::error_code ec;
- size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec);
- if(!ec && s!=0){
+ doWriteToSocket(reinterpret_cast<char*>(m_wbuf.data()), m_wbuf.size(), ec);
+
+ if(!ec) {
return CONN_SUCCESS;
- }else{
+ } else {
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, ec.message().c_str()));
}
}
+/*
+ * Send handler for sending encrypted messages over wire. It encrypts the send buffer using wrap api provided by
+ * saslAuthenticatorImpl and then transmit the encrypted bytes over wire.
+ *
+ * Return:
+ * connectionStatus_t - CONN_SUCCESS - In case of successful send
+ * - CONN_FAILURE - In case of failure to send
+ */
+connectionStatus_t DrillClientImpl::sendSyncEncrypted() {
+
+ boost::system::error_code ec;
+
+ // Encoded message is encrypted into chunks of size <= WrapSizeLimit. Each encrypted chunk along with
+ // its encrypted length in network order (added by Cyrus-SASL plugin) is sent over wire.
+ const int wrapChunkSize = m_encryptionCtxt.getWrapSizeLimit();
+ int lengthToEncrypt = m_wbuf.size();
+
+ int currentChunkLen = std::min(wrapChunkSize, lengthToEncrypt);
+ uint32_t currentChunkOffset = 0;
+ std::stringstream errorMsg;
+
+ // Encrypt and send each chunk
+ while(lengthToEncrypt != 0) {
+ const char* wrappedChunk = NULL;
+ uint32_t wrappedLen = 0;
+ const int wrapResult = m_saslAuthenticator->wrap(reinterpret_cast<const char*>(m_wbuf.data() + currentChunkOffset),
+ currentChunkLen, &wrappedChunk, wrappedLen);
+ if(SASL_OK != wrapResult) {
+ errorMsg << "Sasl wrap failed while encrypting chunk of length: " << currentChunkLen << " , EncodeError: "
+ << wrapResult;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::sendSyncEncrypted - " << errorMsg.str()
+ << " ,ChunkOffset: " << currentChunkOffset << ", Message Len: " << m_wbuf.size()
+ << ", Closing connection.";)
+ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, errorMsg.str().c_str()));
+ }
+
+ // Send the encrypted chunk.
+ doWriteToSocket(wrappedChunk, wrappedLen, ec);
+
+ if(ec) {
+ errorMsg << "Failure while sending encrypted chunk. Error: " << ec.message().c_str();
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::sendSyncEncrypted - " << errorMsg.str()
+ << ", Chunk Length: " << currentChunkLen << ", ChunkOffset:" << currentChunkOffset
+ << ", Message Len: " << m_wbuf.size() << ", Closing connection.";)
+ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, errorMsg.str().c_str()));
+ }
+
+ // Update variables after sending each encrypted chunk
+ lengthToEncrypt -= currentChunkLen;
+ currentChunkOffset += currentChunkLen;
+ currentChunkLen = std::min(wrapChunkSize, lengthToEncrypt);
+ }
+
+ return CONN_SUCCESS;
+}
+
connectionStatus_t DrillClientImpl::recvHandshake(){
if(m_rbuf==NULL){
m_rbuf = Utils::allocateBuffer(MAX_SOCK_RD_BUFSIZE);
@@ -289,7 +395,41 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
return CONN_SUCCESS;
}
-void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
+/*
+ * Read bytesToRead length data bytes from socket into inBuf. It handles EINTR error
+ * occurred during read_some sys call and does a retry on that.
+ *
+ * Parameters:
+ * inBuf - out param - Pointer to buffer to read data into from socket.
+ * bytesToRead - in param - Length of data bytes to read from socket.
+ * errorCode - out param - Error code set by boost.
+ */
+void DrillClientImpl::doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead,
+ boost::system::error_code& errorCode) {
+
+ // Check if bytesToRead is zero
+ if(0 == bytesToRead) {
+ return;
+ }
+
+ // Read all the bytes. In case when all the bytes were not read the proper
+ // errorCode will be set.
+ while(1){
+ size_t dataBytesRead = m_socket.read_some(boost::asio::buffer(inBuf, bytesToRead),
+ errorCode);
+ // Update the state
+ bytesToRead -= dataBytesRead;
+ inBuf += dataBytesRead;
+
+ // Check if errorCode is EINTR then just retry otherwise break from loop
+ if(EINTR != errorCode.value()) break;
+
+ // Check if all the data is read then break from loop
+ if(0 == bytesToRead) break;
+ }
+}
+
+void DrillClientImpl::handleHandshake(ByteBuf_t inBuf,
const boost::system::error_code& err,
size_t bytes_transferred) {
boost::system::error_code error=err;
@@ -299,21 +439,23 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
if(!error){
rpc::InBoundRpcMessage msg;
uint32_t length = 0;
- std::size_t bytes_read = rpc::lengthDecode(m_rbuf, length);
+ std::size_t bytes_read = rpcLengthDecode(m_rbuf, length);
if(length>0){
- size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
- ByteBuf_t b=m_rbuf + LEN_PREFIX_BUFLEN;
- size_t bytesToRead=length - leftover;
- while(1){
- size_t dataBytesRead=m_socket.read_some(
- boost::asio::buffer(b, bytesToRead),
- error);
- if(err) break;
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;)
- if(dataBytesRead==bytesToRead) break;
- bytesToRead-=dataBytesRead;
- b+=dataBytesRead;
+ const size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
+ const ByteBuf_t b = m_rbuf + LEN_PREFIX_BUFLEN;
+ const size_t bytesToRead=length - leftover;
+ doReadFromSocket(b, bytesToRead, error);
+
+ // Check if any error happen while reading the message bytes. If yes then return before decoding the Msg
+ if(error) {
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. "
+ << " Failed to read entire handshake message. with error: "
+ << error.message().c_str() << "\n";)
+ handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Failed to read entire handshake message"));
+ return;
}
+
+ // Decode the bytes into a valid RPC Message
if (!decode(m_rbuf+bytes_read, length, msg)) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. Cannot decode handshake.\n";)
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Cannot decode handshake"));
@@ -340,6 +482,11 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
this->m_serverAuthMechanisms.push_back(mechanism);
}
+ // Updated encryption context based on server response
+ this->m_encryptionCtxt.setEncryptionReqd(b2u.has_encrypted() && b2u.encrypted());
+ if(b2u.has_maxwrappedsize()) {
+ this->m_encryptionCtxt.setMaxWrappedSize(b2u.maxwrappedsize());
+ }
}else{
// boost error
if(error==boost::asio::error::eof){ // Server broke off the connection
@@ -360,7 +507,8 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: "
+ << "Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
m_io_service.stop();
boost::system::error_code ignorederr;
@@ -370,6 +518,33 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
return;
}
+/*
+ * Check's if client has explicitly expressed interest in encrypted connections only. It looks for USERPROP_SASL_ENCRYPT
+ * connection string property. If set to true then returns true else returns false
+ */
+bool DrillClientImpl::clientNeedsEncryption(const DrillUserProperties* userProperties) {
+ bool needsEncryption = false;
+ // check if userProperties is null
+ if(!userProperties) {
+ return needsEncryption;
+ }
+
+ // Loop through the property to find USERPROP_SASL_ENCRYPT and it's value
+ for (size_t i = 0; i < userProperties->size(); i++) {
+ const std::string key = userProperties->keyAt(i);
+ std::string value = userProperties->valueAt(i);
+
+ if(USERPROP_SASL_ENCRYPT == key) {
+ boost::algorithm::to_lower(value);
+
+ if(0 == value.compare("true")) {
+ needsEncryption = true;
+ }
+ }
+ }
+ return needsEncryption;
+}
+
connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";)
@@ -379,7 +554,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
u2b.set_rpc_version(DRILL_RPC_VERSION);
u2b.set_support_listening(true);
u2b.set_support_timeout(DrillClientConfig::getHeartbeatFrequency() > 0);
- u2b.set_sasl_support(exec::user::SASL_AUTH);
+ u2b.set_sasl_support(exec::user::SASL_PRIVACY);
// Adding version info
exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos();
@@ -436,7 +611,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
uint64_t coordId = this->getNextCoordinationId();
rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
- sendSync(out_msg);
+ sendSyncCommon(out_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
}
@@ -479,6 +654,13 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
}
connectionStatus_t DrillClientImpl::handleAuthentication(const DrillUserProperties *userProperties) {
+
+ // Check if client needs encryption and server is configured for encryption or not before starting handshake
+ if(clientNeedsEncryption(userProperties) && !m_encryptionCtxt.isEncryptionReqd()) {
+ return handleConnError(CONN_AUTH_FAILED, "Client needs encryption but on server side encryption is disabled."
+ " Please check connection parameters or contact administrator?");
+ }
+
try {
m_saslAuthenticator = new SaslAuthenticatorImpl(userProperties);
} catch (std::runtime_error& e) {
@@ -495,26 +677,46 @@ connectionStatus_t DrillClientImpl::handleAuthentication(const DrillUserProperti
}
}
+ std::stringstream logMsg;
+ logMsg << "DrillClientImpl::handleAuthentication: Authentication failed. [Details: ";
+
if (SASL_OK == m_saslResultCode) {
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::handleAuthentication: Successfully authenticated!"
- << std::endl;)
+ // Check the negotiated SSF value and change the handlers.
+ if(m_encryptionCtxt.isEncryptionReqd()) {
+ if(SASL_OK != m_saslAuthenticator->verifyAndUpdateSaslProps()) {
+ logMsg << m_encryptionCtxt << "]. Negotiated Parameter is invalid."
+ << " Error: " << m_saslResultCode;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
+ return handleConnError(CONN_AUTH_FAILED, logMsg.str().c_str());
+ }
+
+ // Successfully negotiated for encryption related security parameters.
+ // Start using Encrypt and Decrypt handlers.
+ m_fpCurrentSendHandler = &DrillClientImpl::sendSyncEncrypted;
+ m_fpCurrentReadMsgHandler = &DrillClientImpl::readAndDecryptMsg;
+ }
- // in future, negotiated security layers are known here..
+ // Reset the errorMsg stream since this is success case.
+ logMsg.str(std::string());
+ logMsg << "DrillClientImpl::handleAuthentication: Successfully authenticated! [Details: "
+ << m_encryptionCtxt << " ]";
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
m_io_service.reset();
return CONN_SUCCESS;
} else {
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::handleAuthentication: Authentication failed: "
- << m_saslResultCode << std::endl;)
+ logMsg << m_encryptionCtxt << ", Error: " << m_saslResultCode;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
+
// shuts down socket as well
- return handleConnError(CONN_AUTH_FAILED, "Authentication failed. Check connection parameters?");
+ logMsg << "]. Check connection parameters?";
+ return handleConnError(CONN_AUTH_FAILED, logMsg.str().c_str());
}
}
void DrillClientImpl::initiateAuthentication() {
exec::shared::SaslMessage response;
- m_saslResultCode = m_saslAuthenticator->init(m_serverAuthMechanisms, response);
-
+ m_saslResultCode = m_saslAuthenticator->init(m_serverAuthMechanisms, response, &m_encryptionCtxt);
switch (m_saslResultCode) {
case SASL_CONTINUE:
@@ -539,7 +741,7 @@ void DrillClientImpl::sendSaslResponse(const exec::shared::SaslMessage& response
boost::lock_guard<boost::mutex> lock(m_dcMutex);
const int32_t coordId = getNextCoordinationId();
rpc::OutBoundRpcMessage msg(exec::rpc::REQUEST, exec::user::SASL_MESSAGE, coordId, &response);
- sendSync(msg);
+ sendSyncCommon(msg);
if (m_pendingRequests++ == 0) {
getNextResult();
}
@@ -768,23 +970,23 @@ Handle* DrillClientImpl::sendMsg(boost::function<Handle*(int32_t)> handleFactory
phandle = handleFactory(coordId);
this->m_queryHandles[coordId]=phandle;
- connectionStatus_t cStatus=sendSync(out_msg);
+ connectionStatus_t cStatus = sendSyncCommon(out_msg);
if(cStatus == CONN_SUCCESS){
bool sendRequest=false;
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;)
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;)
- if(m_pendingRequests++==0){
- sendRequest=true;
- }else{
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) << " request to server" << std::endl;)
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
- }
+ if(m_pendingRequests++==0){
+ sendRequest=true;
+ }else{
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) << " request to server" << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
+ }
if(sendRequest){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending " << ::exec::user::RpcType_Name(type) << " request. Number of pending requests = "
<< m_pendingRequests << std::endl;)
- getNextResult(); // async wait for results
+ getNextResult(); // async wait for results
}
}
@@ -854,76 +1056,319 @@ void DrillClientImpl::waitForResults(){
}
}
-status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
- AllocatedBufferPtr* allocatedBuffer,
- rpc::InBoundRpcMessage& msg){
+/*
+ * Decode the length of the message from bufWithLen and then read entire message from the socket.
+ * Parameters:
+ * bufWithLenField - in param - buffer containing the length of the RPC message/encrypted chunk
+ * bufferWithDataAndLenBytes - out param - buffer pointer which points to memory allocated in this function and has the
+ * entire one RPC message / encrypted chunk along with the length of the message.
+ * Memory for this buffer is released by caller.
+ * lengthFieldLength - out param - bytes of bufWithLen which contains the length of the entire RPC message or
+ * encrypted chunk
+ * lengthDecodeHandler - in param - function pointer with length decoder to use. For encrypted chunk we use
+ * lengthDecode and for plain RPC message we use rpcLengthDecode.
+ * Return:
+ * status_t - QRY_SUCCESS - In case of success.
+ * - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
+ */
+status_t DrillClientImpl::readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes,
+ uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler) {
+
+ uint32_t rmsgLen = 0;
+ boost::system::error_code error;
+ *bufferWithDataAndLenBytes = NULL;
+
+ // Decode the length field
+ lengthFieldLength = (this->*lengthDecodeHandler)(bufWithLenField, rmsgLen);
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Length bytes = " << lengthFieldLength << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Msg Length = " << rmsgLen << std::endl;)
+
+ if(rmsgLen>0) {
+ const size_t leftover = LEN_PREFIX_BUFLEN - lengthFieldLength;
+
+ // Allocate a buffer for reading all the bytes in bufWithLen and length number of bytes.
+ const size_t bufferSizeWithLenBytes = rmsgLen + lengthFieldLength;
+ *bufferWithDataAndLenBytes = new AllocatedBuffer(bufferSizeWithLenBytes);
+
+ if(*bufferWithDataAndLenBytes == NULL) {
+ return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
+ }
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readLenBytesFromSocket: Allocated and locked buffer: [ "
+ << *bufferWithDataAndLenBytes << ", size = " << bufferSizeWithLenBytes << " ]\n";)
+
+ // Copy the memory of bufWithLen into bufferWithLenBytesSize
+ memcpy((*bufferWithDataAndLenBytes)->m_pBuffer, bufWithLenField, LEN_PREFIX_BUFLEN);
+ const size_t bytesToRead = rmsgLen - leftover;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Copied bufWithLen into bufferWithLenBytes. "
+ << "Now reading data (rmsgLen - leftover) : " << bytesToRead
+ << std::endl;)
+
+ // Read the entire data left from socket and copy to currentBuffer.
+ const ByteBuf_t b = (*bufferWithDataAndLenBytes)->m_pBuffer + LEN_PREFIX_BUFLEN;
+ doReadFromSocket(b, bytesToRead, error);
+ } else {
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
+ }
+
+ return error ? handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL)
+ : QRY_SUCCESS;
+}
+
+
+/*
+ * Function to read entire RPC message from socket and decode it to InboundRpcMessage
+ * Parameters:
+ * inBuf - in param - Buffer containing the length bytes.
+ * allocatedBuffer - out param - Buffer containing the length bytes and entire RPC message bytes.
+ * msg - out param - Decoded InBoundRpcMessage from the bytes in allocatedBuffer
+ * Return:
+ * status_t - QRY_SUCCESS - In case of success.
+ * - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
+ */
+status_t DrillClientImpl::readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer,
+ rpc::InBoundRpcMessage& msg){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;)
- size_t leftover=0;
- uint32_t rmsgLen;
- AllocatedBufferPtr currentBuffer;
- *allocatedBuffer=NULL;
+ << reinterpret_cast<int*>(inBuf) << std::endl;)
+ *allocatedBuffer = NULL;
+ {
+ // We need to protect the readLength and read buffer, and the pending requests counter,
+ // but we don't have to keep the lock while we decode the rest of the buffer.
+ boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+ uint32_t lengthFieldSize = 0;
+
+ // Read the message length and extract length size bytes to form InBoundRpcMessage
+ const status_t statusCode = readLenBytesFromSocket(inBuf, allocatedBuffer, lengthFieldSize,
+ &DrillClientImpl::rpcLengthDecode);
+
+ // Check for error conditions
+ if(QRY_SUCCESS != statusCode) {
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+ return statusCode;
+ }
+
+ // Get the message size
+ size_t msgLen = (*allocatedBuffer)->m_bufSize;
+
+ // Read data successfully, now let's try to decode the buffer and form a valid RPC message.
+ // allocatedBuffer also contains the length bytes which is not needed by decodes so skip that part of buffer.
+ // We have it since in case of encryption the unwrap function expects it
+ if (!decode((*allocatedBuffer)->m_pBuffer + lengthFieldSize, msgLen - lengthFieldSize, msg)) {
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+ return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);
+ }
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC message with Coordination id: "
+ << msg.m_coord_id << std::endl;)
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
+ << reinterpret_cast<int*>(inBuf) << std::endl;)
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+ return QRY_SUCCESS;
+}
+
+
+/*
+ * Read ENCRYPT_LEN_PREFIX_BUFLEN bytes to decode length of one complete encrypted chunk. The length bytes are expected
+ * to be in network order. It is converted to host order and the value is stored in rmsgLen parameter.
+ * Parameters:
+ * inBuf - in param - ByteBuf_t containing atleast the length bytes.
+ * rmsgLen - out param - Contain the decoded value of length.
+ * Return:
+ * size_t - length bytes read to decode
+ */
+size_t DrillClientImpl::lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen) {
+ memcpy(&rmsgLen, inBuf, ENCRYPT_LEN_PREFIX_BUFLEN);
+ rmsgLen = ntohl(rmsgLen);
+ return ENCRYPT_LEN_PREFIX_BUFLEN;
+}
+
+/*
+ * Wrapper which uses RPC message length decoder to get length of one complete RPC message from _buf.
+ * Parameters:
+ * inBuf - in param - ByteBuf_t containing atleast the length bytes.
+ * rmsgLen - out param - Contain the decoded value of length.
+ * Return:
+ * size_t - length bytes read to decode
+ */
+size_t DrillClientImpl::rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen) {
+ return rpc::lengthDecode(inBuf, rmsgLen);
+}
+
+
+/*
+ * Read all the encrypted chunk needed to form a complete RPC message. Read an entire chunk from network, decrypt it
+ * and put in a buffer. The same process is repeated until the entire buffer to form a completed RPC message is read.
+ * Parameters:
+ * inBuf - in param - ByteBuf_t containing atleast the length bytes.
+ * allocatedBuffer - out param - Buffer containing the entire RPC message bytes which is formed by reading all the
+ * required encrypted chunk from network and decrypting each individual chunk. The
+ * buffer memory is released by caller.
+.* msg - out param - InBoundRpcMessage formed from bytes in allocatedBuffer
+ * Return:
+ * status_t - QRY_SUCCESS - In case of success.
+ * - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
+ */
+status_t DrillClientImpl::readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer,
+ rpc::InBoundRpcMessage& msg) {
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Read message from buffer "
+ << reinterpret_cast<int*>(inBuf) << std::endl;)
+
+ size_t leftover = 0;
+ uint32_t rpcMsgLen = 0;
+ size_t bytes_read = 0;
+ uint32_t writeIndex = 0;
+ size_t bytesToRead = 0;
+
+ *allocatedBuffer = NULL;
+ boost::system::error_code error;
+ std::stringstream errorMsg;
+
{
// We need to protect the readLength and read buffer, and the pending requests counter,
// but we don't have to keep the lock while we decode the rest of the buffer.
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- std::size_t bytes_read = rpc::lengthDecode(_buf, rmsgLen);
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;)
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;)
-
- if(rmsgLen>0){
- leftover = LEN_PREFIX_BUFLEN - bytes_read;
- // Allocate a buffer
- currentBuffer=new AllocatedBuffer(rmsgLen);
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ "
- << currentBuffer << ", size = " << rmsgLen << " ]\n";)
- if(currentBuffer==NULL){
- Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
- return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
+
+ do{
+ AllocatedBufferPtr currentBuffer = NULL;
+ uint32_t lengthFieldSize = 0;
+ const status_t statusCode = readLenBytesFromSocket(inBuf, ¤tBuffer, lengthFieldSize,
+ &DrillClientImpl::lengthDecode);
+
+ if(QRY_SUCCESS != statusCode) {
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+
+ // Release the buffer allocated to hold chunk
+ if(currentBuffer != NULL) {
+ Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
+ currentBuffer = NULL;
+ }
+ return statusCode;
}
- *allocatedBuffer=currentBuffer;
- if(leftover){
- memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover);
+
+ // read one chunk successfully. Let's try to decrypt the message
+ const char* unWrappedData = NULL;
+ uint32_t unWrappedLen = 0;
+ const int decryptResult = m_saslAuthenticator->unwrap(reinterpret_cast<const char*>(currentBuffer->m_pBuffer),
+ currentBuffer->m_bufSize, &unWrappedData, unWrappedLen);
+
+ if(SASL_OK != decryptResult) {
+
+ errorMsg << "Sasl unwrap failed for the buffer of size:" << currentBuffer->m_bufSize << " , Error: "
+ << decryptResult;
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::readAndDecryptMsg: "
+ << errorMsg.str() << std::endl;)
+
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+
+ // Release the buffer allocated to hold chunk
+ Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
+ currentBuffer = NULL;
+ return handleQryError(QRY_COMM_ERROR,
+ getMessage(ERR_QRY_COMMERR, errorMsg.str().c_str()), NULL);
}
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
- << (rmsgLen - leftover) << std::endl;)
- ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
- size_t bytesToRead=rmsgLen - leftover;
- boost::system::error_code error;
- while(1){
- size_t dataBytesRead=this->m_socket.read_some(
- boost::asio::buffer(b, bytesToRead),
- error);
- if(error) break;
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;)
- if(dataBytesRead==bytesToRead) break;
- bytesToRead-=dataBytesRead;
- b+=dataBytesRead;
+
+ // Check for case if the unWrappedLen is 0, since Cyrus SASL plugin verifies if the length of wrapped data
+ // is less than the length specified by prepended 4 octets as per RFC 4422/2222. If so it just returns
+ // and waits for more data
+ if(unWrappedLen == 0 || (unWrappedData == NULL)) {
+ errorMsg << "Sasl unwrap failed with mismatch in length of wrapped data and the prepended length value";
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::readAndDecryptMsg: " << errorMsg.str()
+ << std::endl;)
+
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+
+ // Release the buffer allocated to hold chunk
+ Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
+ currentBuffer = NULL;
+ return handleQryError(QRY_COMM_ERROR,
+ getMessage(ERR_QRY_COMMERR, errorMsg.str().c_str()), NULL);
}
- if(!error){
- // read data successfully
- if (!decode(currentBuffer->m_pBuffer, rmsgLen, msg)) {
- Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
- return handleQryError(QRY_COMM_ERROR,
- getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Successfully decrypted the buffer"
+ << " Sizes - Before Decryption = " << currentBuffer->m_bufSize
+ << " and After Decryption = " << unWrappedLen << std::endl;)
+
+ // Release the buffer allocated to hold chunk
+ Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
+ currentBuffer = NULL;
+
+ bytes_read = 0;
+ if(*allocatedBuffer == NULL) {
+ // This is the first chunk of the RPC message. We will decode the RPC message full length
+ bytes_read = rpcLengthDecode(reinterpret_cast<ByteBuf_t>(const_cast<char*>(unWrappedData)), rpcMsgLen);
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Rpc Message Length bytes = "
+ << bytes_read << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Rpc Message Length = "
+ << rpcMsgLen << std::endl;)
+
+ if(rpcMsgLen == 0) {
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
}
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;)
- }else{
- Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
- return handleQryError(QRY_COMM_ERROR,
- getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
+ // Allocate a buffer for storing full RPC message. This is released by the caller
+ *allocatedBuffer = new AllocatedBuffer(rpcMsgLen);
+
+ if(*allocatedBuffer == NULL){
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+ return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
+ }
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Allocated and locked buffer:"
+ << "[ " << *allocatedBuffer << ", size = " << rpcMsgLen << " ]\n";)
+
+ bytesToRead = rpcMsgLen;
}
- }else{
- // got a message with an invalid read length.
- Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
- return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
+
+ // Update the leftover bytes that is not copied yet
+ leftover = unWrappedLen - bytes_read;
+
+ // Copy rest of decrypted message to the buffer. We can do this since it is assured that one
+ // entire decrypted chunk is part of the same RPC message.
+ if(leftover) {
+ memcpy((*allocatedBuffer)->m_pBuffer + writeIndex, unWrappedData + bytes_read, leftover);
+ }
+
+ // Update bytes left to read to form full RPC message.
+ bytesToRead -= leftover;
+ writeIndex += leftover;
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Left to read unencrypted data"
+ << " of length (bytesToRead) : " << bytesToRead << std::endl;)
+
+ if(bytesToRead > 0) {
+ // Read synchronously buffer of size LEN_PREFIX_BUFLEN to get length of next chunk
+ doReadFromSocket(inBuf, LEN_PREFIX_BUFLEN, error);
+
+ if(error) {
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+ return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
+ }
+ }
+ }while(bytesToRead > 0); // more chunks to read for entire RPC message
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Done decrypting entire RPC message "
+ << " of length: " << rpcMsgLen << ". Now starting decode:" << std::endl;)
+
+ // Decode the buffer and form a RPC message
+ if (!decode((*allocatedBuffer)->m_pBuffer, rpcMsgLen, msg)) {
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
+ return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR,
+ "Cannot decode server message into valid RPC message"), NULL);
}
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC message with Coordination id: "
+ << msg.m_coord_id << std::endl;)
}
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;)
- Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Free buffer "
+ << reinterpret_cast<int*>(inBuf) << std::endl;)
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return QRY_SUCCESS;
}
@@ -1364,15 +1809,15 @@ status_t DrillClientImpl::processServerMetaResult(AllocatedBufferPtr allocatedBu
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientServerMetaHandle* pHandle=static_cast<DrillClientServerMetaHandle*>((*it).second);
+ exec::user::GetServerMetaResp* resp = new exec::user::GetServerMetaResp();
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetServerMetaResp result Handle " << msg.m_pbody.size() << std::endl;)
- exec::user::GetServerMetaResp resp;
- if (!(resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+ if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
return handleQryError(QRY_COMM_ERROR, "Cannot decode GetServerMetaResp results", pHandle);
}
- if (resp.status() != exec::user::OK) {
- return handleQryError(QRY_FAILED, resp.error(), pHandle);
+ if (resp->status() != exec::user::OK) {
+ return handleQryError(QRY_FAILED, resp->error(), pHandle);
}
- pHandle->notifyListener(&(resp.server_meta()), NULL);
+ pHandle->notifyListener(&(resp->server_meta()), NULL);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetServerMetaResp result " << std::endl;)
}else{
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
@@ -1484,11 +1929,11 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
return;
}
-void DrillClientImpl::handleRead(ByteBuf_t _buf,
+void DrillClientImpl::handleRead(ByteBuf_t inBuf,
const boost::system::error_code& error,
size_t bytes_transferred) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;)
+ << reinterpret_cast<int*>(inBuf) << std::endl;)
if(DrillClientConfig::getQueryTimeout() > 0){
// Cancel the timeout if handleRead is called
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
@@ -1496,7 +1941,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
}
if (error) {
// boost error
- Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
+ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
"Boost Communication Error: " << error.message() << std::endl;)
@@ -1510,7 +1955,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
AllocatedBufferPtr allocatedBuffer=NULL;
- if(readMsg(_buf, &allocatedBuffer, msg)!=QRY_SUCCESS){
+ if((this->*m_fpCurrentReadMsgHandler)(inBuf, &allocatedBuffer, msg)!=QRY_SUCCESS){
delete allocatedBuffer;
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
@@ -1655,6 +2100,9 @@ status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& ms
return QRY_SUCCESS;
}
+/*
+ * Called when there is failure in connect/send.
+ */
connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, const std::string& msg){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
m_pendingRequests=0;
@@ -1669,19 +2117,28 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, c
return status;
}
+/*
+ * Always called with NULL QueryHandle when there is any error while reading data from socket. Once enough data is read
+ * and a valid RPC message is formed then it can get called with NULL/valid QueryHandle depending on if QueryHandle is found
+ * for the created RPC message.
+ */
status_t DrillClientImpl::handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
- // set query error only if queries are running
+ // Set query error only if queries are running. If valid QueryHandle that means the bytes to form a valid
+ // RPC message was read successfully from socket. So there is no socket/connection issues.
if(pQueryHandle!=NULL){
m_pendingRequests--;
pQueryHandle->signalError(pErr);
- }else{
+ }else{ // This means error was while reading from socket, hence call broadcastError which eventually closes socket.
m_pendingRequests=0;
broadcastError(pErr);
}
return status;
}
+/*
+ * Always called with valid QueryHandle when there is any error processing Query related data.
+ */
status_t DrillClientImpl::handleQryError(status_t status,
const exec::shared::DrillPBError& e,
DrillClientQueryHandle* pQueryHandle){
@@ -1766,7 +2223,7 @@ void DrillClientImpl::sendAck(const rpc::InBoundRpcMessage& msg, bool isOk){
ack.set_ok(isOk);
rpc::OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
- sendSync(ack_msg);
+ sendSyncCommon(ack_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
}
@@ -1774,7 +2231,7 @@ void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){
boost::lock_guard<boost::mutex> lock(m_dcMutex);
uint64_t coordId = this->getNextCoordinationId();
rpc::OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
- sendSync(cancel_msg);
+ sendSyncCommon(cancel_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
}
@@ -1783,6 +2240,21 @@ void DrillClientImpl::shutdownSocket(){
boost::system::error_code ignorederr;
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
m_bIsConnected=false;
+
+ // Delete the saslAuthenticatorImpl instance since connection is broken. It will recreated on next
+ // call to connect.
+ if(m_saslAuthenticator != NULL) {
+ delete m_saslAuthenticator;
+ m_saslAuthenticator = NULL;
+ }
+
+ // Reset the SASL states.
+ m_saslDone = false;
+ m_saslResultCode = SASL_OK;
+
+ // Reset the encryption context since connection is invalid
+ m_encryptionCtxt.reset();
+
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
}
@@ -1799,8 +2271,6 @@ struct ServerMetaContext {
boost::mutex m_mutex;
boost::condition_variable m_cv;
- ServerMetaContext(): m_done(false), m_status(QRY_SUCCESS), m_serverMeta(), m_mutex(), m_cv() {};
-
static status_t listener(void* ctx, const exec::user::ServerMeta* serverMeta, DrillClientError* err) {
ServerMetaContext* context = static_cast<ServerMetaContext*>(ctx);
if (err) {
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index d37076e..852233f 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -368,16 +368,17 @@ class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnCol
DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {}
};
+// Length Decoder Function Pointer definition
+typedef size_t (DrillClientImpl::*lengthDecoder)(const ByteBuf_t, uint32_t&);
class DrillClientImpl : public DrillClientImplBase{
public:
DrillClientImpl():
- m_coordinationId(1),
m_handshakeVersion(0),
m_handshakeStatus(exec::user::SUCCESS),
m_bIsConnected(false),
m_saslAuthenticator(NULL),
- m_saslResultCode(SASL_OK),
+ m_saslResultCode(SASL_OK),
m_saslDone(false),
m_pendingRequests(0),
m_pError(NULL),
@@ -388,9 +389,11 @@ class DrillClientImpl : public DrillClientImplBase{
m_heartbeatTimer(m_io_service),
m_rbuf(NULL),
m_wbuf(MAX_SOCK_RD_BUFSIZE),
- m_bIsDirectConnection(false)
+ m_bIsDirectConnection(false)
{
m_coordinationId=rand()%1729+1;
+ m_fpCurrentReadMsgHandler = &DrillClientImpl::readMsg;
+ m_fpCurrentSendHandler = &DrillClientImpl::sendSyncPlain;
};
~DrillClientImpl(){
@@ -477,9 +480,10 @@ class DrillClientImpl : public DrillClientImplBase{
void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
int32_t getNextCoordinationId(){ return ++m_coordinationId; };
- // send synchronous messages
- //connectionStatus_t recvSync(rpc::InBoundRpcMessage& msg);
- connectionStatus_t sendSync(rpc::OutBoundRpcMessage& msg);
+ // synchronous message send handlers
+ connectionStatus_t sendSyncCommon(rpc::OutBoundRpcMessage& msg);
+ connectionStatus_t sendSyncPlain();
+ connectionStatus_t sendSyncEncrypted();
// handshake
connectionStatus_t recvHandshake();
void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred );
@@ -488,10 +492,16 @@ class DrillClientImpl : public DrillClientImplBase{
void startMessageListener();
// Query results
void getNextResult();
- status_t readMsg(
- ByteBuf_t _buf,
- AllocatedBufferPtr* allocatedBuffer,
- rpc::InBoundRpcMessage& msg);
+ // Read Message Handlers
+ status_t readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+ status_t readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+ status_t readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes,
+ uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler);
+ void doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead, boost::system::error_code& errorCode);
+ void doWriteToSocket(const char* dataPtr, size_t bytesToWrite, boost::system::error_code& errorCode);
+ // Length decode handlers
+ size_t lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen);
+ size_t rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen);
status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
status_t processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
@@ -506,7 +516,7 @@ class DrillClientImpl : public DrillClientImplBase{
status_t processQueryStatusResult( exec::shared::QueryResult* qr,
DrillClientQueryResult* pDrillClientQueryResult);
void handleReadTimeout(const boost::system::error_code & err);
- void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
+ void handleRead(ByteBuf_t inBuf, const boost::system::error_code & err, size_t bytes_transferred) ;
status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
@@ -540,6 +550,7 @@ class DrillClientImpl : public DrillClientImplBase{
void finishAuthentication();
void shutdownSocket();
+ bool clientNeedsEncryption(const DrillUserProperties* userProperties);
int32_t m_coordinationId;
int32_t m_handshakeVersion;
@@ -557,6 +568,14 @@ class DrillClientImpl : public DrillClientImplBase{
boost::mutex m_saslMutex; // mutex to protect m_saslDone
boost::condition_variable m_saslCv; // to signal completion of SASL exchange
+ // Used for encryption and is set when server notifies in first handshake response.
+ EncryptionContext m_encryptionCtxt;
+
+ // Function pointer for read and send handler. By default these are referred to handler for plain message read/send. When encryption is enabled
+ // then after successful handshake these pointers refer to handler for encrypted message read/send over wire.
+ status_t (DrillClientImpl::*m_fpCurrentReadMsgHandler)(ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+ connectionStatus_t (DrillClientImpl::*m_fpCurrentSendHandler)();
+
std::string m_connectStr;
//
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/rpcMessage.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.cpp b/contrib/native/client/src/clientlib/rpcMessage.cpp
index 13cd7a8..f64167f 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.cpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.cpp
@@ -47,7 +47,7 @@ std::size_t lengthDecode(const uint8_t* buf, uint32_t& length) {
// read the frame to get the length of the message and then
- CodedInputStream cis(buf, 5); // read 5 bytes at most
+ CodedInputStream cis(buf, LEN_PREFIX_BUFLEN); // read LEN_PREFIX_BUFLEN bytes at most
int startPos(cis.CurrentPosition()); // for debugging
if (!cis.ReadVarint32(&length)) {
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/rpcMessage.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp b/contrib/native/client/src/clientlib/rpcMessage.hpp
index 15487e9..43bcaeb 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.hpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.hpp
@@ -54,7 +54,6 @@ std::size_t lengthDecode(const uint8_t* buf, uint32_t& length);
bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg);
bool encode(DataBuf& buf, const OutBoundRpcMessage& msg);
-
} // namespace rpc
} // namespace Drill
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp b/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp
index e7e2ba5..c5dc3ac 100644
--- a/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp
+++ b/contrib/native/client/src/clientlib/saslAuthenticatorImpl.cpp
@@ -32,6 +32,7 @@ static const std::string DEFAULT_SERVICE_NAME = "drill";
static const std::string KERBEROS_SIMPLE_NAME = "kerberos";
static const std::string KERBEROS_SASL_NAME = "gssapi";
static const std::string PLAIN_NAME = "plain";
+static const int PREFERRED_MIN_SSF = 56;
const std::map<std::string, std::string> SaslAuthenticatorImpl::MECHANISM_MAPPING = boost::assign::map_list_of
(KERBEROS_SIMPLE_NAME, KERBEROS_SASL_NAME)
@@ -42,8 +43,7 @@ boost::mutex SaslAuthenticatorImpl::s_mutex;
bool SaslAuthenticatorImpl::s_initialized = false;
SaslAuthenticatorImpl::SaslAuthenticatorImpl(const DrillUserProperties* const properties) :
- m_pUserProperties(properties), m_pConnection(NULL), m_ppwdSecret(NULL) {
-
+ m_pUserProperties(properties), m_pConnection(NULL), m_ppwdSecret(NULL), m_pEncryptCtxt(NULL) {
if (!s_initialized) {
boost::lock_guard<boost::mutex> lock(SaslAuthenticatorImpl::s_mutex);
if (!s_initialized) {
@@ -85,6 +85,9 @@ SaslAuthenticatorImpl::~SaslAuthenticatorImpl() {
sasl_dispose(&m_pConnection);
}
m_pConnection = NULL;
+
+ // Memory is owned by DrillClientImpl object
+ m_pEncryptCtxt = NULL;
}
typedef int (*sasl_callback_proc_t)(void); // see sasl_callback_ft
@@ -109,8 +112,14 @@ int SaslAuthenticatorImpl::passwordCallback(sasl_conn_t *conn, void *context, in
return SASL_OK;
}
-int SaslAuthenticatorImpl::init(const std::vector<std::string>& mechanisms, exec::shared::SaslMessage& response) {
- // find and set parameters
+int SaslAuthenticatorImpl::init(const std::vector<std::string>& mechanisms, exec::shared::SaslMessage& response,
+ EncryptionContext* const encryptCtxt) {
+
+ // EncryptionContext should not be NULL here.
+ assert(encryptCtxt != NULL);
+ m_pEncryptCtxt = encryptCtxt;
+
+ // find and set parameters
std::string authMechanismToUse;
std::string serviceName;
std::string serviceHost;
@@ -163,6 +172,9 @@ int SaslAuthenticatorImpl::init(const std::vector<std::string>& mechanisms, exec
<< saslResult << std::endl;)
if (saslResult != SASL_OK) return saslResult;
+ // set the security properties
+ setSecurityProps();
+
// initiate; for now, pass in only one mechanism
const char *out;
unsigned outlen;
@@ -204,4 +216,103 @@ int SaslAuthenticatorImpl::step(const exec::shared::SaslMessage& challenge, exec
return saslResult;
}
+/*
+ * Verify that the negotiated value is correct as per system configurations. Also retrieves and set the rawWrapSendSize
+ */
+int SaslAuthenticatorImpl::verifyAndUpdateSaslProps() {
+ const int* negotiatedValue;
+ int result = SASL_OK;
+
+ if(SASL_OK != (result = sasl_getprop(m_pConnection, SASL_SSF, reinterpret_cast<const void **>(&negotiatedValue)))) {
+ return result;
+ }
+
+ // If the negotiated SSF value is less than required one that means we have negotiated for weaker security level.
+ if(*negotiatedValue < PREFERRED_MIN_SSF) {
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "SaslAuthenticatorImpl::verifyAndUpdateSaslProps: "
+ << "Negotiated SSF parameter:" << *negotiatedValue
+ << " is less than Preferred one: " << PREFERRED_MIN_SSF << std::endl;)
+ result = SASL_BADPARAM;
+ return result;
+ }
+
+ if(SASL_OK != (result = sasl_getprop(m_pConnection, SASL_MAXOUTBUF,
+ reinterpret_cast<const void **>(&negotiatedValue)))) {
+ return result;
+ }
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "SaslAuthenticatorImpl::verifyAndUpdateSaslProps: "
+ << "Negotiated Raw Wrap Buffer size: " << *negotiatedValue << std::endl;)
+
+ m_pEncryptCtxt->setWrapSizeLimit(*negotiatedValue);
+ return result;
+}
+
+/*
+ * Set the security properties structure with all the needed parameters for encryption so that
+ * a proper mechanism with and cipher is chosen after handshake.
+ *
+ * PREFERRED_MIN_SSF is chosen to be 56 since that is the max_ssf supported by gssapi. We want
+ * stronger cipher algorithm to be used all the time (preferably AES-256), so leaving MAX_SSF as UINT_MAX
+ */
+void SaslAuthenticatorImpl::setSecurityProps() const{
+
+ if(m_pEncryptCtxt->isEncryptionReqd()) {
+ // set the security properties.
+ sasl_security_properties_t secprops;
+ secprops.min_ssf = PREFERRED_MIN_SSF;
+ secprops.max_ssf = UINT_MAX;
+ secprops.maxbufsize = m_pEncryptCtxt->getMaxWrappedSize();
+ secprops.property_names = NULL;
+ secprops.property_values = NULL;
+ // Only specify NOPLAINTEXT for encryption since the mechanism is selected based on name not
+ // the security properties configured here.
+ secprops.security_flags = SASL_SEC_NOPLAINTEXT;
+
+ // Set the security properties in the connection context.
+ sasl_setprop(m_pConnection, SASL_SEC_PROPS, &secprops);
+ }
+}
+
+/*
+ * Encodes the input data by calling the sasl_encode provided by Cyrus-SASL library which internally calls
+ * the wrap function of the chosen mechanism. The output buffer will have first 4 octets as the length of
+ * encrypted data in network byte order.
+ *
+ * Parameters:
+ * dataToWrap - in param - pointer to data buffer to encrypt.
+ * dataToWrapLen - in param - length of data buffer to encrypt.
+ * output - out param - pointer to data buffer with encrypted data. Allocated by Cyrus-SASL
+ * wrappedLen - out param - length of data after encryption
+ * Returns:
+ * SASL_OK - success (returns input if no layer negotiated)
+ * SASL_NOTDONE - security layer negotiation not finished
+ * SASL_BADPARAM - inputlen is greater than the SASL_MAXOUTBUF
+ */
+int SaslAuthenticatorImpl::wrap(const char* dataToWrap, const int& dataToWrapLen, const char** output,
+ uint32_t& wrappedLen) {
+ return sasl_encode(m_pConnection, dataToWrap, dataToWrapLen, output, &wrappedLen);
+}
+
+/*
+ * Decodes the input data by calling the sasl_decode provided by Cyrus-SASL library which internally calls
+ * the wrap function of the chosen mechanism. The input buffer will have first 4 octets as the length of
+ * encrypted data in network byte order.
+ *
+ * Parameters:
+ * dataToUnWrap - in param - pointer to data buffer to decrypt.
+ * dataToUnWrapLen - in param - length of data buffer to decrypt.
+ * output - out param - pointer to data buffer with decrypted data. Allocated by Cyrus-SASL
+ * unWrappedLen - out param - length of data after decryption
+ * Returns:
+ * SASL_OK - success (returns input if no layer negotiated)
+ * SASL_NOTDONE - security layer negotiation not finished
+ * SASL_BADPARAM - inputlen is greater than the SASL_MAXOUTBUF
+ */
+int SaslAuthenticatorImpl::unwrap(const char* dataToUnWrap, const int& dataToUnWrapLen, const char** output,
+ uint32_t& unWrappedLen) {
+ return sasl_decode(m_pConnection, dataToUnWrap, dataToUnWrapLen, output, &unWrappedLen);
+}
+
+
} /* namespace Drill */
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp b/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp
index 5e36ee1..53fe4e3 100644
--- a/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp
+++ b/contrib/native/client/src/clientlib/saslAuthenticatorImpl.hpp
@@ -24,6 +24,7 @@
#include <vector>
#include "drill/drillClient.hpp"
#include "UserBitShared.pb.h"
+#include "utils.hpp"
#include "sasl/sasl.h"
#include "sasl/saslplug.h"
@@ -38,10 +39,17 @@ public:
~SaslAuthenticatorImpl();
- int init(const std::vector<std::string>& mechanisms, exec::shared::SaslMessage& response);
+ int init(const std::vector<std::string>& mechanisms, exec::shared::SaslMessage& response,
+ EncryptionContext* const encryptCtxt);
int step(const exec::shared::SaslMessage& challenge, exec::shared::SaslMessage& response) const;
+ int verifyAndUpdateSaslProps();
+
+ int wrap(const char* dataToWrap, const int& dataToWrapLen, const char** output, uint32_t& wrappedLen);
+
+ int unwrap(const char* dataToUnWrap, const int& dataToUnWrapLen, const char** output, uint32_t& unWrappedLen);
+
private:
static const std::map<std::string, std::string> MECHANISM_MAPPING;
@@ -53,11 +61,13 @@ private:
sasl_conn_t *m_pConnection;
std::string m_username;
sasl_secret_t *m_ppwdSecret;
+ EncryptionContext *m_pEncryptCtxt;
static int passwordCallback(sasl_conn_t *conn, void *context, int id, sasl_secret_t **psecret);
static int userNameCallback(void *context, int id, const char **result, unsigned int *len);
+ void setSecurityProps() const;
};
} /* namespace Drill */
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/utils.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp
index d3c8f08..11aa2c2 100644
--- a/contrib/native/client/src/clientlib/utils.cpp
+++ b/contrib/native/client/src/clientlib/utils.cpp
@@ -111,4 +111,53 @@ AllocatedBuffer::~AllocatedBuffer(){
m_bufSize = 0;
}
+EncryptionContext::EncryptionContext(const bool& encryptionReqd, const int& maxWrappedSize, const int& wrapSizeLimit) {
+ this->m_bEncryptionReqd = encryptionReqd;
+ this->m_maxWrappedSize = maxWrappedSize;
+ this->m_wrapSizeLimit = wrapSizeLimit;
+}
+
+EncryptionContext::EncryptionContext() {
+ this->m_bEncryptionReqd = false;
+ this->m_maxWrappedSize = 65536;
+ this->m_wrapSizeLimit = 0;
+}
+
+void EncryptionContext::setEncryptionReqd(const bool& encryptionReqd) {
+ this->m_bEncryptionReqd = encryptionReqd;
+}
+
+void EncryptionContext::setMaxWrappedSize(const int& maxWrappedSize) {
+ this->m_maxWrappedSize = maxWrappedSize;
+}
+
+void EncryptionContext::setWrapSizeLimit(const int& wrapSizeLimit) {
+ this->m_wrapSizeLimit = wrapSizeLimit;
+}
+
+bool EncryptionContext::isEncryptionReqd() const {
+ return m_bEncryptionReqd;
+}
+
+int EncryptionContext::getMaxWrappedSize() const {
+ return m_maxWrappedSize;
+}
+
+int EncryptionContext::getWrapSizeLimit() const {
+ return m_wrapSizeLimit;
+}
+
+void EncryptionContext::reset() {
+ this->m_bEncryptionReqd = false;
+ this->m_maxWrappedSize = 65536;
+ this->m_wrapSizeLimit = 0;
+}
+
+std::ostream& operator<<(std::ostream &contextStream, const EncryptionContext& context) {
+ contextStream << " Encryption: " << (context.isEncryptionReqd() ? "enabled" : "disabled");
+ contextStream << " ,MaxWrappedSize: " << context.getMaxWrappedSize();
+ contextStream << " ,WrapSizeLimit: " << context.getWrapSizeLimit();
+ return contextStream;
+}
+
} // namespace
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/clientlib/utils.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp
index 4cd8fa5..d30794c 100644
--- a/contrib/native/client/src/clientlib/utils.hpp
+++ b/contrib/native/client/src/clientlib/utils.hpp
@@ -98,6 +98,38 @@ class DECLSPEC_DRILL_CLIENT Utils{
}; // Utils
+/*
+ * Encryption related configuration parameters. The member's are updated with value received from server
+ * and also after the SASL Handshake is done.
+ */
+class EncryptionContext {
+
+ bool m_bEncryptionReqd;
+ int m_maxWrappedSize;
+ int m_wrapSizeLimit;
+
+public:
+ EncryptionContext();
+
+ EncryptionContext(const bool& encryptionReqd, const int& maxWrappedSize, const int& wrapSizeLimit);
+
+ void setEncryptionReqd(const bool& encryptionReqd);
+
+ void setMaxWrappedSize(const int& maxWrappedSize);
+
+ void setWrapSizeLimit(const int& wrapSizeLimit);
+
+ bool isEncryptionReqd() const;
+
+ int getMaxWrappedSize() const;
+
+ int getWrapSizeLimit() const;
+
+ void reset();
+
+ friend std::ostream& operator<<(std::ostream &contextStream, const EncryptionContext& context);
+};
+
} // namespace Drill
#endif
http://git-wip-us.apache.org/repos/asf/drill/blob/d11aba2e/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index ed0a1ed..5401c75 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -56,6 +56,7 @@
#define LENGTH_PREFIX_MAX_LENGTH 5
#define LEN_PREFIX_BUFLEN LENGTH_PREFIX_MAX_LENGTH
+#define ENCRYPT_LEN_PREFIX_BUFLEN 4
#define MAX_CONNECT_STR 4096
#define MAX_SOCK_RD_BUFSIZE 1024
@@ -169,6 +170,7 @@ typedef enum{
#define USERPROP_AUTH_MECHANISM "auth"
#define USERPROP_SERVICE_NAME "service_name"
#define USERPROP_SERVICE_HOST "service_host"
+#define USERPROP_SASL_ENCRYPT "sasl_encrypt"
// Bitflags to describe user properties
// Used in DrillUserProperties::USER_PROPERTIES