You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/05/13 20:19:04 UTC

[3/3] drill git commit: DRILL-2998: Implement heartbeat in C++ client

DRILL-2998: Implement heartbeat in C++ client


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ffbb9c7a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ffbb9c7a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ffbb9c7a

Branch: refs/heads/master
Commit: ffbb9c7adc6360744bee186e1f69d47dc743f73e
Parents: d1526f9
Author: Parth Chandra <pa...@apache.org>
Authored: Fri May 8 17:53:39 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed May 13 10:48:58 2015 -0700

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |   3 +-
 .../native/client/src/clientlib/drillClient.cpp |  18 +
 .../client/src/clientlib/drillClientImpl.cpp    | 171 ++++-
 .../client/src/clientlib/drillClientImpl.hpp    |  36 +-
 .../native/client/src/include/drill/common.hpp  |   3 +-
 .../client/src/include/drill/drillClient.hpp    |   7 +
 .../native/client/src/protobuf/BitControl.pb.cc | 570 ++++++++++++----
 .../native/client/src/protobuf/BitControl.pb.h  | 441 ++++++++----
 .../native/client/src/protobuf/GeneralRPC.pb.cc |  10 +-
 .../native/client/src/protobuf/GeneralRPC.pb.h  |   6 +-
 contrib/native/client/src/protobuf/User.pb.cc   |  83 ++-
 contrib/native/client/src/protobuf/User.pb.h    |  37 +-
 .../client/src/protobuf/UserBitShared.pb.cc     | 674 +++++++++++++++----
 .../client/src/protobuf/UserBitShared.pb.h      | 614 ++++++++++++++++-
 14 files changed, 2215 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 85e89e0..960ff4f 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -82,6 +82,7 @@ Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::Dr
             }
         }else{
             std::cout << "Query Complete." << std::endl;
+            return Drill::QRY_SUCCESS;
 		}
     }else{
         assert(b==NULL);
@@ -368,7 +369,7 @@ int main(int argc, char* argv[]) {
                 row=0;
                 Drill::RecordIterator* pRecIter=*recordIterIter;
                 Drill::FieldDefPtr fields= pRecIter->getColDefs();
-                while((ret=pRecIter->next()), ret==Drill::QRY_SUCCESS || ret==Drill::QRY_SUCCESS_WITH_INFO){
+                while((ret=pRecIter->next()), (ret==Drill::QRY_SUCCESS || ret==Drill::QRY_SUCCESS_WITH_INFO) && !pRecIter->hasError()){
                     fields = pRecIter->getColDefs();
                     row++;
                     if( (ret==Drill::QRY_SUCCESS_WITH_INFO  && pRecIter->hasSchemaChanged() )|| ( row%100==1)){

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 7162f63..e536fc3 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -50,6 +50,8 @@ uint64_t DrillClientConfig::s_bufferLimit=MAX_MEM_ALLOC_SIZE;
 int32_t DrillClientConfig::s_socketTimeout=0;
 int32_t DrillClientConfig::s_handshakeTimeout=5;
 int32_t DrillClientConfig::s_queryTimeout=180;
+int32_t DrillClientConfig::s_heartbeatFrequency=15; // 15 seconds
+
 boost::mutex DrillClientConfig::s_mutex;
 
 DrillClientConfig::DrillClientConfig(){
@@ -100,6 +102,13 @@ void DrillClientConfig::setQueryTimeout(int32_t t){
     }
 }
 
+void DrillClientConfig::setHeartbeatFrequency(int32_t t){
+    if (t>0){
+        boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+        s_heartbeatFrequency=t;
+    }
+}
+
 int32_t DrillClientConfig::getSocketTimeout(){
     boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
     return s_socketTimeout;
@@ -115,6 +124,11 @@ int32_t DrillClientConfig::getQueryTimeout(){
     return s_queryTimeout;
 }
 
+int32_t DrillClientConfig::getHeartbeatFrequency(){
+    boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+    return s_heartbeatFrequency;
+}
+
 logLevel_t DrillClientConfig::getLogLevel(){
     boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
     return s_logLevel;
@@ -281,6 +295,10 @@ void RecordIterator::registerSchemaChangeListener(pfnSchemaListener l){
     this->m_pQueryResult->registerSchemaChangeListener(l);
 }
 
+bool RecordIterator::hasError(){
+    return m_pQueryResult->hasError();
+}
+
 const std::string& RecordIterator::getError(){
     return m_pQueryResult->getError()->msg;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index eca0e75..97afb88 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -22,6 +22,7 @@
 #include <string.h>
 #include <boost/asio.hpp>
 #include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/date_time/posix_time/posix_time_duration.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/thread.hpp>
@@ -148,6 +149,13 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
         return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
     }
 
+    // set socket keep alive
+    boost::asio::socket_base::keep_alive keepAlive(true);
+    m_socket.set_option(keepAlive);
+	// set no_delay
+    boost::asio::ip::tcp::no_delay noDelay(true);
+    m_socket.set_option(noDelay);
+
     //
     // We put some OS dependent code here for timing out a socket. Mostly, this appears to
     // do nothing. Should we leave it in there?
@@ -157,6 +165,74 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
     return CONN_SUCCESS;
 }
 
+void DrillClientImpl::startHeartbeatTimer(){
+    DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
+        << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;
+    m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
+    m_heartbeatTimer.async_wait(boost::bind(
+                &DrillClientImpl::handleHeartbeatTimeout,
+                this,
+                boost::asio::placeholders::error
+                ));
+	    startMessageListener(); // start this thread early so we don't have the timer blocked
+}
+
+connectionStatus_t DrillClientImpl::sendHeartbeat(){
+	connectionStatus_t status=CONN_SUCCESS;
+    exec::rpc::Ack ack;
+    ack.set_ok(true);
+    OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
+	boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
+	boost::lock_guard<boost::mutex> lock(m_dcMutex);
+    DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;
+    status=sendSync(heartbeatMsg);
+    status=status==CONN_SUCCESS?status:CONN_DEAD;
+    //If the server sends responses to a heartbeat, we need to increment the pending requests counter.
+    if(m_pendingRequests++==0){
+        getNextResult(); // async wait for results
+    }
+    return status;
+}
+
+void DrillClientImpl::resetHeartbeatTimer(){
+    m_heartbeatTimer.cancel();
+    DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;
+    startHeartbeatTimer();
+}
+
+
+
+void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
+    DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;
+    if(err != boost::asio::error::operation_aborted){
+        // Check whether the deadline has passed.
+        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer -  Expires at: " 
+            << to_simple_string(m_heartbeatTimer.expires_at())
+            << " and time now is: "
+            << to_simple_string(boost::asio::deadline_timer::traits_type::now())
+            << std::endl;
+            ;
+        if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
+            // The deadline has passed.
+            m_heartbeatTimer.expires_at(boost::posix_time::pos_infin);
+            if(sendHeartbeat()==CONN_SUCCESS){
+                startHeartbeatTimer();
+            }else{
+                // Close connection.
+                DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";
+                shutdownSocket();
+            }
+        }
+    }
+    return;
+}
+
+
+void DrillClientImpl::Close() {
+    shutdownSocket();
+}
+
+
 connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
     DrillClientImpl::s_encoder.Encode(m_wbuf, msg);
     boost::system::error_code ec;
@@ -205,6 +281,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
         return static_cast<connectionStatus_t>(m_pError->status);
     }
 #endif // WIN32_SHUTDOWN_ON_TIMEOUT
+    startHeartbeatTimer();
 
     return CONN_SUCCESS;
 }
@@ -285,6 +362,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
     u2b.set_channel(exec::shared::USER);
     u2b.set_rpc_version(DRILL_RPC_VERSION);
     u2b.set_support_listening(true);
+    u2b.set_support_timeout(true);
 
     if(properties != NULL && properties->size()>0){
         std::string username;
@@ -369,6 +447,21 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
 
 FieldDefPtr DrillClientQueryResult::s_emptyColDefs( new (std::vector<Drill::FieldMetadata*>));
 
+void DrillClientImpl::startMessageListener() {
+    if(this->m_pListenerThread==NULL){
+        // Stopping the io_service from running out-of-work
+        if(m_io_service.stopped()){
+            DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;
+            m_io_service.reset();
+        }
+        this->m_pWork = new boost::asio::io_service::work(m_io_service);
+        this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
+                    &this->m_io_service));
+        DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
+            << this->m_pListenerThread << std::endl;
+    }
+}
+
 DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t,
         const std::string& plan,
         pfnQueryResultsListener l,
@@ -408,20 +501,8 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
     }
 
     //run this in a new thread
-    {
-        if(this->m_pListenerThread==NULL){
-            // Stopping the io_service from running out-of-work
-            if(m_io_service.stopped()){
-                DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::SubmitQuery: io_service is stopped. Restarting." <<std::endl;
-                m_io_service.reset();
-            }
-            this->m_pWork = new boost::asio::io_service::work(m_io_service);
-            this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
-                &this->m_io_service));
-            DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::SubmitQuery: Starting listener thread: "
-                << this->m_pListenerThread << std::endl;
-        }
-    }
+    startMessageListener();
+
     return pQuery;
 }
 
@@ -437,6 +518,7 @@ void DrillClientImpl::getNextResult(){
             AllocatedBuffer::s_memCV.wait(memLock);
         }
     }
+    
     //use free, not delete to free
     ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
     if (DrillClientConfig::getQueryTimeout() > 0){
@@ -450,6 +532,8 @@ void DrillClientImpl::getNextResult(){
             ));
     }
 
