You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/02/18 00:55:15 UTC

[2/2] drill git commit: DRILL-2038: Fix handling of error objects. C++ Client synchronous API has a crash with multiple parallel queries.

DRILL-2038: Fix handling of error objects. C++ Client synchronous API has a crash with multiple parallel queries.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36fc560f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36fc560f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36fc560f

Branch: refs/heads/master
Commit: 36fc560f0eebd39cdb4509bcfd180230bb9f3c22
Parents: 81ec0fb
Author: Parth Chandra <pc...@maprtech.com>
Authored: Mon Jan 19 12:07:24 2015 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Feb 17 15:54:21 2015 -0800

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    | 40 +++++++++++----
 .../client/src/clientlib/drillClientImpl.cpp    | 53 ++++++++++++--------
 .../client/src/clientlib/drillClientImpl.hpp    |  1 +
 3 files changed, 63 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/36fc560f/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 9ecee24..2b0f000 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -22,7 +22,7 @@
 #include <stdlib.h>
 #include "drill/drillc.hpp"
 
-int nOptions=10;
+int nOptions=11;
 
 struct Option{
     char name[32];
@@ -37,6 +37,7 @@ struct Option{
     {"api", "API type [sync|async]", true},
     {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
     {"testCancel", "Cancel the query afterthe first record batch.", false},
+    {"syncSend", "Send query only after previous result is received", false},
     {"hshakeTimeout", "Handshake timeout (second).", false},
     {"queryTimeout", "Query timeout (second).", false}
 };
@@ -44,6 +45,7 @@ struct Option{
 std::map<std::string, std::string> qsOptionValues;
 
 bool bTestCancel=false;
+bool bSyncSend=false;
 
 
 Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::DrillClientError* err){
@@ -268,6 +270,7 @@ int main(int argc, char* argv[]) {
         std::string type_str=qsOptionValues["type"];
         std::string logLevel=qsOptionValues["logLevel"];
         std::string testCancel=qsOptionValues["testCancel"];
+        std::string syncSend=qsOptionValues["syncSend"];
         std::string hshakeTimeout=qsOptionValues["hshakeTimeout"];
         std::string queryTimeout=qsOptionValues["queryTimeout"];
 
@@ -295,6 +298,7 @@ int main(int argc, char* argv[]) {
         }
 
         bTestCancel = !strcmp(testCancel.c_str(), "true")?true:false;
+        bSyncSend = !strcmp(syncSend.c_str(), "true")?true:false;
 
         std::vector<std::string>::iterator queryInpIter;
 
@@ -371,16 +375,30 @@ int main(int argc, char* argv[]) {
                 client.freeQueryIterator(&pRecIter);
             }
         }else{
-            for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
-                Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
-                client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
-                client.registerSchemaChangeListener(qryHandle, SchemaListener);
-                queryHandles.push_back(qryHandle);
-            }
-            client.waitForResults();
-            for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) {
-                client.freeQueryResources(*queryHandleIter);
-                delete *queryHandleIter;
+            if(bSyncSend){
+                for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
+                    Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
+                    client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
+                    client.registerSchemaChangeListener(qryHandle, SchemaListener);
+                    
+                    client.waitForResults();
+
+                    client.freeQueryResources(qryHandle);
+                    delete qryHandle;
+                }
+
+            }else{
+                for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
+                    Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
+                    client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
+                    client.registerSchemaChangeListener(qryHandle, SchemaListener);
+                    queryHandles.push_back(qryHandle);
+                }
+                client.waitForResults();
+                for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) {
+                    client.freeQueryResources(*queryHandleIter);
+                    delete *queryHandleIter;
+                }
             }
         }
         client.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/36fc560f/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 40bd81e..f9c17f9 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -397,12 +397,14 @@ void DrillClientImpl::getNextResult(){
 }
 
 void DrillClientImpl::waitForResults(){
-    // do nothing. No we do not need to explicity wait for the listener thread to finish
-    delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit
-    this->m_pListenerThread->join();
-    DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread "
-        << this->m_pListenerThread << " exited." << std::endl;
-    delete this->m_pListenerThread; this->m_pListenerThread=NULL;
+    if(this->m_pListenerThread!=NULL){
+        // do nothing. No we do not need to explicity wait for the listener thread to finish
+        delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit
+        this->m_pListenerThread->join();
+        DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread "
+            << this->m_pListenerThread << " exited." << std::endl;
+        delete this->m_pListenerThread; this->m_pListenerThread=NULL;
+    }
 }
 
 status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
