You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2017/06/12 01:27:40 UTC
[2/4] drill git commit: DRILL-5541: C++ Client Crashes During Simple
"Man in the Middle" Attack Test with Exploitable Write AV. This closes #850
DRILL-5541: C++ Client Crashes During Simple "Man in the Middle" Attack Test with Exploitable Write AV
This closes #850
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5d2eedff
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5d2eedff
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5d2eedff
Branch: refs/heads/master
Commit: 5d2eedff8329b687fb7a5a498971503b82260946
Parents: adb79fb
Author: Rob Wu <ro...@gmail.com>
Authored: Mon Jun 5 14:06:33 2017 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Sun Jun 11 17:28:52 2017 -0700
----------------------------------------------------------------------
.../client/src/clientlib/drillClientImpl.cpp | 46 ++++++++++++++++++--
.../client/src/clientlib/drillClientImpl.hpp | 40 +++++++++--------
.../native/client/src/clientlib/rpcMessage.cpp | 12 +++++
3 files changed, 75 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/5d2eedff/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 0dee309..7015b5a 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -1580,6 +1580,9 @@ status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedB
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientPrepareHandle* pDrillClientPrepareHandle=static_cast<DrillClientPrepareHandle*>((*it).second);
+ if (!validateResultRPCType(pDrillClientPrepareHandle, msg)){
+ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for prepared statement.", pDrillClientPrepareHandle);
+ }
exec::user::CreatePreparedStatementResp resp;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Prepared Statement Handle " << msg.m_pbody.size() << std::endl;)
if (!resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size())) {
@@ -1588,7 +1591,9 @@ status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedB
if (resp.has_status() && resp.status() != exec::user::OK) {
return handleQryError(QRY_FAILED, resp.error(), pDrillClientPrepareHandle);
}
- pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement());
+ if (QRY_SUCCESS != pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement())){
+ return handleQryError(QRY_FAILED, "Error during prepared statement setup.", pDrillClientPrepareHandle);
+ }
pDrillClientPrepareHandle->notifyListener(pDrillClientPrepareHandle, NULL);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Prepared Statement handle - " << resp.prepared_statement().server_handle().DebugString() << std::endl;)
}else{
@@ -1619,6 +1624,9 @@ status_t DrillClientImpl::processCatalogsResult(AllocatedBufferPtr allocatedBuff
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientCatalogResult* pHandle=static_cast<DrillClientCatalogResult*>((*it).second);
+ if (!validateResultRPCType(pHandle, msg)){
+ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getcatalogs results.", pHandle);
+ }
exec::user::GetCatalogsResp* resp = new exec::user::GetCatalogsResp;
pHandle->attachMetadataResult(resp);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetCatalogs result Handle " << msg.m_pbody.size() << std::endl;)
@@ -1667,6 +1675,9 @@ status_t DrillClientImpl::processSchemasResult(AllocatedBufferPtr allocatedBuffe
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientSchemaResult* pHandle=static_cast<DrillClientSchemaResult*>((*it).second);
+ if (!validateResultRPCType(pHandle, msg)){
+ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getschemas results.", pHandle);
+ }
exec::user::GetSchemasResp* resp = new exec::user::GetSchemasResp();
pHandle->attachMetadataResult(resp);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetSchemasResp result Handle " << msg.m_pbody.size() << std::endl;)
@@ -1715,6 +1726,9 @@ status_t DrillClientImpl::processTablesResult(AllocatedBufferPtr allocatedBuffer
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientTableResult* pHandle=static_cast<DrillClientTableResult*>((*it).second);
+ if (!validateResultRPCType(pHandle, msg)){
+ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for gettables results.", pHandle);
+ }
exec::user::GetTablesResp* resp = new exec::user::GetTablesResp();
pHandle->attachMetadataResult(resp);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GeTablesResp result Handle " << msg.m_pbody.size() << std::endl;)
@@ -1762,6 +1776,9 @@ status_t DrillClientImpl::processColumnsResult(AllocatedBufferPtr allocatedBuffe
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientColumnResult* pHandle=static_cast<DrillClientColumnResult*>((*it).second);
+ if (!validateResultRPCType(pHandle, msg)){
+ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getcolumns results.", pHandle);
+ }
exec::user::GetColumnsResp* resp = new exec::user::GetColumnsResp();
pHandle->attachMetadataResult(resp);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetColumnsResp result Handle " << msg.m_pbody.size() << std::endl;)
@@ -1809,6 +1826,9 @@ status_t DrillClientImpl::processServerMetaResult(AllocatedBufferPtr allocatedBu
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientServerMetaHandle* pHandle=static_cast<DrillClientServerMetaHandle*>((*it).second);
+ if (!validateResultRPCType(pHandle, msg)){
+ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for GetServerMetaResp results.", pHandle);
+ }
exec::user::GetServerMetaResp* resp = new exec::user::GetServerMetaResp();
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetServerMetaResp result Handle " << msg.m_pbody.size() << std::endl;)
if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
@@ -1863,7 +1883,8 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr
case exec::shared::QueryResult_QueryState_FAILED:
{
// get the error message from protobuf and handle errors
- ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult);
+ ret = (0 == qr->error_size()) ?
+ handleQryError(ret, "Unknown protobuf error.", pDrillClientQueryResult) : handleQryError(ret, qr->error(0), pDrillClientQueryResult);
}
break;
// m_pendingRequests should be decremented when the query is
@@ -2100,6 +2121,20 @@ status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& ms
return QRY_SUCCESS;
}
+bool DrillClientImpl::validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg) {
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::validateResultRPCType" << std::endl;)
+ if (NULL != pQueryHandle) {
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)
+ << "DrillClientImpl::validateResultRPCType: Expected RPC Type: "
+ << pQueryHandle->getExpectedRPCType()
+ << " inbound RPC Type: "
+ << msg.m_rpc_type
+ << std::endl;)
+ return (pQueryHandle->getExpectedRPCType() == msg.m_rpc_type);
+ }
+ return false;
+}
+
/*
* Called when there is failure in connect/send.
*/
@@ -2589,8 +2624,11 @@ status_t DrillClientPrepareHandle::setupPreparedStatement(const exec::user::Prep
}
// Copy server handle
- this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle());
- return QRY_SUCCESS;
+ if (pstmt.has_server_handle()){
+ this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle());
+ return QRY_SUCCESS;
+ }
+ return QRY_FAILURE;
}
void DrillClientPrepareHandle::clearAndDestroy(){
http://git-wip-us.apache.org/repos/asf/drill/blob/5d2eedff/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 852233f..efa4e66 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -109,15 +109,16 @@ class DrillClientImplBase{
class DrillClientQueryHandle{
friend class DrillClientImpl;
public:
- DrillClientQueryHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, void* context):
+ DrillClientQueryHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, void* context, int in_expectedRPCType) :
m_client(client),
m_coordinationId(coordId),
m_query(query),
- m_status(QRY_SUCCESS),
+ m_status(QRY_SUCCESS),
m_bCancel(false),
m_bHasError(false),
+ m_expectedRPCType(in_expectedRPCType),
m_pError(NULL),
- m_pApplicationContext(context){
+ m_pApplicationContext(context){
};
virtual ~DrillClientQueryHandle(){
@@ -137,7 +138,7 @@ class DrillClientQueryHandle{
void setQueryStatus(status_t s){ m_status = s;}
status_t getQueryStatus() const { return m_status;}
inline DrillClientImpl& client() const { return m_client; };
-
+ int getExpectedRPCType() const { return m_expectedRPCType; };
inline void* getApplicationContext() const { return m_pApplicationContext; }
protected:
@@ -153,7 +154,7 @@ class DrillClientQueryHandle{
status_t m_status;
bool m_bCancel;
bool m_bHasError;
-
+ int m_expectedRPCType;
const DrillClientError* m_pError;
void* m_pApplicationContext;
@@ -163,9 +164,9 @@ template<typename Listener, typename ListenerValue>
class DrillClientBaseHandle: public DrillClientQueryHandle {
friend class DrillClientImpl;
public:
- DrillClientBaseHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* context):
- DrillClientQueryHandle(client, coordId, query, context),
- m_pApplicationListener(listener){
+ DrillClientBaseHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* context, int in_expectedRPCType) :
+ DrillClientQueryHandle(client, coordId, query, context, in_expectedRPCType),
+ m_pApplicationListener(listener){
};
virtual ~DrillClientBaseHandle(){
@@ -189,7 +190,7 @@ class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListen
friend class DrillClientImpl;
public:
DrillClientQueryResult(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnQueryResultsListener listener, void* listenerCtx):
- DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>(client, coordId, query, listener, listenerCtx),
+ DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>(client, coordId, query, listener, listenerCtx, exec::user::QUERY_HANDLE),
m_numBatches(0),
m_columnDefs(new std::vector<Drill::FieldMetadata*>),
m_bIsQueryPending(true),
@@ -288,8 +289,8 @@ class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListen
class DrillClientPrepareHandle: public DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>, public PreparedStatement {
public:
DrillClientPrepareHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnPreparedStatementListener listener, void* listenerCtx):
- DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>(client, coordId, query, listener, listenerCtx),
- PreparedStatement(),
+ DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>(client, coordId, query, listener, listenerCtx, exec::user::PREPARED_STATEMENT),
+ PreparedStatement(),
m_columnDefs(new std::vector<Drill::FieldMetadata*>) {
};
@@ -311,8 +312,8 @@ class DrillClientPrepareHandle: public DrillClientBaseHandle<pfnPreparedStatemen
typedef status_t (*pfnServerMetaListener)(void* ctx, const exec::user::ServerMeta* serverMeta, DrillClientError* err);
class DrillClientServerMetaHandle: public DrillClientBaseHandle<pfnServerMetaListener, const exec::user::ServerMeta*> {
public:
- DrillClientServerMetaHandle(DrillClientImpl& client, int32_t coordId, pfnServerMetaListener listener, void* listenerCtx):
- DrillClientBaseHandle<pfnServerMetaListener, const exec::user::ServerMeta*>(client, coordId, "server meta", listener, listenerCtx) {
+ DrillClientServerMetaHandle(DrillClientImpl& client, int32_t coordId, pfnServerMetaListener listener, void* listenerCtx):
+ DrillClientBaseHandle<pfnServerMetaListener, const exec::user::ServerMeta*>(client, coordId, "server meta", listener, listenerCtx, exec::user::SERVER_META) {
};
private:
@@ -323,8 +324,8 @@ class DrillClientServerMetaHandle: public DrillClientBaseHandle<pfnServerMetaLis
template<typename Listener, typename MetaType, typename MetaImpl, typename MetadataResult>
class DrillClientMetadataResult: public DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*> {
public:
- DrillClientMetadataResult(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* listenerCtx):
- DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*>(client, coordId, query, listener, listenerCtx) {}
+ DrillClientMetadataResult(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* listenerCtx, int in_expectedRPCType) :
+ DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*>(client, coordId, query, listener, listenerCtx, in_expectedRPCType) {}
void attachMetadataResult(MetadataResult* result) { this->m_pMetadata.reset(result); }
@@ -344,28 +345,28 @@ class DrillClientCatalogResult: public DrillClientMetadataResult<Metadata::pfnCa
friend class DrillClientImpl;
public:
DrillClientCatalogResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx):
- DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp>(client, coordId, "getCatalog", listener, listenerCtx) {}
+ DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp>(client, coordId, "getCatalog", listener, listenerCtx, exec::user::CATALOGS) {}
};
class DrillClientSchemaResult: public DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp> {
friend class DrillClientImpl;
public:
DrillClientSchemaResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx):
- DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp>(client, coordId, "getSchemas", listener, listenerCtx) {}
+ DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp>(client, coordId, "getSchemas", listener, listenerCtx, exec::user::SCHEMAS) {}
};
class DrillClientTableResult: public DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp> {
friend class DrillClientImpl;
public:
DrillClientTableResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnTableMetadataListener listener, void* listenerCtx):
- DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp>(client, coordId, "getTables", listener, listenerCtx) {}
+ DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp>(client, coordId, "getTables", listener, listenerCtx, exec::user::TABLES) {}
};
class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp> {
friend class DrillClientImpl;
public:
DrillClientColumnResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnColumnMetadataListener listener, void* listenerCtx):
- DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {}
+ DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx, exec::user::COLUMNS) {}
};
// Length Decoder Function Pointer definition
@@ -519,6 +520,7 @@ class DrillClientImpl : public DrillClientImplBase{
void handleRead(ByteBuf_t inBuf, const boost::system::error_code & err, size_t bytes_transferred) ;
status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
+ bool validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg);
connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
status_t handleQryCancellation(status_t status, DrillClientQueryResult* pQueryResult);
status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle);
http://git-wip-us.apache.org/repos/asf/drill/blob/5d2eedff/contrib/native/client/src/clientlib/rpcMessage.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.cpp b/contrib/native/client/src/clientlib/rpcMessage.cpp
index f64167f..87d2fdb 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.cpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.cpp
@@ -152,6 +152,12 @@ bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
assert(dbody_length == size);
assert(msg.m_dbody==buf+currPos);
+
+ // Enforce assertion due to unexpected crashes during network error.
+ if (!((dbody_length == size) && (msg.m_dbody == (buf + currPos)))) {
+ return false;
+ }
+
#ifdef CODER_DEBUG
cerr << "Read raw body of " << dbody_length << " bytes" << endl;
#endif
@@ -173,6 +179,12 @@ bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
int endPos = cis.CurrentPosition();
assert((endPos-startPos) == length);
+
+ // Enforce assertion due to unexpected crashes during network error.
+ if ((endPos - startPos) != length) {
+ return false;
+
+ }
return true;
}