+    resetHeartbeatTimer();
+
     async_read(
             this->m_socket,
             boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
@@ -464,13 +548,15 @@ void DrillClientImpl::getNextResult(){
 }
 
 void DrillClientImpl::waitForResults(){
-    if(this->m_pListenerThread!=NULL){
-        // do nothing. No we do not need to explicity wait for the listener thread to finish
-        delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit
-        this->m_pListenerThread->join();
-        DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread "
-            << this->m_pListenerThread << " exited." << std::endl;
-        delete this->m_pListenerThread; this->m_pListenerThread=NULL;
+    // The listener thread never exists because it may be sending/receiving a heartbeat. Before the heartbeat was introduced
+    // we could check if the listener thread has exited to tell if the queries are done. We can no longer do so now. We check
+    // a condition variable instead
+    {
+        boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+        //if no more data, return NULL;
+        while(this->m_pendingRequests>0) {
+            this->m_cv.wait(cvLock);
+        }
     }
 }
 
@@ -511,6 +597,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
                 << (rmsgLen - leftover) << std::endl;
             ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
             size_t bytesToRead=rmsgLen - leftover;
+              
             while(1){
                 size_t dataBytesRead=this->m_socket.read_some(
                         boost::asio::buffer(b, bytesToRead),
@@ -521,6 +608,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
                 bytesToRead-=dataBytesRead;
                 b+=dataBytesRead;
             }
+            
             if(!error){
                 // read data successfully
                 DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg);
@@ -583,7 +671,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
                 ret=QRY_CANCELED;
             }
             delete allocatedBuffer;
-            return ret;
+            //return ret;
         }else{
             // Normal query results come back with query_state not set.
             // Actually this is not strictly true. The query state is set to
@@ -591,6 +679,12 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
             DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";
         }
     }
+    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;
+    if(m_pendingRequests==0){
+        // signal any waiting client that it can exit because there are no more any query results to arrive.
+        // We keep the heartbeat going though.
+        m_cv.notify_one();
+    }
     return ret;
 }
 
@@ -841,7 +935,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
             return;
         }
 
-        if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
+        if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away
+            m_pendingRequests--;
+            delete allocatedBuffer;
+            DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " <<  std::endl;
+            if(m_pendingRequests!=0){
+                boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+                getNextResult();
+            }else{
+				boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+                DRILL_LOG(LOG_TRACE) << "No more results expected from server. " <<  std::endl;
+				m_cv.notify_one();
+            }
+            return;
+        }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
             status_t s = processQueryResult(allocatedBuffer, msg);
             if(s !=QRY_SUCCESS && s!= QRY_NO_MORE_DATA){
                 if(m_pendingRequests!=0){
@@ -991,10 +1098,18 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){
         std::map<int, DrillClientQueryResult*>::iterator iter;
         if(!m_queryIds.empty()){
             for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) {
-                iter->second->signalError(pErr);
+                DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg);
+                iter->second->signalError(err);
             }
         }
+        delete pErr;
     }
+    // We have an error at the connection level. Cancel the heartbeat. 
+    // And close the connection 
+    m_heartbeatTimer.cancel();
+    m_pendingRequests=0;
+    m_cv.notify_one();
+    shutdownSocket();
     return;
 }
 
@@ -1054,6 +1169,14 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
     DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
 }
 