@@ -638,7 +640,9 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     std::map<int,DrillClientQueryResult*>::iterator it;
     for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
-    DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: " << it->first << std::endl;
+        std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL");
+        DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
+        << " QueryId: "<< qidString << std::endl;
     }
     if(msg.m_coord_id==0){
         DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;
@@ -855,22 +859,25 @@ status_t DrillClientImpl::validateMessage(InBoundRpcMessage& msg, exec::shared::
 connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
     m_pendingRequests=0;
-    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
-    m_pError=pErr;
-    broadcastError(this->m_pError);
+    if(!m_queryIds.empty()){
+        // set query error only if queries are running
+        broadcastError(pErr);
+    }else{
+        if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+        m_pError=pErr;
+    }
     return status;
 }
 
 status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
-    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
-    m_pError=pErr;
+    // set query error only if queries are running
     if(pQueryResult!=NULL){
         m_pendingRequests--;
         pQueryResult->signalError(pErr);
     }else{
         m_pendingRequests=0;
-        broadcastError(this->m_pError);
+        broadcastError(pErr);
     }
     return status;
 }
@@ -879,9 +886,8 @@ status_t DrillClientImpl::handleQryError(status_t status,
         const exec::shared::DrillPBError& e,
         DrillClientQueryResult* pQueryResult){
     assert(pQueryResult!=NULL);
-    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
-    this->m_pError = DrillClientError::getErrorObject(e);
-    pQueryResult->signalError(this->m_pError);
+    DrillClientError* pErr =  DrillClientError::getErrorObject(e);
+    pQueryResult->signalError(pErr);
     m_pendingRequests--;
     return status;
 }
@@ -904,10 +910,11 @@ status_t DrillClientImpl::handleTerminatedQryState(
         std::string msg,
         DrillClientQueryResult* pQueryResult){
     assert(pQueryResult!=NULL);
-    DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
-    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
-    m_pError=pErr;
-    pQueryResult->signalError(pErr);
+    if(status!=QRY_COMPLETED){
+        // set query error only if queries did not complete successfully
+        DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
+        pQueryResult->signalError(pErr);
+    }
     return status;
 }
 
@@ -1109,6 +1116,9 @@ void DrillClientQueryResult::cancel() {
 void DrillClientQueryResult::signalError(DrillClientError* pErr){
     // Ignore return values from the listener.
     if(pErr!=NULL){
+        if(m_pError!=NULL){
+            delete m_pError; m_pError=NULL;
+        }
         m_pError=pErr;
         pfnQueryResultsListener pResultsListener=this->m_pResultsListener;
         if(pResultsListener!=NULL){
@@ -1157,6 +1167,9 @@ void DrillClientQueryResult::clearAndDestroy(){
             delete pR;
         }
     }
+    if(m_pError!=NULL){
+        delete m_pError; m_pError=NULL;
+}
 }
 
 char ZookeeperImpl::s_drillRoot[]="/drill/";

http://git-wip-us.apache.org/repos/asf/drill/blob/36fc560f/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index bd33317..c87e1b7 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -134,6 +134,7 @@ class DrillClientQueryResult{
     status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err);
     // Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables.
     // Also used when a query is cancelled or when a query completed response is received.
+    // Error object is now owned by the DrillClientQueryResult object.
     void signalError(DrillClientError* pErr);
     void clearAndDestroy();