You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2017/06/12 01:27:40 UTC

[2/4] drill git commit: DRILL-5541: C++ Client Crashes During Simple "Man in the Middle" Attack Test with Exploitable Write AV This closes #850

DRILL-5541: C++ Client Crashes During Simple "Man in the Middle" Attack Test with Exploitable Write AV
This closes #850


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5d2eedff
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5d2eedff
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5d2eedff

Branch: refs/heads/master
Commit: 5d2eedff8329b687fb7a5a498971503b82260946
Parents: adb79fb
Author: Rob Wu <ro...@gmail.com>
Authored: Mon Jun 5 14:06:33 2017 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Sun Jun 11 17:28:52 2017 -0700

----------------------------------------------------------------------
 .../client/src/clientlib/drillClientImpl.cpp    | 46 ++++++++++++++++++--
 .../client/src/clientlib/drillClientImpl.hpp    | 40 +++++++++--------
 .../native/client/src/clientlib/rpcMessage.cpp  | 12 +++++
 3 files changed, 75 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5d2eedff/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 0dee309..7015b5a 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -1580,6 +1580,9 @@ status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedB
     std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
     if(it!=this->m_queryHandles.end()){
         DrillClientPrepareHandle* pDrillClientPrepareHandle=static_cast<DrillClientPrepareHandle*>((*it).second);
+        if (!validateResultRPCType(pDrillClientPrepareHandle, msg)){
+            return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for prepared statement.", pDrillClientPrepareHandle);
+        }
         exec::user::CreatePreparedStatementResp resp;
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Prepared Statement Handle " << msg.m_pbody.size() << std::endl;)
         if (!resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size())) {
@@ -1588,7 +1591,9 @@ status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedB
         if (resp.has_status() && resp.status() != exec::user::OK) {
             return handleQryError(QRY_FAILED, resp.error(), pDrillClientPrepareHandle);
         }
-        pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement());
+        if (QRY_SUCCESS != pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement())){
+            return handleQryError(QRY_FAILED, "Error during prepared statement setup.", pDrillClientPrepareHandle);
+        }
         pDrillClientPrepareHandle->notifyListener(pDrillClientPrepareHandle, NULL);
         DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Prepared Statement handle - " << resp.prepared_statement().server_handle().DebugString() << std::endl;)
     }else{
@@ -1619,6 +1624,9 @@ status_t DrillClientImpl::processCatalogsResult(AllocatedBufferPtr allocatedBuff
     std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
     if(it!=this->m_queryHandles.end()){
         DrillClientCatalogResult* pHandle=static_cast<DrillClientCatalogResult*>((*it).second);
+        if (!validateResultRPCType(pHandle, msg)){
+            return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getcatalogs results.", pHandle);
+        }
         exec::user::GetCatalogsResp* resp = new exec::user::GetCatalogsResp;
         pHandle->attachMetadataResult(resp);
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetCatalogs result Handle " << msg.m_pbody.size() << std::endl;)
@@ -1667,6 +1675,9 @@ status_t DrillClientImpl::processSchemasResult(AllocatedBufferPtr allocatedBuffe
     std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
     if(it!=this->m_queryHandles.end()){
         DrillClientSchemaResult* pHandle=static_cast<DrillClientSchemaResult*>((*it).second);
+        if (!validateResultRPCType(pHandle, msg)){
+            return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getschemas results.", pHandle);
+        }
         exec::user::GetSchemasResp* resp = new exec::user::GetSchemasResp();
         pHandle->attachMetadataResult(resp);
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetSchemasResp result Handle " << msg.m_pbody.size() << std::endl;)
@@ -1715,6 +1726,9 @@ status_t DrillClientImpl::processTablesResult(AllocatedBufferPtr allocatedBuffer
     std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
     if(it!=this->m_queryHandles.end()){
         DrillClientTableResult* pHandle=static_cast<DrillClientTableResult*>((*it).second);
+        if (!validateResultRPCType(pHandle, msg)){
+            return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for gettables results.", pHandle);
+        }
         exec::user::GetTablesResp* resp =  new exec::user::GetTablesResp();
         pHandle->attachMetadataResult(resp);
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GeTablesResp result Handle " << msg.m_pbody.size() << std::endl;)
@@ -1762,6 +1776,9 @@ status_t DrillClientImpl::processColumnsResult(AllocatedBufferPtr allocatedBuffe
     std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
     if(it!=this->m_queryHandles.end()){
         DrillClientColumnResult* pHandle=static_cast<DrillClientColumnResult*>((*it).second);
+        if (!validateResultRPCType(pHandle, msg)){
+            return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getcolumns results.", pHandle);
+        }
         exec::user::GetColumnsResp* resp = new exec::user::GetColumnsResp();
         pHandle->attachMetadataResult(resp);
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetColumnsResp result Handle " << msg.m_pbody.size() << std::endl;)
@@ -1809,6 +1826,9 @@ status_t DrillClientImpl::processServerMetaResult(AllocatedBufferPtr allocatedBu
     std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
     if(it!=this->m_queryHandles.end()){
         DrillClientServerMetaHandle* pHandle=static_cast<DrillClientServerMetaHandle*>((*it).second);
+        if (!validateResultRPCType(pHandle, msg)){
+            return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for GetServerMetaResp results.", pHandle);
+        }
         exec::user::GetServerMetaResp* resp = new exec::user::GetServerMetaResp();
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetServerMetaResp result Handle " << msg.m_pbody.size() << std::endl;)
         if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
@@ -1863,7 +1883,8 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr
         case exec::shared::QueryResult_QueryState_FAILED:
             {
                 // get the error message from protobuf and handle errors
-                ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult);
+                ret = (0 == qr->error_size()) ? 
+                    handleQryError(ret, "Unknown protobuf error.", pDrillClientQueryResult) : handleQryError(ret, qr->error(0), pDrillClientQueryResult);
             }
             break;
             // m_pendingRequests should be decremented when the query is
@@ -2100,6 +2121,20 @@ status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& ms
     return QRY_SUCCESS;
 }
 
+bool DrillClientImpl::validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg) {
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::validateResultRPCType" << std::endl;)
+    if (NULL != pQueryHandle) {
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)
+            << "DrillClientImpl::validateResultRPCType: Expected RPC Type: "
+            << pQueryHandle->getExpectedRPCType()
+            << " inbound RPC Type: "
+            << msg.m_rpc_type
+            << std::endl;)
+        return (pQueryHandle->getExpectedRPCType() == msg.m_rpc_type);
+    }
+    return false;
+}
+
 /*
  * Called when there is failure in connect/send.
  */
@@ -2589,8 +2624,11 @@ status_t DrillClientPrepareHandle::setupPreparedStatement(const exec::user::Prep
     }
 
     // Copy server handle
-    this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle());
-    return QRY_SUCCESS;
+    if (pstmt.has_server_handle()){
+        this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle());
+        return QRY_SUCCESS;
+    }
+    return QRY_FAILURE;
 }
 
 void DrillClientPrepareHandle::clearAndDestroy(){

http://git-wip-us.apache.org/repos/asf/drill/blob/5d2eedff/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 852233f..efa4e66 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -109,15 +109,16 @@ class DrillClientImplBase{
 class DrillClientQueryHandle{
     friend class DrillClientImpl;
     public:
-    DrillClientQueryHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, void* context):
+        DrillClientQueryHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, void* context, int in_expectedRPCType) :
         m_client(client),
         m_coordinationId(coordId),
         m_query(query),
-		m_status(QRY_SUCCESS),
+        m_status(QRY_SUCCESS),
         m_bCancel(false),
         m_bHasError(false),
+        m_expectedRPCType(in_expectedRPCType),
         m_pError(NULL),
-		m_pApplicationContext(context){
+        m_pApplicationContext(context){
     };
 
     virtual ~DrillClientQueryHandle(){
@@ -137,7 +138,7 @@ class DrillClientQueryHandle{
     void setQueryStatus(status_t s){ m_status = s;}
     status_t getQueryStatus() const { return m_status;}
     inline DrillClientImpl& client() const { return m_client; };
-
+    int getExpectedRPCType() const { return m_expectedRPCType; };
     inline void* getApplicationContext() const { return m_pApplicationContext; }
 
     protected:
@@ -153,7 +154,7 @@ class DrillClientQueryHandle{
     status_t m_status;
     bool m_bCancel;
     bool m_bHasError;
-
+    int m_expectedRPCType;
     const DrillClientError* m_pError;
 
     void* m_pApplicationContext;
@@ -163,9 +164,9 @@ template<typename Listener, typename ListenerValue>
 class DrillClientBaseHandle: public DrillClientQueryHandle {
     friend class DrillClientImpl;
     public:
-    DrillClientBaseHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* context):
-    	DrillClientQueryHandle(client, coordId, query, context),
-		m_pApplicationListener(listener){
+        DrillClientBaseHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* context, int in_expectedRPCType) :
+            DrillClientQueryHandle(client, coordId, query, context, in_expectedRPCType),
+            m_pApplicationListener(listener){
     };
 
     virtual ~DrillClientBaseHandle(){
@@ -189,7 +190,7 @@ class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListen
     friend class DrillClientImpl;
     public:
     DrillClientQueryResult(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnQueryResultsListener listener, void* listenerCtx):
-    	DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>(client, coordId, query, listener, listenerCtx),
+        DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>(client, coordId, query, listener, listenerCtx, exec::user::QUERY_HANDLE),
         m_numBatches(0),
         m_columnDefs(new std::vector<Drill::FieldMetadata*>),
         m_bIsQueryPending(true),
@@ -288,8 +289,8 @@ class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListen
 class DrillClientPrepareHandle: public DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>, public PreparedStatement {
     public:
     DrillClientPrepareHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnPreparedStatementListener listener, void* listenerCtx):
-    	DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>(client, coordId, query, listener, listenerCtx),
-		PreparedStatement(),
+        DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>(client, coordId, query, listener, listenerCtx, exec::user::PREPARED_STATEMENT),
+        PreparedStatement(),
         m_columnDefs(new std::vector<Drill::FieldMetadata*>) {
     };
 
@@ -311,8 +312,8 @@ class DrillClientPrepareHandle: public DrillClientBaseHandle<pfnPreparedStatemen
 typedef status_t (*pfnServerMetaListener)(void* ctx, const exec::user::ServerMeta* serverMeta, DrillClientError* err);
 class DrillClientServerMetaHandle: public DrillClientBaseHandle<pfnServerMetaListener, const exec::user::ServerMeta*> {
     public:
-	DrillClientServerMetaHandle(DrillClientImpl& client, int32_t coordId, pfnServerMetaListener listener, void* listenerCtx):
-    	DrillClientBaseHandle<pfnServerMetaListener, const exec::user::ServerMeta*>(client, coordId, "server meta", listener, listenerCtx) {
+    DrillClientServerMetaHandle(DrillClientImpl& client, int32_t coordId, pfnServerMetaListener listener, void* listenerCtx):
+        DrillClientBaseHandle<pfnServerMetaListener, const exec::user::ServerMeta*>(client, coordId, "server meta", listener, listenerCtx, exec::user::SERVER_META) {
     };
 
     private:
@@ -323,8 +324,8 @@ class DrillClientServerMetaHandle: public DrillClientBaseHandle<pfnServerMetaLis
 template<typename Listener, typename MetaType, typename MetaImpl, typename MetadataResult>
 class DrillClientMetadataResult: public DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*> {
 public:
-    DrillClientMetadataResult(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* listenerCtx):
-    	DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*>(client, coordId, query, listener, listenerCtx) {}
+    DrillClientMetadataResult(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* listenerCtx, int in_expectedRPCType) :
+        DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*>(client, coordId, query, listener, listenerCtx, in_expectedRPCType) {}
 
     void attachMetadataResult(MetadataResult* result) { this->m_pMetadata.reset(result); }
 
@@ -344,28 +345,28 @@ class DrillClientCatalogResult: public DrillClientMetadataResult<Metadata::pfnCa
     friend class DrillClientImpl;
 public:
     DrillClientCatalogResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx):
-    	DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp>(client, coordId, "getCatalog", listener, listenerCtx) {}
+        DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp>(client, coordId, "getCatalog", listener, listenerCtx, exec::user::CATALOGS) {}
 };
 
 class DrillClientSchemaResult: public DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp> {
     friend class DrillClientImpl;
 public:
     DrillClientSchemaResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx):
-    	DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp>(client, coordId, "getSchemas", listener, listenerCtx) {}
+        DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp>(client, coordId, "getSchemas", listener, listenerCtx, exec::user::SCHEMAS) {}
 };
 
 class DrillClientTableResult: public DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp> {
     friend class DrillClientImpl;
 public:
     DrillClientTableResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnTableMetadataListener listener, void* listenerCtx):
-    	DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp>(client, coordId, "getTables", listener, listenerCtx) {}
+        DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp>(client, coordId, "getTables", listener, listenerCtx, exec::user::TABLES) {}
 };
 
 class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp> {
     friend class DrillClientImpl;
     public:
     DrillClientColumnResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnColumnMetadataListener listener, void* listenerCtx):
-    	DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {}
+        DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx, exec::user::COLUMNS) {}
 };
 
 // Length Decoder Function Pointer definition
@@ -519,6 +520,7 @@ class DrillClientImpl : public DrillClientImplBase{
         void handleRead(ByteBuf_t inBuf, const boost::system::error_code & err, size_t bytes_transferred) ;
         status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
         status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
+        bool validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg);
         connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
         status_t handleQryCancellation(status_t status, DrillClientQueryResult* pQueryResult);
         status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle);

http://git-wip-us.apache.org/repos/asf/drill/blob/5d2eedff/contrib/native/client/src/clientlib/rpcMessage.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.cpp b/contrib/native/client/src/clientlib/rpcMessage.cpp
index f64167f..87d2fdb 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.cpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.cpp
@@ -152,6 +152,12 @@ bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
 
         assert(dbody_length == size);
         assert(msg.m_dbody==buf+currPos);
+
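+        // Repeat the two assertions above as a runtime check: assert() is compiled out when NDEBUG is defined, and a malformed message caused by a network error must make decode() fail instead of crashing the client.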
+        if (!((dbody_length == size) && (msg.m_dbody == (buf + currPos)))) {
+            return false;
+        }
+
 		#ifdef CODER_DEBUG
 			cerr << "Read raw body of " << dbody_length << " bytes" << endl;
 		#endif
@@ -173,6 +179,12 @@ bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
 
     int endPos = cis.CurrentPosition();
     assert((endPos-startPos) == length);
+
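+    // Repeat the assertion above as a runtime check: assert() is compiled out when NDEBUG is defined, and a length mismatch caused by a network error must make decode() fail instead of crashing the client.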
+    if ((endPos - startPos) != length) {
+        return false;
+        
+    }
     return true;
 }