You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/01/07 02:56:28 UTC

[3/4] drill git commit: DRILL-1498 Drill Client to handle spurious results and handshake messages

DRILL-1498 Drill Client to handle spurious results and handshake messages


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4304b25e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4304b25e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4304b25e

Branch: refs/heads/master
Commit: 4304b25e8246970163966e5790231d7ffdbd308f
Parents: 621527d
Author: norrislee <no...@hotmail.com>
Authored: Tue Jan 6 14:44:00 2015 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 6 15:31:26 2015 -0800

----------------------------------------------------------------------
 .../client/src/clientlib/drillClientImpl.cpp    | 41 ++++++++++++++++++--
 .../native/client/src/clientlib/rpcDecoder.cpp  |  2 +
 .../native/client/src/clientlib/rpcMessage.hpp  |  4 ++
 3 files changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 5b390ef..6f4a2ca 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -478,6 +478,17 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
 
         qid.CopyFrom(qr->query_id());
         std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
+        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;
+        if(m_queryResults.size() != 0){
+            for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
+                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
+                    << it->first->part2() << "]\n";
+            }
+        }
+        if(qid.part1()==0){
+            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: QID=0. Ignore and return QRY_SUCCESS." << std::endl;
+            return QRY_SUCCESS;
+        }
         it=this->m_queryResults.find(&qid);
         if(it!=this->m_queryResults.end()){
             pDrillClientQueryResult=(*it).second;
@@ -610,6 +621,13 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
 
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     std::map<int,DrillClientQueryResult*>::iterator it;
+    for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
+    DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: " << it->first << std::endl;
+    }
+    if(msg.m_coord_id==0){
+        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;
+        return QRY_SUCCESS;
+    }
     it=this->m_queryIds.find(msg.m_coord_id);
     if(it!=this->m_queryIds.end()){
         pDrillClientQueryResult=(*it).second;
@@ -755,10 +773,27 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
                 handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
                 return;
             }else{
-                //If not QUERY_RESULT, then we think something serious has gone wrong?
-                assert(0);
-                DRILL_LOG(LOG_TRACE) << "QueryResult returned " << msg.m_rpc_type << std::endl;
+                // A message that is not a QUERY_RESULT was previously treated as a sign that something serious had gone wrong.
+                // However, in one case where the client hung, we observed the server sending a handshake request to the client.
+                // Handshake requests/responses are therefore handled explicitly here; any other RPC type is still reported as an error.
+                if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){
+                    if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){
+                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";
+                        exec::user::UserToBitHandshake u2b;
+                        u2b.set_channel(exec::shared::USER);
+                        u2b.set_rpc_version(DRILL_RPC_VERSION);
+                        u2b.set_support_listening(true);
+                        OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
+                        sendSync(out_msg);
+                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";
+                    }else{
+                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";
+                    }
+                }else{
+                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+                        << "QueryResult returned " << msg.m_rpc_type << std::endl;
                 handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
+                }
                 return;
             }
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/rpcDecoder.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcDecoder.cpp b/contrib/native/client/src/clientlib/rpcDecoder.cpp
index c1001fd..d3cf50c 100644
--- a/contrib/native/client/src/clientlib/rpcDecoder.cpp
+++ b/contrib/native/client/src/clientlib/rpcDecoder.cpp
@@ -84,8 +84,10 @@ int RpcDecoder::Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
     int header_limit = cis->PushLimit(header_length);
     header.ParseFromCodedStream(cis);
     cis->PopLimit(header_limit);
+    msg.m_has_mode = header.has_mode();
     msg.m_mode = header.mode();
     msg.m_coord_id = header.coordination_id();
+    msg.m_has_rpc_type = header.has_rpc_type();
     msg.m_rpc_type = header.rpc_type();
 
     //if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex());

http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/rpcMessage.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp b/contrib/native/client/src/clientlib/rpcMessage.hpp
index fa92c42..6696971 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.hpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.hpp
@@ -33,6 +33,10 @@ class InBoundRpcMessage {
         int m_coord_id;
         DataBuf m_pbody;
         ByteBuf_t m_dbody;
+        bool m_has_mode;
+        bool m_has_rpc_type;
+        bool has_mode() { return m_has_mode; };
+        bool has_rpc_type() { return m_has_rpc_type; };
 };
 
 class OutBoundRpcMessage {