+void DrillClientImpl::shutdownSocket(){
+    m_io_service.stop();
+    boost::system::error_code ignorederr;
+    m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
+    m_bIsConnected=false;
+    DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;
+}
+
 // This COPIES the FieldMetadata definition for the record batch.  ColumnDefs held by this
 // class are used by the async callbacks.
 status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) {

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 04d59c7..ada63e1 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -201,6 +201,7 @@ class DrillClientImpl{
             m_pWork(NULL),
             m_socket(m_io_service),
             m_deadlineTimer(m_io_service),
+            m_heartbeatTimer(m_io_service),
             m_rbuf(NULL),
             m_wbuf(MAX_SOCK_RD_BUFSIZE)
     {
@@ -218,6 +219,7 @@ class DrillClientImpl{
                 this->m_pWork = NULL;
             }
 
+            m_heartbeatTimer.cancel();
             m_deadlineTimer.cancel();
             m_io_service.stop();
             boost::system::error_code ignorederr;
@@ -229,6 +231,13 @@ class DrillClientImpl{
             if(m_pError!=NULL){
                 delete m_pError; m_pError=NULL;
             }
+            //Terminate and free the heartbeat thread
+            //if(this->m_pHeartbeatThread!=NULL){
+            //    this->m_pHeartbeatThread->interrupt();
+            //    this->m_pHeartbeatThread->join();
+            //    delete this->m_pHeartbeatThread;
+            //    this->m_pHeartbeatThread = NULL;
+            //}
             //Terminate and free the listener thread
             if(this->m_pListenerThread!=NULL){
                 this->m_pListenerThread->interrupt();
@@ -260,6 +269,11 @@ class DrillClientImpl{
         // Direct connection to a drillbit
         // host can be name or ip address, port can be port number or name of service in /etc/services
         connectionStatus_t connect(const char* host, const char* port);
+        void startHeartbeatTimer();// start a heartbeat timer
+        connectionStatus_t sendHeartbeat(); // send a heartbeat to the server
+        void resetHeartbeatTimer(); // reset the heartbeat timer (called every time one sends a message to the server (after sendAck, or submitQuery)
+        void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
+
         int32_t getNextCoordinationId(){ return ++m_coordinationId; };
         void parseConnectStr(const char* connectStr, std::string& pathToDrill, std::string& protocol, std::string& hostPortStr);
         // send synchronous messages
@@ -269,6 +283,8 @@ class DrillClientImpl{
         connectionStatus_t recvHandshake();
         void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred );
         void handleHShakeReadTimeout(const boost::system::error_code & err);
+        // starts the listener thread that receives responses/messages from the server
+        void startMessageListener(); 
         // Query results
         void getNextResult();
         status_t readMsg(
@@ -302,6 +318,8 @@ class DrillClientImpl{
         void sendAck(InBoundRpcMessage& msg, bool isOk);
         void sendCancel(exec::shared::QueryId* pQueryId);
 
+        void shutdownSocket();
+
 
         static RpcEncoder s_encoder;
         static RpcDecoder s_decoder;
@@ -325,6 +343,10 @@ class DrillClientImpl{
         // If the error is query specific, only the query results object will have the error set.
         DrillClientError* m_pError;
 
+        //Started after the connection is established and sends heartbeat messages after {heartbeat frequency} seconds
+        //The thread is killed on disconnect.
+        //boost::thread * m_pHeartbeatThread;
+
         // for boost asio
         boost::thread * m_pListenerThread;
         boost::asio::io_service m_io_service;
@@ -332,6 +354,7 @@ class DrillClientImpl{
         boost::asio::io_service::work * m_pWork;
         boost::asio::ip::tcp::socket m_socket;
         boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return
+        boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages
 
         //for synchronous messages, like validate handshake
         ByteBuf_t m_rbuf; // buffer for receiving synchronous messages
@@ -346,22 +369,15 @@ class DrillClientImpl{
         // Map of query id to query result for currently executing queries
         std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults;
 
+        // Condition variable to signal completion of all queries. 
+        boost::condition_variable m_cv;
+
 };
 
 inline bool DrillClientImpl::Active() {
     return this->m_bIsConnected;;
 }
 
-inline void DrillClientImpl::Close() {
-    //TODO: cancel pending query
-    if(this->m_bIsConnected){
-        boost::system::error_code ignorederr;
-        m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
-        m_socket.close();
-        m_bIsConnected=false;
-    }
-}
-
 class ZookeeperImpl{
     public:
         ZookeeperImpl();

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index 2fa0954..da41149 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -109,7 +109,8 @@ typedef enum{
     CONN_HANDSHAKE_TIMEOUT=5,
     CONN_HOSTNAME_RESOLUTION_ERROR=6,
     CONN_AUTH_FAILED=7,
-    CONN_BAD_RPC_VER=8
+    CONN_BAD_RPC_VER=8,
+    CONN_DEAD=9
 } connectionStatus_t;
 
 typedef enum{

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/include/drill/drillClient.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index d7bf33c..4568ca1 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -101,9 +101,11 @@ class DECLSPEC_DRILL_CLIENT DrillClientConfig{
         static void setSocketTimeout(int32_t l);
         static void setHandshakeTimeout(int32_t l);
         static void setQueryTimeout(int32_t l);
+        static void setHeartbeatFrequency(int32_t l);
         static int32_t getSocketTimeout();
         static int32_t getHandshakeTimeout();
         static int32_t getQueryTimeout();
+        static int32_t getHeartbeatFrequency();
         static logLevel_t getLogLevel();
     private:
         // The logging level
@@ -127,10 +129,14 @@ class DECLSPEC_DRILL_CLIENT DrillClientConfig{
          *
          * s_queryTimeout: (default 180)
          *      place a timeout on waiting result of querying.
+         *
+         * s_heartbeatFrequency: (default 30)
+         *      Seconds of idle activity after which a heartbeat is sent to the drillbit
          */
         static int32_t s_socketTimeout;
         static int32_t s_handshakeTimeout;
         static int32_t s_queryTimeout;
+        static int32_t s_heartbeatFrequency;
         static boost::mutex s_mutex;
 };
 
@@ -215,6 +221,7 @@ class DECLSPEC_DRILL_CLIENT RecordIterator{
 
     void registerSchemaChangeListener(pfnSchemaListener l);
 
+    bool hasError();
     /*
      * Returns the last error message
      */

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/protobuf/BitControl.pb.cc
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/BitControl.pb.cc b/contrib/native/client/src/protobuf/BitControl.pb.cc
index 827f708..a4661b4 100644
--- a/contrib/native/client/src/protobuf/BitControl.pb.cc
+++ b/contrib/native/client/src/protobuf/BitControl.pb.cc
@@ -37,6 +37,9 @@ const ::google::protobuf::internal::GeneratedMessageReflection*
 const ::google::protobuf::Descriptor* PlanFragment_descriptor_ = NULL;
 const ::google::protobuf::internal::GeneratedMessageReflection*
   PlanFragment_reflection_ = NULL;
+const ::google::protobuf::Descriptor* QueryContextInformation_descriptor_ = NULL;
+const ::google::protobuf::internal::GeneratedMessageReflection*
+  QueryContextInformation_reflection_ = NULL;
 const ::google::protobuf::Descriptor* WorkQueueStatus_descriptor_ = NULL;
 const ::google::protobuf::internal::GeneratedMessageReflection*
   WorkQueueStatus_reflection_ = NULL;
@@ -118,22 +121,21 @@ void protobuf_AssignDesc_BitControl_2eproto() {
       ::google::protobuf::MessageFactory::generated_factory(),
       sizeof(InitializeFragments));
   PlanFragment_descriptor_ = file->message_type(4);
-  static const int PlanFragment_offsets_[15] = {
+  static const int PlanFragment_offsets_[14] = {
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, handle_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, network_cost_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, cpu_cost_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, disk_cost_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, memory_cost_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, fragment_json_),
-    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, assignment_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, leaf_fragment_),
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, assignment_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, foreman_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, mem_initial_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, mem_max_),
-    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, query_start_time_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, credentials_),
-    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, time_zone_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, options_json_),
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(PlanFragment, context_),
   };
   PlanFragment_reflection_ =
     new ::google::protobuf::internal::GeneratedMessageReflection(
@@ -146,7 +148,24 @@ void protobuf_AssignDesc_BitControl_2eproto() {
       ::google::protobuf::DescriptorPool::generated_pool(),
       ::google::protobuf::MessageFactory::generated_factory(),
       sizeof(PlanFragment));
-  WorkQueueStatus_descriptor_ = file->message_type(5);
+  QueryContextInformation_descriptor_ = file->message_type(5);
+  static const int QueryContextInformation_offsets_[3] = {
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(QueryContextInformation, query_start_time_),
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(QueryContextInformation, time_zone_),
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(QueryContextInformation, default_schema_name_),
+  };
+  QueryContextInformation_reflection_ =
+    new ::google::protobuf::internal::GeneratedMessageReflection(
+      QueryContextInformation_descriptor_,
+      QueryContextInformation::default_instance_,
+      QueryContextInformation_offsets_,
+      GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(QueryContextInformation, _has_bits_[0]),
+      GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(QueryContextInformation, _unknown_fields_),
+      -1,
+      ::google::protobuf::DescriptorPool::generated_pool(),
+      ::google::protobuf::MessageFactory::generated_factory(),
+      sizeof(QueryContextInformation));
+  WorkQueueStatus_descriptor_ = file->message_type(6);
   static const int WorkQueueStatus_offsets_[3] = {
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(WorkQueueStatus, endpoint_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(WorkQueueStatus, queue_length_),
@@ -163,7 +182,7 @@ void protobuf_AssignDesc_BitControl_2eproto() {
       ::google::protobuf::DescriptorPool::generated_pool(),
       ::google::protobuf::MessageFactory::generated_factory(),
       sizeof(WorkQueueStatus));
-  FinishedReceiver_descriptor_ = file->message_type(6);
+  FinishedReceiver_descriptor_ = file->message_type(7);
   static const int FinishedReceiver_offsets_[2] = {
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FinishedReceiver, receiver_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FinishedReceiver, sender_),
@@ -203,6 +222,8 @@ void protobuf_RegisterTypes(const ::std::string&) {
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
     PlanFragment_descriptor_, &PlanFragment::default_instance());
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+    QueryContextInformation_descriptor_, &QueryContextInformation::default_instance());
+  ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
     WorkQueueStatus_descriptor_, &WorkQueueStatus::default_instance());
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
     FinishedReceiver_descriptor_, &FinishedReceiver::default_instance());
@@ -221,6 +242,8 @@ void protobuf_ShutdownFile_BitControl_2eproto() {
   delete InitializeFragments_reflection_;
   delete PlanFragment::default_instance_;
   delete PlanFragment_reflection_;
+  delete QueryContextInformation::default_instance_;
+  delete QueryContextInformation_reflection_;
   delete WorkQueueStatus::default_instance_;
   delete WorkQueueStatus_reflection_;
   delete FinishedReceiver::default_instance_;
@@ -249,32 +272,36 @@ void protobuf_AddDesc_BitControl_2eproto() {
     "red.MinorFragmentProfile\022(\n\006handle\030\002 \001(\013"
     "2\030.exec.bit.FragmentHandle\"G\n\023Initialize"
     "Fragments\0220\n\010fragment\030\001 \003(\0132\036.exec.bit.c"
-    "ontrol.PlanFragment\"\275\003\n\014PlanFragment\022(\n\006"
+    "ontrol.PlanFragment\"\314\003\n\014PlanFragment\022(\n\006"
     "handle\030\001 \001(\0132\030.exec.bit.FragmentHandle\022\024"
     "\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost\030\005 \001(\002\022\021"
     "\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost\030\007 \001(\002\022\025"
-    "\n\rfragment_json\030\010 \001(\t\022*\n\nassignment\030\n \001("
-    "\0132\026.exec.DrillbitEndpoint\022\025\n\rleaf_fragme"
-    "nt\030\t \001(\010\022\'\n\007foreman\030\013 \001(\0132\026.exec.Drillbi"
+    "\n\rfragment_json\030\010 \001(\t\022\025\n\rleaf_fragment\030\t"
+    " \001(\010\022*\n\nassignment\030\n \001(\0132\026.exec.Drillbit"
+    "Endpoint\022\'\n\007foreman\030\013 \001(\0132\026.exec.Drillbi"
     "tEndpoint\022\035\n\013mem_initial\030\014 \001(\003:\01020000000"
-    "\022\033\n\007mem_max\030\r \001(\003:\n2000000000\022\030\n\020query_s"
-    "tart_time\030\016 \001(\003\0221\n\013credentials\030\017 \001(\0132\034.e"
-    "xec.shared.UserCredentials\022\021\n\ttime_zone\030"
-    "\020 \001(\005\022\024\n\014options_json\030\021 \001(\t\"f\n\017WorkQueue"
-    "Status\022(\n\010endpoint\030\001 \001(\0132\026.exec.Drillbit"
-    "Endpoint\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013report"
-    "_time\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010recei"
-    "ver\030\001 \001(\0132\030.exec.bit.FragmentHandle\022(\n\006s"
-    "ender\030\002 \001(\0132\030.exec.bit.FragmentHandle*\271\002"
-    "\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOO"
-    "DBYE\020\002\022\034\n\030REQ_INIATILIZE_FRAGMENTS\020\003\022\027\n\023"
-    "REQ_CANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FI"
-    "NISHED\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ"
-    "_BIT_STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020R"
-    "EQ_QUERY_CANCEL\020\017\022\030\n\024RESP_FRAGMENT_HANDL"
-    "E\020\013\022\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BI"
-    "T_STATUS\020\r\022\025\n\021RESP_QUERY_STATUS\020\016B+\n\033org"
-    ".apache.drill.exec.protoB\nBitControlH\001", 1518);
+    "\022\033\n\007mem_max\030\r \001(\003:\n2000000000\0221\n\013credent"
+    "ials\030\016 \001(\0132\034.exec.shared.UserCredentials"
+    "\022\024\n\014options_json\030\017 \001(\t\022:\n\007context\030\020 \001(\0132"
+    ").exec.bit.control.QueryContextInformati"
+    "on\"c\n\027QueryContextInformation\022\030\n\020query_s"
+    "tart_time\030\001 \001(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023de"
+    "fault_schema_name\030\003 \001(\t\"f\n\017WorkQueueStat"
+    "us\022(\n\010endpoint\030\001 \001(\0132\026.exec.DrillbitEndp"
+    "oint\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_tim"
+    "e\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030"
+    "\001 \001(\0132\030.exec.bit.FragmentHandle\022(\n\006sende"
+    "r\030\002 \001(\0132\030.exec.bit.FragmentHandle*\323\002\n\007Rp"
+    "cType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE"
+    "\020\002\022\034\n\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_"
+    "CANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISH"
+    "ED\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT"
+    "_STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_Q"
+    "UERY_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022"
+    "\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024RESP_FRAGME"
+    "NT_STATUS\020\014\022\023\n\017RESP_BIT_STATUS\020\r\022\025\n\021RESP"
+    "_QUERY_STATUS\020\016B+\n\033org.apache.drill.exec"
+    ".protoB\nBitControlH\001", 1660);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "BitControl.proto", &protobuf_RegisterTypes);
   BitControlHandshake::default_instance_ = new BitControlHandshake();
@@ -282,6 +309,7 @@ void protobuf_AddDesc_BitControl_2eproto() {
   FragmentStatus::default_instance_ = new FragmentStatus();
   InitializeFragments::default_instance_ = new InitializeFragments();
   PlanFragment::default_instance_ = new PlanFragment();
+  QueryContextInformation::default_instance_ = new QueryContextInformation();
   WorkQueueStatus::default_instance_ = new WorkQueueStatus();
   FinishedReceiver::default_instance_ = new FinishedReceiver();
   BitControlHandshake::default_instance_->InitAsDefaultInstance();
@@ -289,6 +317,7 @@ void protobuf_AddDesc_BitControl_2eproto() {
   FragmentStatus::default_instance_->InitAsDefaultInstance();
   InitializeFragments::default_instance_->InitAsDefaultInstance();
   PlanFragment::default_instance_->InitAsDefaultInstance();
+  QueryContextInformation::default_instance_->InitAsDefaultInstance();
   WorkQueueStatus::default_instance_->InitAsDefaultInstance();
   FinishedReceiver::default_instance_->InitAsDefaultInstance();
   ::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_BitControl_2eproto);
@@ -320,6 +349,7 @@ bool RpcType_IsValid(int value) {
     case 13:
     case 14:
     case 15:
+    case 16:
       return true;
     default:
       return false;
@@ -1299,15 +1329,14 @@ const int PlanFragment::kCpuCostFieldNumber;
 const int PlanFragment::kDiskCostFieldNumber;
 const int PlanFragment::kMemoryCostFieldNumber;
 const int PlanFragment::kFragmentJsonFieldNumber;
-const int PlanFragment::kAssignmentFieldNumber;
 const int PlanFragment::kLeafFragmentFieldNumber;
+const int PlanFragment::kAssignmentFieldNumber;
 const int PlanFragment::kForemanFieldNumber;
 const int PlanFragment::kMemInitialFieldNumber;
 const int PlanFragment::kMemMaxFieldNumber;
-const int PlanFragment::kQueryStartTimeFieldNumber;
 const int PlanFragment::kCredentialsFieldNumber;
-const int PlanFragment::kTimeZoneFieldNumber;
 const int PlanFragment::kOptionsJsonFieldNumber;
+const int PlanFragment::kContextFieldNumber;
 #endif  // !_MSC_VER
 
 PlanFragment::PlanFragment()
@@ -1320,6 +1349,7 @@ void PlanFragment::InitAsDefaultInstance() {
   assignment_ = const_cast< ::exec::DrillbitEndpoint*>(&::exec::DrillbitEndpoint::default_instance());
   foreman_ = const_cast< ::exec::DrillbitEndpoint*>(&::exec::DrillbitEndpoint::default_instance());
   credentials_ = const_cast< ::exec::shared::UserCredentials*>(&::exec::shared::UserCredentials::default_instance());
+  context_ = const_cast< ::exec::bit::control::QueryContextInformation*>(&::exec::bit::control::QueryContextInformation::default_instance());
 }
 
 PlanFragment::PlanFragment(const PlanFragment& from)
@@ -1336,15 +1366,14 @@ void PlanFragment::SharedCtor() {
   disk_cost_ = 0;
   memory_cost_ = 0;
   fragment_json_ = const_cast< ::std::string*>(&::google::protobuf::internal::kEmptyString);
-  assignment_ = NULL;
   leaf_fragment_ = false;
+  assignment_ = NULL;
   foreman_ = NULL;
   mem_initial_ = GOOGLE_LONGLONG(20000000);
   mem_max_ = GOOGLE_LONGLONG(2000000000);
-  query_start_time_ = GOOGLE_LONGLONG(0);
   credentials_ = NULL;
-  time_zone_ = 0;
   options_json_ = const_cast< ::std::string*>(&::google::protobuf::internal::kEmptyString);
+  context_ = NULL;
   ::memset(_has_bits_, 0, sizeof(_has_bits_));
 }
 
@@ -1364,6 +1393,7 @@ void PlanFragment::SharedDtor() {
     delete assignment_;
     delete foreman_;
     delete credentials_;
+    delete context_;
   }
 }
 
@@ -1402,10 +1432,10 @@ void PlanFragment::Clear() {
         fragment_json_->clear();
       }
     }
+    leaf_fragment_ = false;
     if (has_assignment()) {
       if (assignment_ != NULL) assignment_->::exec::DrillbitEndpoint::Clear();
     }
-    leaf_fragment_ = false;
   }
   if (_has_bits_[8 / 32] & (0xffu << (8 % 32))) {
     if (has_foreman()) {
@@ -1413,16 +1443,17 @@ void PlanFragment::Clear() {
     }
     mem_initial_ = GOOGLE_LONGLONG(20000000);
     mem_max_ = GOOGLE_LONGLONG(2000000000);
-    query_start_time_ = GOOGLE_LONGLONG(0);
     if (has_credentials()) {
       if (credentials_ != NULL) credentials_->::exec::shared::UserCredentials::Clear();
     }
-    time_zone_ = 0;
     if (has_options_json()) {
       if (options_json_ != &::google::protobuf::internal::kEmptyString) {
         options_json_->clear();
       }
     }
+    if (has_context()) {
+      if (context_ != NULL) context_->::exec::bit::control::QueryContextInformation::Clear();
+    }
   }
   ::memset(_has_bits_, 0, sizeof(_has_bits_));
   mutable_unknown_fields()->Clear();
@@ -1600,29 +1631,13 @@ bool PlanFragment::MergePartialFromCodedStream(
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(112)) goto parse_query_start_time;
+        if (input->ExpectTag(114)) goto parse_credentials;
         break;
       }
 
-      // optional int64 query_start_time = 14;
+      // optional .exec.shared.UserCredentials credentials = 14;
       case 14: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
-            ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
-         parse_query_start_time:
-          DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
-                   ::google::protobuf::int64, ::google::protobuf::internal::WireFormatLite::TYPE_INT64>(
-                 input, &query_start_time_)));
-          set_has_query_start_time();
-        } else {
-          goto handle_uninterpreted;
-        }
-        if (input->ExpectTag(122)) goto parse_credentials;
-        break;
-      }
-
-      // optional .exec.shared.UserCredentials credentials = 15;
-      case 15: {
-        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
             ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
          parse_credentials:
           DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual(
@@ -1630,36 +1645,34 @@ bool PlanFragment::MergePartialFromCodedStream(
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(128)) goto parse_time_zone;
+        if (input->ExpectTag(122)) goto parse_options_json;
         break;
       }
 
-      // optional int32 time_zone = 16;
-      case 16: {
+      // optional string options_json = 15;
+      case 15: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
-            ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
-         parse_time_zone:
-          DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
-                   ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
-                 input, &time_zone_)));
-          set_has_time_zone();
+            ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
+         parse_options_json:
+          DO_(::google::protobuf::internal::WireFormatLite::ReadString(
+                input, this->mutable_options_json()));
+          ::google::protobuf::internal::WireFormat::VerifyUTF8String(
+            this->options_json().data(), this->options_json().length(),
+            ::google::protobuf::internal::WireFormat::PARSE);
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(138)) goto parse_options_json;
+        if (input->ExpectTag(130)) goto parse_context;
         break;
       }
 
-      // optional string options_json = 17;
-      case 17: {
+      // optional .exec.bit.control.QueryContextInformation context = 16;
+      case 16: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
             ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
-         parse_options_json:
-          DO_(::google::protobuf::internal::WireFormatLite::ReadString(
-                input, this->mutable_options_json()));
-          ::google::protobuf::internal::WireFormat::VerifyUTF8String(
-            this->options_json().data(), this->options_json().length(),
-            ::google::protobuf::internal::WireFormat::PARSE);
+         parse_context:
+          DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual(
+               input, mutable_context()));
         } else {
           goto handle_uninterpreted;
         }
@@ -1747,29 +1760,25 @@ void PlanFragment::SerializeWithCachedSizes(
     ::google::protobuf::internal::WireFormatLite::WriteInt64(13, this->mem_max(), output);
   }
 
-  // optional int64 query_start_time = 14;
-  if (has_query_start_time()) {
-    ::google::protobuf::internal::WireFormatLite::WriteInt64(14, this->query_start_time(), output);
-  }
-
-  // optional .exec.shared.UserCredentials credentials = 15;
+  // optional .exec.shared.UserCredentials credentials = 14;
   if (has_credentials()) {
     ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray(
-      15, this->credentials(), output);
-  }
-
-  // optional int32 time_zone = 16;
-  if (has_time_zone()) {
-    ::google::protobuf::internal::WireFormatLite::WriteInt32(16, this->time_zone(), output);
+      14, this->credentials(), output);
   }
 
-  // optional string options_json = 17;
+  // optional string options_json = 15;
   if (has_options_json()) {
     ::google::protobuf::internal::WireFormat::VerifyUTF8String(
       this->options_json().data(), this->options_json().length(),
       ::google::protobuf::internal::WireFormat::SERIALIZE);
     ::google::protobuf::internal::WireFormatLite::WriteString(
-      17, this->options_json(), output);
+      15, this->options_json(), output);
+  }
+
+  // optional .exec.bit.control.QueryContextInformation context = 16;
+  if (has_context()) {
+    ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray(
+      16, this->context(), output);
   }
 
   if (!unknown_fields().empty()) {
@@ -1846,31 +1855,28 @@ void PlanFragment::SerializeWithCachedSizes(
     target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(13, this->mem_max(), target);
   }
 
-  // optional int64 query_start_time = 14;
-  if (has_query_start_time()) {
-    target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(14, this->query_start_time(), target);
-  }
-
-  // optional .exec.shared.UserCredentials credentials = 15;
+  // optional .exec.shared.UserCredentials credentials = 14;
   if (has_credentials()) {
     target = ::google::protobuf::internal::WireFormatLite::
       WriteMessageNoVirtualToArray(
-        15, this->credentials(), target);
-  }
-
-  // optional int32 time_zone = 16;
-  if (has_time_zone()) {
-    target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(16, this->time_zone(), target);
+        14, this->credentials(), target);
   }
 
-  // optional string options_json = 17;
+  // optional string options_json = 15;
   if (has_options_json()) {
     ::google::protobuf::internal::WireFormat::VerifyUTF8String(
       this->options_json().data(), this->options_json().length(),
       ::google::protobuf::internal::WireFormat::SERIALIZE);
     target =
       ::google::protobuf::internal::WireFormatLite::WriteStringToArray(
-        17, this->options_json(), target);
+        15, this->options_json(), target);
+  }
+
+  // optional .exec.bit.control.QueryContextInformation context = 16;
+  if (has_context()) {
+    target = ::google::protobuf::internal::WireFormatLite::
+      WriteMessageNoVirtualToArray(
+        16, this->context(), target);
   }
 
   if (!unknown_fields().empty()) {
@@ -1918,6 +1924,11 @@ int PlanFragment::ByteSize() const {
           this->fragment_json());
     }
 
+    // optional bool leaf_fragment = 9;
+    if (has_leaf_fragment()) {
+      total_size += 1 + 1;
+    }
+
     // optional .exec.DrillbitEndpoint assignment = 10;
     if (has_assignment()) {
       total_size += 1 +
@@ -1925,11 +1936,6 @@ int PlanFragment::ByteSize() const {
           this->assignment());
     }
 
-    // optional bool leaf_fragment = 9;
-    if (has_leaf_fragment()) {
-      total_size += 1 + 1;
-    }
-
   }
   if (_has_bits_[8 / 32] & (0xffu << (8 % 32))) {
     // optional .exec.DrillbitEndpoint foreman = 11;
@@ -1953,34 +1959,27 @@ int PlanFragment::ByteSize() const {
           this->mem_max());
     }
 
-    // optional int64 query_start_time = 14;
-    if (has_query_start_time()) {
-      total_size += 1 +
-        ::google::protobuf::internal::WireFormatLite::Int64Size(
-          this->query_start_time());
-    }
-
-    // optional .exec.shared.UserCredentials credentials = 15;
+    // optional .exec.shared.UserCredentials credentials = 14;
     if (has_credentials()) {
       total_size += 1 +
         ::google::protobuf::internal::WireFormatLite::MessageSizeNoVirtual(
           this->credentials());
     }
 
-    // optional int32 time_zone = 16;
-    if (has_time_zone()) {
-      total_size += 2 +
-        ::google::protobuf::internal::WireFormatLite::Int32Size(
-          this->time_zone());
-    }
-
-    // optional string options_json = 17;
+    // optional string options_json = 15;
     if (has_options_json()) {
-      total_size += 2 +
+      total_size += 1 +
         ::google::protobuf::internal::WireFormatLite::StringSize(
           this->options_json());
     }
 
+    // optional .exec.bit.control.QueryContextInformation context = 16;
+    if (has_context()) {
+      total_size += 2 +
+        ::google::protobuf::internal::WireFormatLite::MessageSizeNoVirtual(
+          this->context());
+    }
+
   }
   if (!unknown_fields().empty()) {
     total_size +=
@@ -2026,12 +2025,12 @@ void PlanFragment::MergeFrom(const PlanFragment& from) {
     if (from.has_fragment_json()) {
       set_fragment_json(from.fragment_json());
     }
-    if (from.has_assignment()) {
-      mutable_assignment()->::exec::DrillbitEndpoint::MergeFrom(from.assignment());
-    }
     if (from.has_leaf_fragment()) {
       set_leaf_fragment(from.leaf_fragment());
     }
+    if (from.has_assignment()) {
+      mutable_assignment()->::exec::DrillbitEndpoint::MergeFrom(from.assignment());
+    }
   }
   if (from._has_bits_[8 / 32] & (0xffu << (8 % 32))) {
     if (from.has_foreman()) {
@@ -2043,18 +2042,15 @@ void PlanFragment::MergeFrom(const PlanFragment& from) {
     if (from.has_mem_max()) {
       set_mem_max(from.mem_max());
     }
-    if (from.has_query_start_time()) {
-      set_query_start_time(from.query_start_time());
-    }
     if (from.has_credentials()) {
       mutable_credentials()->::exec::shared::UserCredentials::MergeFrom(from.credentials());
     }
-    if (from.has_time_zone()) {
-      set_time_zone(from.time_zone());
-    }
     if (from.has_options_json()) {
       set_options_json(from.options_json());
     }
+    if (from.has_context()) {
+      mutable_context()->::exec::bit::control::QueryContextInformation::MergeFrom(from.context());
+    }
   }
   mutable_unknown_fields()->MergeFrom(from.unknown_fields());
 }
@@ -2084,15 +2080,14 @@ void PlanFragment::Swap(PlanFragment* other) {
     std::swap(disk_cost_, other->disk_cost_);
     std::swap(memory_cost_, other->memory_cost_);
     std::swap(fragment_json_, other->fragment_json_);
-    std::swap(assignment_, other->assignment_);
     std::swap(leaf_fragment_, other->leaf_fragment_);
+    std::swap(assignment_, other->assignment_);
     std::swap(foreman_, other->foreman_);
     std::swap(mem_initial_, other->mem_initial_);
     std::swap(mem_max_, other->mem_max_);
-    std::swap(query_start_time_, other->query_start_time_);
     std::swap(credentials_, other->credentials_);
-    std::swap(time_zone_, other->time_zone_);
     std::swap(options_json_, other->options_json_);
+    std::swap(context_, other->context_);
     std::swap(_has_bits_[0], other->_has_bits_[0]);
     _unknown_fields_.Swap(&other->_unknown_fields_);
     std::swap(_cached_size_, other->_cached_size_);
@@ -2111,6 +2106,311 @@ void PlanFragment::Swap(PlanFragment* other) {
 // ===================================================================
 
 #ifndef _MSC_VER
+const int QueryContextInformation::kQueryStartTimeFieldNumber;
+const int QueryContextInformation::kTimeZoneFieldNumber;
+const int QueryContextInformation::kDefaultSchemaNameFieldNumber;
+#endif  // !_MSC_VER
+
+QueryContextInformation::QueryContextInformation()
+  : ::google::protobuf::Message() {
+  SharedCtor();
+}
+
+void QueryContextInformation::InitAsDefaultInstance() {
+}
+
+QueryContextInformation::QueryContextInformation(const QueryContextInformation& from)
+  : ::google::protobuf::Message() {
+  SharedCtor();
+  MergeFrom(from);
+}
+
+void QueryContextInformation::SharedCtor() {
+  _cached_size_ = 0;
+  query_start_time_ = GOOGLE_LONGLONG(0);
+  time_zone_ = 0;
+  default_schema_name_ = const_cast< ::std::string*>(&::google::protobuf::internal::kEmptyString);
+  ::memset(_has_bits_, 0, sizeof(_has_bits_));
+}
+
+QueryContextInformation::~QueryContextInformation() {
+  SharedDtor();
+}
+
+void QueryContextInformation::SharedDtor() {
+  if (default_schema_name_ != &::google::protobuf::internal::kEmptyString) {
+    delete default_schema_name_;
+  }
+  if (this != default_instance_) {
+  }
+}
+
+void QueryContextInformation::SetCachedSize(int size) const {
+  GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+  _cached_size_ = size;
+  GOOGLE_SAFE_CONCURRENT_WRITES_END();
+}
+const ::google::protobuf::Descriptor* QueryContextInformation::descriptor() {
+  protobuf_AssignDescriptorsOnce();
+  return QueryContextInformation_descriptor_;
+}
+
+const QueryContextInformation& QueryContextInformation::default_instance() {
+  if (default_instance_ == NULL) protobuf_AddDesc_BitControl_2eproto();
+  return *default_instance_;
+}
+
+QueryContextInformation* QueryContextInformation::default_instance_ = NULL;
+
+QueryContextInformation* QueryContextInformation::New() const {
+  return new QueryContextInformation;
+}
+
+void QueryContextInformation::Clear() {
+  if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
+    query_start_time_ = GOOGLE_LONGLONG(0);
+    time_zone_ = 0;
+    if (has_default_schema_name()) {
+      if (default_schema_name_ != &::google::protobuf::internal::kEmptyString) {
+        default_schema_name_->clear();
+      }
+    }
+  }
+  ::memset(_has_bits_, 0, sizeof(_has_bits_));
+  mutable_unknown_fields()->Clear();
+}
+
+bool QueryContextInformation::MergePartialFromCodedStream(
+    ::google::protobuf::io::CodedInputStream* input) {
+#define DO_(EXPRESSION) if (!(EXPRESSION)) return false
+  ::google::protobuf::uint32 tag;
+  while ((tag = input->ReadTag()) != 0) {
+    switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) {
+      // optional int64 query_start_time = 1;
+      case 1: {
+        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
+            ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
+          DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
+                   ::google::protobuf::int64, ::google::protobuf::internal::WireFormatLite::TYPE_INT64>(
+                 input, &query_start_time_)));
+          set_has_query_start_time();
+        } else {
+          goto handle_uninterpreted;
+        }
+        if (input->ExpectTag(16)) goto parse_time_zone;
+        break;
+      }
+
+      // optional int32 time_zone = 2;
+      case 2: {
+        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
+            ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
+         parse_time_zone:
+          DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
+                   ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
+                 input, &time_zone_)));
+          set_has_time_zone();
+        } else {
+          goto handle_uninterpreted;
+        }
+        if (input->ExpectTag(26)) goto parse_default_schema_name;
+        break;
+      }
+
+      // optional string default_schema_name = 3;
+      case 3: {
+        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
+            ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
+         parse_default_schema_name:
+          DO_(::google::protobuf::internal::WireFormatLite::ReadString(
+                input, this->mutable_default_schema_name()));
+          ::google::protobuf::internal::WireFormat::VerifyUTF8String(
+            this->default_schema_name().data(), this->default_schema_name().length(),
+            ::google::protobuf::internal::WireFormat::PARSE);
+        } else {
+          goto handle_uninterpreted;
+        }
+        if (input->ExpectAtEnd()) return true;
+        break;
+      }
+
+      default: {
+      handle_uninterpreted:
+        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
+            ::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) {
+          return true;
+        }
+        DO_(::google::protobuf::internal::WireFormat::SkipField(
+              input, tag, mutable_unknown_fields()));
+        break;
+      }
+    }
+  }
+  return true;
+#undef DO_
+}
+
+void QueryContextInformation::SerializeWithCachedSizes(
+    ::google::protobuf::io::CodedOutputStream* output) const {
+  // optional int64 query_start_time = 1;
+  if (has_query_start_time()) {
+    ::google::protobuf::internal::WireFormatLite::WriteInt64(1, this->query_start_time(), output);
+  }
+
+  // optional int32 time_zone = 2;
+  if (has_time_zone()) {
+    ::google::protobuf::internal::WireFormatLite::WriteInt32(2, this->time_zone(), output);
+  }
+
+  // optional string default_schema_name = 3;
+  if (has_default_schema_name()) {
+    ::google::protobuf::internal::WireFormat::VerifyUTF8String(
+      this->default_schema_name().data(), this->default_schema_name().length(),
+      ::google::protobuf::internal::WireFormat::SERIALIZE);
+    ::google::protobuf::internal::WireFormatLite::WriteString(
+      3, this->default_schema_name(), output);
+  }
+
+  if (!unknown_fields().empty()) {
+    ::google::protobuf::internal::WireFormat::SerializeUnknownFields(
+        unknown_fields(), output);
+  }
+}
+
+::google::protobuf::uint8* QueryContextInformation::SerializeWithCachedSizesToArray(
+    ::google::protobuf::uint8* target) const {
+  // optional int64 query_start_time = 1;
+  if (has_query_start_time()) {
+    target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(1, this->query_start_time(), target);
+  }
+
+  // optional int32 time_zone = 2;
+  if (has_time_zone()) {
+    target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(2, this->time_zone(), target);
+  }
+
+  // optional string default_schema_name = 3;
+  if (has_default_schema_name()) {
+    ::google::protobuf::internal::WireFormat::VerifyUTF8String(
+      this->default_schema_name().data(), this->default_schema_name().length(),
+      ::google::protobuf::internal::WireFormat::SERIALIZE);
+    target =
+      ::google::protobuf::internal::WireFormatLite::WriteStringToArray(
+        3, this->default_schema_name(), target);
+  }
+
+  if (!unknown_fields().empty()) {
+    target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray(
+        unknown_fields(), target);
+  }
+  return target;
+}
+
+int QueryContextInformation::ByteSize() const {
+  int total_size = 0;
+
+  if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
+    // optional int64 query_start_time = 1;
+    if (has_query_start_time()) {
+      total_size += 1 +
+        ::google::protobuf::internal::WireFormatLite::Int64Size(
+          this->query_start_time());
+    }
+
+    // optional int32 time_zone = 2;
+    if (has_time_zone()) {
+      total_size += 1 +
+        ::google::protobuf::internal::WireFormatLite::Int32Size(
+          this->time_zone());
+    }
+
+    // optional string default_schema_name = 3;
+    if (has_default_schema_name()) {
+      total_size += 1 +
+        ::google::protobuf::internal::WireFormatLite::StringSize(
+          this->default_schema_name());
+    }
+
+  }
+  if (!unknown_fields().empty()) {
+    total_size +=
+      ::google::protobuf::internal::WireFormat::ComputeUnknownFieldsSize(
+        unknown_fields());
+  }
+  GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+  _cached_size_ = total_size;
+  GOOGLE_SAFE_CONCURRENT_WRITES_END();
+  return total_size;
+}
+
+void QueryContextInformation::MergeFrom(const ::google::protobuf::Message& from) {
+  GOOGLE_CHECK_NE(&from, this);
+  const QueryContextInformation* source =
+    ::google::protobuf::internal::dynamic_cast_if_available<const QueryContextInformation*>(
+      &from);
+  if (source == NULL) {
+    ::google::protobuf::internal::ReflectionOps::Merge(from, this);
+  } else {
+    MergeFrom(*source);
+  }
+}
+
+void QueryContextInformation::MergeFrom(const QueryContextInformation& from) {
+  GOOGLE_CHECK_NE(&from, this);
+  if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) {
+    if (from.has_query_start_time()) {
+      set_query_start_time(from.query_start_time());
+    }
+    if (from.has_time_zone()) {
+      set_time_zone(from.time_zone());
+    }
+    if (from.has_default_schema_name()) {
+      set_default_schema_name(from.default_schema_name());
+    }
+  }
+  mutable_unknown_fields()->MergeFrom(from.unknown_fields());
+}
+
+void QueryContextInformation::CopyFrom(const ::google::protobuf::Message& from) {
+  if (&from == this) return;
+  Clear();
+  MergeFrom(from);
+}
+
+void QueryContextInformation::CopyFrom(const QueryContextInformation& from) {
+  if (&from == this) return;
+  Clear();
+  MergeFrom(from);
+}
+
+bool QueryContextInformation::IsInitialized() const {
+
+  return true;
+}
+
+void QueryContextInformation::Swap(QueryContextInformation* other) {
+  if (other != this) {
+    std::swap(query_start_time_, other->query_start_time_);
+    std::swap(time_zone_, other->time_zone_);
+    std::swap(default_schema_name_, other->default_schema_name_);
+    std::swap(_has_bits_[0], other->_has_bits_[0]);
+    _unknown_fields_.Swap(&other->_unknown_fields_);
+    std::swap(_cached_size_, other->_cached_size_);
+  }
+}
+
+::google::protobuf::Metadata QueryContextInformation::GetMetadata() const {
+  protobuf_AssignDescriptorsOnce();
+  ::google::protobuf::Metadata metadata;
+  metadata.descriptor = QueryContextInformation_descriptor_;
+  metadata.reflection = QueryContextInformation_reflection_;
+  return metadata;
+}
+
+
+// ===================================================================
+
+#ifndef _MSC_VER
 const int WorkQueueStatus::kEndpointFieldNumber;
 const int WorkQueueStatus::kQueueLengthFieldNumber;
 const int WorkQueueStatus::kReportTimeFieldNumber;

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/protobuf/BitControl.pb.h
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/BitControl.pb.h b/contrib/native/client/src/protobuf/BitControl.pb.h
index 865d377..7a34cc4 100644
--- a/contrib/native/client/src/protobuf/BitControl.pb.h
+++ b/contrib/native/client/src/protobuf/BitControl.pb.h
@@ -44,6 +44,7 @@ class BitStatus;
 class FragmentStatus;
 class InitializeFragments;
 class PlanFragment;
+class QueryContextInformation;
 class WorkQueueStatus;
 class FinishedReceiver;
 
@@ -51,13 +52,14 @@ enum RpcType {
   HANDSHAKE = 0,
   ACK = 1,
   GOODBYE = 2,
-  REQ_INIATILIZE_FRAGMENTS = 3,
+  REQ_INITIALIZE_FRAGMENTS = 3,
   REQ_CANCEL_FRAGMENT = 6,
   REQ_RECEIVER_FINISHED = 7,
   REQ_FRAGMENT_STATUS = 8,
   REQ_BIT_STATUS = 9,
   REQ_QUERY_STATUS = 10,
   REQ_QUERY_CANCEL = 15,
+  REQ_UNPAUSE_FRAGMENT = 16,
   RESP_FRAGMENT_HANDLE = 11,
   RESP_FRAGMENT_STATUS = 12,
   RESP_BIT_STATUS = 13,
@@ -65,7 +67,7 @@ enum RpcType {
 };
 bool RpcType_IsValid(int value);
 const RpcType RpcType_MIN = HANDSHAKE;
-const RpcType RpcType_MAX = REQ_QUERY_CANCEL;
+const RpcType RpcType_MAX = REQ_UNPAUSE_FRAGMENT;
 const int RpcType_ARRAYSIZE = RpcType_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* RpcType_descriptor();
@@ -553,6 +555,13 @@ class PlanFragment : public ::google::protobuf::Message {
   inline ::std::string* release_fragment_json();
   inline void set_allocated_fragment_json(::std::string* fragment_json);
 
+  // optional bool leaf_fragment = 9;
+  inline bool has_leaf_fragment() const;
+  inline void clear_leaf_fragment();
+  static const int kLeafFragmentFieldNumber = 9;
+  inline bool leaf_fragment() const;
+  inline void set_leaf_fragment(bool value);
+
   // optional .exec.DrillbitEndpoint assignment = 10;
   inline bool has_assignment() const;
   inline void clear_assignment();
@@ -562,13 +571,6 @@ class PlanFragment : public ::google::protobuf::Message {
   inline ::exec::DrillbitEndpoint* release_assignment();
   inline void set_allocated_assignment(::exec::DrillbitEndpoint* assignment);
 
-  // optional bool leaf_fragment = 9;
-  inline bool has_leaf_fragment() const;
-  inline void clear_leaf_fragment();
-  static const int kLeafFragmentFieldNumber = 9;
-  inline bool leaf_fragment() const;
-  inline void set_leaf_fragment(bool value);
-
   // optional .exec.DrillbitEndpoint foreman = 11;
   inline bool has_foreman() const;
   inline void clear_foreman();
@@ -592,33 +594,19 @@ class PlanFragment : public ::google::protobuf::Message {
   inline ::google::protobuf::int64 mem_max() const;
   inline void set_mem_max(::google::protobuf::int64 value);
 
-  // optional int64 query_start_time = 14;
-  inline bool has_query_start_time() const;
-  inline void clear_query_start_time();
-  static const int kQueryStartTimeFieldNumber = 14;
-  inline ::google::protobuf::int64 query_start_time() const;
-  inline void set_query_start_time(::google::protobuf::int64 value);
-
-  // optional .exec.shared.UserCredentials credentials = 15;
+  // optional .exec.shared.UserCredentials credentials = 14;
   inline bool has_credentials() const;
   inline void clear_credentials();
-  static const int kCredentialsFieldNumber = 15;
+  static const int kCredentialsFieldNumber = 14;
   inline const ::exec::shared::UserCredentials& credentials() const;
   inline ::exec::shared::UserCredentials* mutable_credentials();
   inline ::exec::shared::UserCredentials* release_credentials();
   inline void set_allocated_credentials(::exec::shared::UserCredentials* credentials);
 
-  // optional int32 time_zone = 16;
-  inline bool has_time_zone() const;
-  inline void clear_time_zone();
-  static const int kTimeZoneFieldNumber = 16;
-  inline ::google::protobuf::int32 time_zone() const;
-  inline void set_time_zone(::google::protobuf::int32 value);
-
-  // optional string options_json = 17;
+  // optional string options_json = 15;
   inline bool has_options_json() const;
   inline void clear_options_json();
-  static const int kOptionsJsonFieldNumber = 17;
+  static const int kOptionsJsonFieldNumber = 15;
   inline const ::std::string& options_json() const;
   inline void set_options_json(const ::std::string& value);
   inline void set_options_json(const char* value);
@@ -627,6 +615,15 @@ class PlanFragment : public ::google::protobuf::Message {
   inline ::std::string* release_options_json();
   inline void set_allocated_options_json(::std::string* options_json);
 
+  // optional .exec.bit.control.QueryContextInformation context = 16;
+  inline bool has_context() const;
+  inline void clear_context();
+  static const int kContextFieldNumber = 16;
+  inline const ::exec::bit::control::QueryContextInformation& context() const;
+  inline ::exec::bit::control::QueryContextInformation* mutable_context();
+  inline ::exec::bit::control::QueryContextInformation* release_context();
+  inline void set_allocated_context(::exec::bit::control::QueryContextInformation* context);
+
   // @@protoc_insertion_point(class_scope:exec.bit.control.PlanFragment)
  private:
   inline void set_has_handle();
@@ -641,24 +638,22 @@ class PlanFragment : public ::google::protobuf::Message {
   inline void clear_has_memory_cost();
   inline void set_has_fragment_json();
   inline void clear_has_fragment_json();
-  inline void set_has_assignment();
-  inline void clear_has_assignment();
   inline void set_has_leaf_fragment();
   inline void clear_has_leaf_fragment();
+  inline void set_has_assignment();
+  inline void clear_has_assignment();
   inline void set_has_foreman();
   inline void clear_has_foreman();
   inline void set_has_mem_initial();
   inline void clear_has_mem_initial();
   inline void set_has_mem_max();
   inline void clear_has_mem_max();
-  inline void set_has_query_start_time();
-  inline void clear_has_query_start_time();
   inline void set_has_credentials();
   inline void clear_has_credentials();
-  inline void set_has_time_zone();
-  inline void clear_has_time_zone();
   inline void set_has_options_json();
   inline void clear_has_options_json();
+  inline void set_has_context();
+  inline void clear_has_context();
 
   ::google::protobuf::UnknownFieldSet _unknown_fields_;
 
@@ -671,15 +666,14 @@ class PlanFragment : public ::google::protobuf::Message {
   ::exec::DrillbitEndpoint* assignment_;
   ::exec::DrillbitEndpoint* foreman_;
   ::google::protobuf::int64 mem_initial_;
-  bool leaf_fragment_;
-  ::google::protobuf::int32 time_zone_;
   ::google::protobuf::int64 mem_max_;
-  ::google::protobuf::int64 query_start_time_;
   ::exec::shared::UserCredentials* credentials_;
   ::std::string* options_json_;
+  ::exec::bit::control::QueryContextInformation* context_;
+  bool leaf_fragment_;
 
   mutable int _cached_size_;
-  ::google::protobuf::uint32 _has_bits_[(15 + 31) / 32];
+  ::google::protobuf::uint32 _has_bits_[(14 + 31) / 32];
 
   friend void  protobuf_AddDesc_BitControl_2eproto();
   friend void protobuf_AssignDesc_BitControl_2eproto();
@@ -690,6 +684,113 @@ class PlanFragment : public ::google::protobuf::Message {
 };
 // -------------------------------------------------------------------
 
+class QueryContextInformation : public ::google::protobuf::Message {
+ public:
+  QueryContextInformation();
+  virtual ~QueryContextInformation();
+
+  QueryContextInformation(const QueryContextInformation& from);
+
+  inline QueryContextInformation& operator=(const QueryContextInformation& from) {
+    CopyFrom(from);
+    return *this;
+  }
+
+  inline const ::google::protobuf::UnknownFieldSet& unknown_fields() const {
+    return _unknown_fields_;
+  }
+
+  inline ::google::protobuf::UnknownFieldSet* mutable_unknown_fields() {
+    return &_unknown_fields_;
+  }
+
+  static const ::google::protobuf::Descriptor* descriptor();
+  static const QueryContextInformation& default_instance();
+
+  void Swap(QueryContextInformation* other);
+
+  // implements Message ----------------------------------------------
+
+  QueryContextInformation* New() const;
+  void CopyFrom(const ::google::protobuf::Message& from);
+  void MergeFrom(const ::google::protobuf::Message& from);
+  void CopyFrom(const QueryContextInformation& from);
+  void MergeFrom(const QueryContextInformation& from);
+  void Clear();
+  bool IsInitialized() const;
+
+  int ByteSize() const;
+  bool MergePartialFromCodedStream(
+      ::google::protobuf::io::CodedInputStream* input);
+  void SerializeWithCachedSizes(
+      ::google::protobuf::io::CodedOutputStream* output) const;
+  ::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const;
+  int GetCachedSize() const { return _cached_size_; }
+  private:
+  void SharedCtor();
+  void SharedDtor();
+  void SetCachedSize(int size) const;
+  public:
+
+  ::google::protobuf::Metadata GetMetadata() const;
+
+  // nested types ----------------------------------------------------
+
+  // accessors -------------------------------------------------------
+
+  // optional int64 query_start_time = 1;
+  inline bool has_query_start_time() const;
+  inline void clear_query_start_time();
+  static const int kQueryStartTimeFieldNumber = 1;
+  inline ::google::protobuf::int64 query_start_time() const;
+  inline void set_query_start_time(::google::protobuf::int64 value);
+
+  // optional int32 time_zone = 2;
+  inline bool has_time_zone() const;
+  inline void clear_time_zone();
+  static const int kTimeZoneFieldNumber = 2;
+  inline ::google::protobuf::int32 time_zone() const;
+  inline void set_time_zone(::google::protobuf::int32 value);
+
+  // optional string default_schema_name = 3;
+  inline bool has_default_schema_name() const;
+  inline void clear_default_schema_name();
+  static const int kDefaultSchemaNameFieldNumber = 3;
+  inline const ::std::string& default_schema_name() const;
+  inline void set_default_schema_name(const ::std::string& value);
+  inline void set_default_schema_name(const char* value);
+  inline void set_default_schema_name(const char* value, size_t size);
+  inline ::std::string* mutable_default_schema_name();
+  inline ::std::string* release_default_schema_name();
+  inline void set_allocated_default_schema_name(::std::string* default_schema_name);
+
+  // @@protoc_insertion_point(class_scope:exec.bit.control.QueryContextInformation)
+ private:
+  inline void set_has_query_start_time();
+  inline void clear_has_query_start_time();
+  inline void set_has_time_zone();
+  inline void clear_has_time_zone();
+  inline void set_has_default_schema_name();
+  inline void clear_has_default_schema_name();
+
+  ::google::protobuf::UnknownFieldSet _unknown_fields_;
+
+  ::google::protobuf::int64 query_start_time_;
+  ::std::string* default_schema_name_;
+  ::google::protobuf::int32 time_zone_;
+
+  mutable int _cached_size_;
+  ::google::protobuf::uint32 _has_bits_[(3 + 31) / 32];
+
+  friend void  protobuf_AddDesc_BitControl_2eproto();
+  friend void protobuf_AssignDesc_BitControl_2eproto();
+  friend void protobuf_ShutdownFile_BitControl_2eproto();
+
+  void InitAsDefaultInstance();
+  static QueryContextInformation* default_instance_;
+};
+// -------------------------------------------------------------------
+
 class WorkQueueStatus : public ::google::protobuf::Message {
  public:
   WorkQueueStatus();
@@ -1316,15 +1417,37 @@ inline void PlanFragment::set_allocated_fragment_json(::std::string* fragment_js
   }
 }
 
+// optional bool leaf_fragment = 9;
+inline bool PlanFragment::has_leaf_fragment() const {
+  return (_has_bits_[0] & 0x00000040u) != 0;
+}
+inline void PlanFragment::set_has_leaf_fragment() {
+  _has_bits_[0] |= 0x00000040u;
+}
+inline void PlanFragment::clear_has_leaf_fragment() {
+  _has_bits_[0] &= ~0x00000040u;
+}
+inline void PlanFragment::clear_leaf_fragment() {
+  leaf_fragment_ = false;
+  clear_has_leaf_fragment();
+}
+inline bool PlanFragment::leaf_fragment() const {
+  return leaf_fragment_;
+}
+inline void PlanFragment::set_leaf_fragment(bool value) {
+  set_has_leaf_fragment();
+  leaf_fragment_ = value;
+}
+
 // optional .exec.DrillbitEndpoint assignment = 10;
 inline bool PlanFragment::has_assignment() const {
-  return (_has_bits_[0] & 0x00000040u) != 0;
+  return (_has_bits_[0] & 0x00000080u) != 0;
 }
 inline void PlanFragment::set_has_assignment() {
-  _has_bits_[0] |= 0x00000040u;
+  _has_bits_[0] |= 0x00000080u;
 }
 inline void PlanFragment::clear_has_assignment() {
-  _has_bits_[0] &= ~0x00000040u;
+  _has_bits_[0] &= ~0x00000080u;
 }
 inline void PlanFragment::clear_assignment() {
   if (assignment_ != NULL) assignment_->::exec::DrillbitEndpoint::Clear();
@@ -1354,28 +1477,6 @@ inline void PlanFragment::set_allocated_assignment(::exec::DrillbitEndpoint* ass
   }
 }
 
-// optional bool leaf_fragment = 9;
-inline bool PlanFragment::has_leaf_fragment() const {
-  return (_has_bits_[0] & 0x00000080u) != 0;
-}
-inline void PlanFragment::set_has_leaf_fragment() {
-  _has_bits_[0] |= 0x00000080u;
-}
-inline void PlanFragment::clear_has_leaf_fragment() {
-  _has_bits_[0] &= ~0x00000080u;
-}
-inline void PlanFragment::clear_leaf_fragment() {
-  leaf_fragment_ = false;
-  clear_has_leaf_fragment();
-}
-inline bool PlanFragment::leaf_fragment() const {
-  return leaf_fragment_;
-}
-inline void PlanFragment::set_leaf_fragment(bool value) {
-  set_has_leaf_fragment();
-  leaf_fragment_ = value;
-}
-
 // optional .exec.DrillbitEndpoint foreman = 11;
 inline bool PlanFragment::has_foreman() const {
   return (_has_bits_[0] & 0x00000100u) != 0;
@@ -1458,37 +1559,15 @@ inline void PlanFragment::set_mem_max(::google::protobuf::int64 value) {
   mem_max_ = value;
 }
 
-// optional int64 query_start_time = 14;
-inline bool PlanFragment::has_query_start_time() const {
-  return (_has_bits_[0] & 0x00000800u) != 0;
-}
-inline void PlanFragment::set_has_query_start_time() {
-  _has_bits_[0] |= 0x00000800u;
-}
-inline void PlanFragment::clear_has_query_start_time() {
-  _has_bits_[0] &= ~0x00000800u;
-}
-inline void PlanFragment::clear_query_start_time() {
-  query_start_time_ = GOOGLE_LONGLONG(0);
-  clear_has_query_start_time();
-}
-inline ::google::protobuf::int64 PlanFragment::query_start_time() const {
-  return query_start_time_;
-}
-inline void PlanFragment::set_query_start_time(::google::protobuf::int64 value) {
-  set_has_query_start_time();
-  query_start_time_ = value;
-}
-
-// optional .exec.shared.UserCredentials credentials = 15;
+// optional .exec.shared.UserCredentials credentials = 14;
 inline bool PlanFragment::has_credentials() const {
-  return (_has_bits_[0] & 0x00001000u) != 0;
+  return (_has_bits_[0] & 0x00000800u) != 0;
 }
 inline void PlanFragment::set_has_credentials() {
-  _has_bits_[0] |= 0x00001000u;
+  _has_bits_[0] |= 0x00000800u;
 }
 inline void PlanFragment::clear_has_credentials() {
-  _has_bits_[0] &= ~0x00001000u;
+  _has_bits_[0] &= ~0x00000800u;
 }
 inline void PlanFragment::clear_credentials() {
   if (credentials_ != NULL) credentials_->::exec::shared::UserCredentials::Clear();
@@ -1518,37 +1597,15 @@ inline void PlanFragment::set_allocated_credentials(::exec::shared::UserCredenti
   }
 }
 
-// optional int32 time_zone = 16;
-inline bool PlanFragment::has_time_zone() const {
-  return (_has_bits_[0] & 0x00002000u) != 0;
-}
-inline void PlanFragment::set_has_time_zone() {
-  _has_bits_[0] |= 0x00002000u;
-}
-inline void PlanFragment::clear_has_time_zone() {
-  _has_bits_[0] &= ~0x00002000u;
-}
-inline void PlanFragment::clear_time_zone() {
-  time_zone_ = 0;
-  clear_has_time_zone();
-}
-inline ::google::protobuf::int32 PlanFragment::time_zone() const {
-  return time_zone_;
-}
-inline void PlanFragment::set_time_zone(::google::protobuf::int32 value) {
-  set_has_time_zone();
-  time_zone_ = value;
-}
-
-// optional string options_json = 17;
+// optional string options_json = 15;
 inline bool PlanFragment::has_options_json() const {
-  return (_has_bits_[0] & 0x00004000u) != 0;
+  return (_has_bits_[0] & 0x00001000u) != 0;
 }
 inline void PlanFragment::set_has_options_json() {
-  _has_bits_[0] |= 0x00004000u;
+  _has_bits_[0] |= 0x00001000u;
 }
 inline void PlanFragment::clear_has_options_json() {
-  _has_bits_[0] &= ~0x00004000u;
+  _has_bits_[0] &= ~0x00001000u;
 }
 inline void PlanFragment::clear_options_json() {
   if (options_json_ != &::google::protobuf::internal::kEmptyString) {
@@ -1610,6 +1667,162 @@ inline void PlanFragment::set_allocated_options_json(::std::string* options_json
   }
 }
 
+// optional .exec.bit.control.QueryContextInformation context = 16;
+inline bool PlanFragment::has_context() const {
+  return (_has_bits_[0] & 0x00002000u) != 0;
+}
+inline void PlanFragment::set_has_context() {
+  _has_bits_[0] |= 0x00002000u;
+}
+inline void PlanFragment::clear_has_context() {
+  _has_bits_[0] &= ~0x00002000u;
+}
+inline void PlanFragment::clear_context() {
+  if (context_ != NULL) context_->::exec::bit::control::QueryContextInformation::Clear();
+  clear_has_context();
+}
+inline const ::exec::bit::control::QueryContextInformation& PlanFragment::context() const {
+  return context_ != NULL ? *context_ : *default_instance_->context_;
+}
+inline ::exec::bit::control::QueryContextInformation* PlanFragment::mutable_context() {
+  set_has_context();
+  if (context_ == NULL) context_ = new ::exec::bit::control::QueryContextInformation;
+  return context_;
+}
+inline ::exec::bit::control::QueryContextInformation* PlanFragment::release_context() {
+  clear_has_context();
+  ::exec::bit::control::QueryContextInformation* temp = context_;
+  context_ = NULL;
+  return temp;
+}
+inline void PlanFragment::set_allocated_context(::exec::bit::control::QueryContextInformation* context) {
+  delete context_;
+  context_ = context;
+  if (context) {
+    set_has_context();
+  } else {
+    clear_has_context();
+  }
+}
+
+// -------------------------------------------------------------------
+
+// QueryContextInformation
+
+// optional int64 query_start_time = 1;
+inline bool QueryContextInformation::has_query_start_time() const {
+  return (_has_bits_[0] & 0x00000001u) != 0;
+}
+inline void QueryContextInformation::set_has_query_start_time() {
+  _has_bits_[0] |= 0x00000001u;
+}
+inline void QueryContextInformation::clear_has_query_start_time() {
+  _has_bits_[0] &= ~0x00000001u;
+}
+inline void QueryContextInformation::clear_query_start_time() {
+  query_start_time_ = GOOGLE_LONGLONG(0);
+  clear_has_query_start_time();
+}
+inline ::google::protobuf::int64 QueryContextInformation::query_start_time() const {
+  return query_start_time_;
+}
+inline void QueryContextInformation::set_query_start_time(::google::protobuf::int64 value) {
+  set_has_query_start_time();
+  query_start_time_ = value;
+}
+
+// optional int32 time_zone = 2;
+inline bool QueryContextInformation::has_time_zone() const {
+  return (_has_bits_[0] & 0x00000002u) != 0;
+}
+inline void QueryContextInformation::set_has_time_zone() {
+  _has_bits_[0] |= 0x00000002u;
+}
+inline void QueryContextInformation::clear_has_time_zone() {
+  _has_bits_[0] &= ~0x00000002u;
+}
+inline void QueryContextInformation::clear_time_zone() {
+  time_zone_ = 0;
+  clear_has_time_zone();
+}
+inline ::google::protobuf::int32 QueryContextInformation::time_zone() const {
+  return time_zone_;
+}
+inline void QueryContextInformation::set_time_zone(::google::protobuf::int32 value) {
+  set_has_time_zone();
+  time_zone_ = value;
+}
+
+// optional string default_schema_name = 3;
+inline bool QueryContextInformation::has_default_schema_name() const {
+  return (_has_bits_[0] & 0x00000004u) != 0;
+}
+inline void QueryContextInformation::set_has_default_schema_name() {
+  _has_bits_[0] |= 0x00000004u;
+}
+inline void QueryContextInformation::clear_has_default_schema_name() {
+  _has_bits_[0] &= ~0x00000004u;
+}
+inline void QueryContextInformation::clear_default_schema_name() {
+  if (default_schema_name_ != &::google::protobuf::internal::kEmptyString) {
+    default_schema_name_->clear();
+  }
+  clear_has_default_schema_name();
+}
+inline const ::std::string& QueryContextInformation::default_schema_name() const {
+  return *default_schema_name_;
+}
+inline void QueryContextInformation::set_default_schema_name(const ::std::string& value) {
+  set_has_default_schema_name();
+  if (default_schema_name_ == &::google::protobuf::internal::kEmptyString) {
+    default_schema_name_ = new ::std::string;
+  }
+  default_schema_name_->assign(value);
+}
+inline void QueryContextInformation::set_default_schema_name(const char* value) {
+  set_has_default_schema_name();
+  if (default_schema_name_ == &::google::protobuf::internal::kEmptyString) {
+    default_schema_name_ = new ::std::string;
+  }
+  default_schema_name_->assign(value);
+}
+inline void QueryContextInformation::set_default_schema_name(const char* value, size_t size) {
+  set_has_default_schema_name();
+  if (default_schema_name_ == &::google::protobuf::internal::kEmptyString) {
+    default_schema_name_ = new ::std::string;
+  }
+  default_schema_name_->assign(reinterpret_cast<const char*>(value), size);
+}
+inline ::std::string* QueryContextInformation::mutable_default_schema_name() {
+  set_has_default_schema_name();
+  if (default_schema_name_ == &::google::protobuf::internal::kEmptyString) {
+    default_schema_name_ = new ::std::string;
+  }
+  return default_schema_name_;
+}
+inline ::std::string* QueryContextInformation::release_default_schema_name() {
+  clear_has_default_schema_name();
+  if (default_schema_name_ == &::google::protobuf::internal::kEmptyString) {
+    return NULL;
+  } else {
+    ::std::string* temp = default_schema_name_;
+    default_schema_name_ = const_cast< ::std::string*>(&::google::protobuf::internal::kEmptyString);
+    return temp;
+  }
+}
+inline void QueryContextInformation::set_allocated_default_schema_name(::std::string* default_schema_name) {
+  if (default_schema_name_ != &::google::protobuf::internal::kEmptyString) {
+    delete default_schema_name_;
+  }
+  if (default_schema_name) {
+    set_has_default_schema_name();
+    default_schema_name_ = default_schema_name;
+  } else {
+    clear_has_default_schema_name();
+    default_schema_name_ = const_cast< ::std::string*>(&::google::protobuf::internal::kEmptyString);
+  }
+}
+
 // -------------------------------------------------------------------
 
 // WorkQueueStatus

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/protobuf/GeneralRPC.pb.cc
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/GeneralRPC.pb.cc b/contrib/native/client/src/protobuf/GeneralRPC.pb.cc
index 0ebb3a9..b02ef0e 100644
--- a/contrib/native/client/src/protobuf/GeneralRPC.pb.cc
+++ b/contrib/native/client/src/protobuf/GeneralRPC.pb.cc
@@ -136,10 +136,10 @@ void protobuf_AddDesc_GeneralRPC_2eproto() {
     "rdination_id\030\002 \001(\005\022\020\n\010rpc_type\030\003 \001(\005\"b\n\022"
     "CompleteRpcMessage\022#\n\006header\030\001 \001(\0132\023.exe"
     "c.rpc.RpcHeader\022\025\n\rprotobuf_body\030\002 \001(\014\022\020"
-    "\n\010raw_body\030\003 \001(\014*:\n\007RpcMode\022\013\n\007REQUEST\020\000"
-    "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002B1\n\033"
-    "org.apache.drill.exec.protoB\020GeneralRPCP"
-    "rotosH\001", 367);
+    "\n\010raw_body\030\003 \001(\014*N\n\007RpcMode\022\013\n\007REQUEST\020\000"
+    "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002\022\010\n\004"
+    "PING\020\003\022\010\n\004PONG\020\004B1\n\033org.apache.drill.exe"
+    "c.protoB\020GeneralRPCProtosH\001", 387);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "GeneralRPC.proto", &protobuf_RegisterTypes);
   Ack::default_instance_ = new Ack();
@@ -166,6 +166,8 @@ bool RpcMode_IsValid(int value) {
     case 0:
     case 1:
     case 2:
+    case 3:
+    case 4:
       return true;
     default:
       return false;

http://git-wip-us.apache.org/repos/asf/drill/blob/ffbb9c7a/contrib/native/client/src/protobuf/GeneralRPC.pb.h
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/GeneralRPC.pb.h b/contrib/native/client/src/protobuf/GeneralRPC.pb.h
index 49f4bf7..8ebef9a 100644
--- a/contrib/native/client/src/protobuf/GeneralRPC.pb.h
+++ b/contrib/native/client/src/protobuf/GeneralRPC.pb.h
@@ -43,11 +43,13 @@ class CompleteRpcMessage;
 enum RpcMode {
   REQUEST = 0,
   RESPONSE = 1,
-  RESPONSE_FAILURE = 2
+  RESPONSE_FAILURE = 2,
+  PING = 3,
+  PONG = 4
 };
 bool RpcMode_IsValid(int value);
 const RpcMode RpcMode_MIN = REQUEST;
-const RpcMode RpcMode_MAX = RESPONSE_FAILURE;
+const RpcMode RpcMode_MAX = PONG;
 const int RpcMode_ARRAYSIZE = RpcMode_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* RpcMode_descriptor();