You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/11/01 20:29:58 UTC

[13/15] drill git commit: DRILL-4420: C++ API for metadata access and prepared statements

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index b5d5a31..7ecf910 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -19,32 +19,30 @@
 
 #include "drill/common.hpp"
 #include <queue>
-#include <string.h>
+#include <string>
 #include <boost/asio.hpp>
+#include <boost/assign.hpp>
 #include <boost/bind.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/functional/factory.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/thread.hpp>
-#ifdef _WIN32
-#include <zookeeper.h>
-#else
-#include <zookeeper/zookeeper.h>
-#endif
-#include <boost/assign.hpp>
+
 
 #include "drill/drillClient.hpp"
+#include "drill/fieldmeta.hpp"
 #include "drill/recordBatch.hpp"
 #include "drillClientImpl.hpp"
+#include "collectionsImpl.hpp"
 #include "errmsgs.hpp"
 #include "logger.hpp"
-#include "rpcEncoder.hpp"
-#include "rpcDecoder.hpp"
+#include "metadata.hpp"
 #include "rpcMessage.hpp"
 #include "utils.hpp"
-
 #include "GeneralRPC.pb.h"
 #include "UserBitShared.pb.h"
+#include "zookeeperClient.hpp"
 
 namespace Drill{
 
@@ -56,70 +54,57 @@ static std::map<exec::shared::QueryResult_QueryState, status_t> QUERYSTATE_TO_ST
     (exec::shared::QueryResult_QueryState_FAILED, QRY_FAILED)
     ;
 
-RpcEncoder DrillClientImpl::s_encoder;
-RpcDecoder DrillClientImpl::s_decoder;
-
-std::string debugPrintQid(const exec::shared::QueryId& qid){
+static std::string debugPrintQid(const exec::shared::QueryId& qid){ // file-local helper: renders qid as "[part1:part2] " for use in log messages
     return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] ");
 }
 
-void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){
-#if defined _WIN32
-    int32_t timeoutMsecs=timeout*1000;
-    setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs));
-    setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs));
-#else
-    struct timeval tv;
-    tv.tv_sec  = timeout;
-    tv.tv_usec = 0;
-    int e=0;
-    e=setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
-    e=setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
-#endif
-}
-
 connectionStatus_t DrillClientImpl::connect(const char* connStr){
     std::string pathToDrill, protocol, hostPortStr;
     std::string host;
     std::string port;
-    if(!this->m_bIsConnected){
-        m_connectStr=connStr;
-        Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
-        if(!strcmp(protocol.c_str(), "zk")){
-            ZookeeperImpl zook;
-            std::vector<std::string> drillbits;
-            int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+
+    if (this->m_bIsConnected) { // already connected: only a repeat of the stored connect string is accepted
+        if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected
+            return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
+        }
+        return CONN_SUCCESS; // same connect string as before: nothing to do
+    }
+
+    m_connectStr=connStr; // remembered so later calls can be compared against it
+    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); // populates pathToDrill, protocol and hostPortStr, all read below
+    if(protocol == "zk"){ // "zk": resolve a drillbit endpoint through the ZookeeperClient
+        ZookeeperClient zook(pathToDrill);
+        std::vector<std::string> drillbits;
+        int err = zook.getAllDrillbits(hostPortStr, drillbits);
+        if(!err){
+            Utils::shuffle(drillbits);
+            exec::DrillbitEndpoint endpoint;
+            err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list; NOTE(review): assumes drillbits is non-empty whenever err==0 - confirm against getAllDrillbits
             if(!err){
-                Utils::shuffle(drillbits);
-                exec::DrillbitEndpoint endpoint;
-                err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list
-                if(!err){
-                    host=boost::lexical_cast<std::string>(endpoint.address());
-                    port=boost::lexical_cast<std::string>(endpoint.user_port());
-                }
+                host=boost::lexical_cast<std::string>(endpoint.address());
+                port=boost::lexical_cast<std::string>(endpoint.user_port()); // user_port is converted to text for the host/port overload
             }
-            if(err){
-                return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
-            }
-            zook.close();
-            m_bIsDirectConnection=true;  
-        }else if(!strcmp(protocol.c_str(), "local")){
-            boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
-            char tempStr[MAX_CONNECT_STR+1];
-            strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
-            host=strtok(tempStr, ":");
-            port=strtok(NULL, "");
-            m_bIsDirectConnection=false;  
-        }else{
-            return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1)  << ">. Selected " << endpoint.DebugString() << std::endl;)
+
         }
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
-        connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
-        return ret;
-    }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect to a different address is not allowed if already connected
-        return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
+        if(err){ // covers a failure from either getAllDrillbits or getEndPoint
+            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); // note: returns before the zook.close() call below
+        }
+        zook.close();
+        m_bIsDirectConnection=true;
+    }else if(protocol == "local"){ // "local": hostPortStr itself is split into host and port
+        boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
+        char tempStr[MAX_CONNECT_STR+1]; // scratch copy because strtok modifies its input
+        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
+        host=strtok(tempStr, ":"); // NOTE(review): strtok returns NULL when no token is found and std::string from NULL is undefined - confirm hostPortStr is always "host:port" here
+        port=strtok(NULL, ""); // empty delimiter set: takes the remainder after the first ':'
+        m_bIsDirectConnection=false;
+    }else{
+        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); // any protocol other than "zk" or "local" is rejected
     }
-    return CONN_SUCCESS;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
+    connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); // delegate to the (host, port) overload
+    return ret;
 }
 
 connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
@@ -140,7 +125,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
             return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str()));
         }
 
-    }catch(std::exception e){
+    }catch(const std::exception & e){
         // Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve
         if (!strcmp(e.what(), "resolve")) {
             return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what()));
@@ -152,7 +137,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
     // set socket keep alive
     boost::asio::socket_base::keep_alive keepAlive(true);
     m_socket.set_option(keepAlive);
-	// set no_delay
+    // set no_delay
     boost::asio::ip::tcp::no_delay noDelay(true);
     m_socket.set_option(noDelay);
 
@@ -160,7 +145,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
     connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
     m_connectedHost = connectedHost.str();
     DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)
-    
+
     return CONN_SUCCESS;
 }
 
@@ -180,7 +165,7 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
     connectionStatus_t status=CONN_SUCCESS;
     exec::rpc::Ack ack;
     ack.set_ok(true);
-    OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
+    rpc::OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
     boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
@@ -203,7 +188,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
     if(err != boost::asio::error::operation_aborted){
         // Check whether the deadline has passed.
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer -  Expires at: " 
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer -  Expires at: "
             << to_simple_string(m_heartbeatTimer.expires_at())
             << " and time now is: "
             << to_simple_string(boost::asio::deadline_timer::traits_type::now())
@@ -231,8 +216,8 @@ void DrillClientImpl::Close() {
 }
 
 
-connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
-    DrillClientImpl::s_encoder.Encode(m_wbuf, msg);
+connectionStatus_t DrillClientImpl::sendSync(rpc::OutBoundRpcMessage& msg){
+    encode(m_wbuf, msg);
     boost::system::error_code ec;
     size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec);
     if(!ec && s!=0){
@@ -292,9 +277,9 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
     m_deadlineTimer.cancel();
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;)
     if(!error){
-        InBoundRpcMessage msg;
+        rpc::InBoundRpcMessage msg;
         uint32_t length = 0;
-        int bytes_read = DrillClientImpl::s_decoder.LengthDecode(m_rbuf, &length);
+        std::size_t bytes_read = rpc::lengthDecode(m_rbuf, length);
         if(length>0){
             size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
             ByteBuf_t b=m_rbuf + LEN_PREFIX_BUFLEN;
@@ -309,7 +294,11 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
                 bytesToRead-=dataBytesRead;
                 b+=dataBytesRead;
             }
-            DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
+            if (!decode(m_rbuf+bytes_read, length, msg)) {
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. Cannot decode handshake.\n";)
+                handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Cannot decode handshake"));
+                return;
+            }
         }else{
             DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";)
             handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
@@ -321,6 +310,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
         this->m_handshakeStatus=b2u.status();
         this->m_handshakeErrorId=b2u.errorid();
         this->m_handshakeErrorMsg=b2u.errormessage();
+        this->m_serverInfos = b2u.server_infos();
 
     }else{
         // boost error
@@ -362,6 +352,14 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
     u2b.set_support_listening(true);
     u2b.set_support_timeout(true);
 
+    // Adding version info
+    exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos();
+    infos->set_name(DRILL_CONNECTOR_NAME);
+    infos->set_version(DRILL_VERSION_STRING);
+    infos->set_majorversion(DRILL_VERSION_MAJOR);
+    infos->set_minorversion(DRILL_VERSION_MINOR);
+    infos->set_patchversion(DRILL_VERSION_PATCH);
+
     if(properties != NULL && properties->size()>0){
         std::string username;
         std::string err;
@@ -374,7 +372,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
         for(size_t i=0; i<properties->size(); i++){
             std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i));
             if(it==DrillUserProperties::USER_PROPERTIES.end()){
-                DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) 
+                DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i)
                     << ") is unknown and is being skipped" << std::endl;)
                 continue;
             }
@@ -402,7 +400,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         uint64_t coordId = this->getNextCoordinationId();
 
-        OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
+        rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
         sendSync(out_msg);
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
     }
@@ -469,38 +467,159 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
     query.set_type(t);
     query.set_plan(plan);
 
-    uint64_t coordId;
-    DrillClientQueryResult* pQuery=NULL;
+    boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind(
+            boost::factory<DrillClientQueryResult*>(),
+            boost::ref(*this),
+            _1,
+            boost::cref(plan),
+            l,
+            lCtx);
+    return sendMsg(factory, ::exec::user::RUN_QUERY, query);
+}
+
+DrillClientPrepareHandle* DrillClientImpl::PrepareQuery(const std::string& plan, // builds and sends a CREATE_PREPARED_STATEMENT request; plan is the text set as sql_query
+        pfnPreparedStatementListener l, // l: listener handed to the new handle's constructor
+        void* lCtx){ // lCtx: opaque context handed over together with the listener
+    exec::user::CreatePreparedStatementReq query;
+    query.set_sql_query(plan);
+
+    boost::function<DrillClientPrepareHandle*(int32_t)> factory = boost::bind( // deferred `new DrillClientPrepareHandle(*this, <id>, plan, l, lCtx)`
+            boost::factory<DrillClientPrepareHandle*>(),
+            boost::ref(*this),
+            _1, // placeholder for the int32_t passed when the factory is invoked
+            boost::cref(plan),
+            l,
+            lCtx);
+    return sendMsg(factory, ::exec::user::CREATE_PREPARED_STATEMENT, query); // result is whatever sendMsg returns for this request
+}
+
+DrillClientQueryResult* DrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, // sends a RUN_QUERY request of type PREPARED_STATEMENT for pstmt
+        pfnQueryResultsListener l, // l: results listener handed to the new DrillClientQueryResult
+        void* lCtx){ // lCtx: opaque context handed over together with the listener
+    const DrillClientPrepareHandle& handle = static_cast<const DrillClientPrepareHandle&>(pstmt); // NOTE(review): unchecked downcast - assumes every PreparedStatement passed in is a DrillClientPrepareHandle; confirm
+
+    exec::user::RunQuery query;
+    query.set_results_mode(exec::user::STREAM_FULL);
+    query.set_type(::exec::shared::PREPARED_STATEMENT);
+    query.set_allocated_prepared_statement_handle(new ::exec::user::PreparedStatementHandle(handle.m_preparedStatementHandle)); // heap copy of the server handle; protobuf set_allocated_* hands ownership to query
+
+    boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind( // deferred `new DrillClientQueryResult(*this, <id>, handle.m_query, l, lCtx)`
+            boost::factory<DrillClientQueryResult*>(),
+            boost::ref(*this),
+            _1, // placeholder for the int32_t passed when the factory is invoked
+            boost::cref(handle.m_query), // query text is taken from the prepared handle
+            l,
+            lCtx);
+    return sendMsg(factory, ::exec::user::RUN_QUERY, query); // result is whatever sendMsg returns for this request
+}
+
+DrillClientCatalogResult* DrillClientImpl::getCatalogs(const std::string& catalogPattern, // sends a GET_CATALOGS request; catalogPattern becomes the catalog-name LikeFilter pattern
+        Metadata::pfnCatalogMetadataListener listener, // listener: handed to the new DrillClientCatalogResult
+        void* listenerCtx) { // listenerCtx: opaque context handed over together with the listener
+    exec::user::GetCatalogsReq query;
+    exec::user::LikeFilter* catalogFilter(query.mutable_catalog_name_filter()); // points at the filter sub-message inside query
+    catalogFilter->set_pattern(catalogPattern);
+
+    boost::function<DrillClientCatalogResult*(int32_t)> factory = boost::bind( // deferred `new DrillClientCatalogResult(*this, <id>, listener, listenerCtx)`
+            boost::factory<DrillClientCatalogResult*>(),
+            boost::ref(*this),
+            _1, // placeholder for the int32_t passed when the factory is invoked
+            listener,
+            listenerCtx);
+    return sendMsg(factory, ::exec::user::GET_CATALOGS, query); // result is whatever sendMsg returns for this request
+}
+
+DrillClientSchemaResult* DrillClientImpl::getSchemas(const std::string& catalogPattern, // sends a GET_SCHEMAS request; catalogPattern becomes the catalog-name filter pattern
+        const std::string& schemaPattern, // schemaPattern: becomes the schema-name filter pattern
+        Metadata::pfnSchemaMetadataListener listener, // listener: handed to the new DrillClientSchemaResult
+        void* listenerCtx) { // listenerCtx: opaque context handed over together with the listener
+    exec::user::GetSchemasReq query;
+    query.mutable_catalog_name_filter()->set_pattern(catalogPattern);
+    query.mutable_schema_name_filter()->set_pattern(schemaPattern);
+
+    boost::function<DrillClientSchemaResult*(int32_t)> factory = boost::bind( // deferred `new DrillClientSchemaResult(*this, <id>, listener, listenerCtx)`
+            boost::factory<DrillClientSchemaResult*>(),
+            boost::ref(*this),
+            _1, // placeholder for the int32_t passed when the factory is invoked
+            listener,
+            listenerCtx);
+    return sendMsg(factory, ::exec::user::GET_SCHEMAS, query); // result is whatever sendMsg returns for this request
+}
+
+DrillClientTableResult* DrillClientImpl::getTables(const std::string& catalogPattern, // sends a GET_TABLES request; catalogPattern becomes the catalog-name filter pattern
+        const std::string& schemaPattern, // schemaPattern: becomes the schema-name filter pattern
+        const std::string& tablePattern, // tablePattern: becomes the table-name filter pattern
+		const std::vector<std::string>* tableTypes, // tableTypes: optional list copied into table_type_filter; may be NULL
+        Metadata::pfnTableMetadataListener listener, // listener: handed to the new DrillClientTableResult
+        void* listenerCtx) { // listenerCtx: opaque context handed over together with the listener
+    exec::user::GetTablesReq query;
+    query.mutable_catalog_name_filter()->set_pattern(catalogPattern);
+    query.mutable_schema_name_filter()->set_pattern(schemaPattern);
+    query.mutable_table_name_filter()->set_pattern(tablePattern);
+    if (tableTypes) { // a NULL tableTypes leaves table_type_filter empty
+    	std::copy(tableTypes->begin(), tableTypes->end(),
+    			google::protobuf::RepeatedFieldBackInserter(query.mutable_table_type_filter())); // appends each entry to the repeated field
+    }
+
+    boost::function<DrillClientTableResult*(int32_t)> factory = boost::bind( // deferred `new DrillClientTableResult(*this, <id>, listener, listenerCtx)`
+            boost::factory<DrillClientTableResult*>(),
+            boost::ref(*this),
+            _1, // placeholder for the int32_t passed when the factory is invoked
+            listener,
+            listenerCtx);
+    return sendMsg(factory, ::exec::user::GET_TABLES, query); // result is whatever sendMsg returns for this request
+}
+
+DrillClientColumnResult* DrillClientImpl::getColumns(const std::string& catalogPattern, // sends a GET_COLUMNS request; catalogPattern becomes the catalog-name filter pattern
+        const std::string& schemaPattern, // schemaPattern: becomes the schema-name filter pattern
+        const std::string& tablePattern, // tablePattern: becomes the table-name filter pattern
+        const std::string& columnsPattern, // columnsPattern: becomes the column-name filter pattern
+        Metadata::pfnColumnMetadataListener listener, // listener: handed to the new DrillClientColumnResult
+        void* listenerCtx) { // listenerCtx: opaque context handed over together with the listener
+    exec::user::GetColumnsReq query;
+    query.mutable_catalog_name_filter()->set_pattern(catalogPattern);
+    query.mutable_schema_name_filter()->set_pattern(schemaPattern);
+    query.mutable_table_name_filter()->set_pattern(tablePattern);
+    query.mutable_column_name_filter()->set_pattern(columnsPattern);
+
+    boost::function<DrillClientColumnResult*(int32_t)> factory = boost::bind( // deferred `new DrillClientColumnResult(*this, <id>, listener, listenerCtx)`
+            boost::factory<DrillClientColumnResult*>(),
+            boost::ref(*this),
+            _1, // placeholder for the int32_t passed when the factory is invoked
+            listener,
+            listenerCtx);
+    return sendMsg(factory, ::exec::user::GET_COLUMNS, query); // result is whatever sendMsg returns for this request
+}
+
+template<typename Handle>
+Handle* DrillClientImpl::sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& message) {
+    int32_t coordId;
+    Handle* phandle=NULL;
     connectionStatus_t cStatus=CONN_SUCCESS;
     {
         boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
         boost::lock_guard<boost::mutex> dcLock(this->m_dcMutex);
         coordId = this->getNextCoordinationId();
-        OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query);
+        rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, type, coordId, &message);
 
-        // Create the result object and register the listener before we send the query
-        // because sometimes the caller is not checking the status of the submitQuery call.
-        // This way, the broadcast error call will cause the results listener to be called
-        // with a COMM_ERROR status.
-        pQuery = new DrillClientQueryResult(this, coordId, plan);
-        pQuery->registerListener(l, lCtx);
-        this->m_queryIds[coordId]=pQuery;
+        phandle = handleFactory(coordId);
+        this->m_queryHandles[coordId]=phandle;
 
         connectionStatus_t cStatus=sendSync(out_msg);
         if(cStatus == CONN_SUCCESS){
             bool sendRequest=false;
 
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query request. " << "[" << m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
-                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query " <<  "Coordination id = " << coordId << " query: " << plan << std::endl;)
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent " << ::exec::user::RpcType_Name(type) << " request. " << "[" << m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent " << ::exec::user::RpcType_Name(type) <<  " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;)
 
                 if(m_pendingRequests++==0){
                     sendRequest=true;
                 }else{
-                    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;)
+                    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) <<  " request to server" << std::endl;)
                         DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
                 }
             if(sendRequest){
-                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending " << ::exec::user::RpcType_Name(type) <<  " request. Number of pending requests = "
                         << m_pendingRequests << std::endl;)
                     getNextResult(); // async wait for results
             }
@@ -508,21 +627,18 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
 
     }
     if(cStatus!=CONN_SUCCESS){
-        this->m_queryIds.erase(coordId);
-        delete pQuery;
+        this->m_queryHandles.erase(coordId);
+        delete phandle;
         return NULL;
     }
 
-
-
     //run this in a new thread
     startMessageListener();
 
-    return pQuery;
+    return phandle;
 }
 
 void DrillClientImpl::getNextResult(){
-
     // This call is always made from within a function where the mutex has already been acquired
     //boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
 
@@ -533,7 +649,7 @@ void DrillClientImpl::getNextResult(){
             AllocatedBuffer::s_memCV.wait(memLock);
         }
     }
-    
+
     //use free, not delete to free
     ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
     if (DrillClientConfig::getQueryTimeout() > 0){
@@ -577,8 +693,7 @@ void DrillClientImpl::waitForResults(){
 
 status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
         AllocatedBufferPtr* allocatedBuffer,
-        InBoundRpcMessage& msg,
-        boost::system::error_code& error){
+        rpc::InBoundRpcMessage& msg){
 
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
         <<  reinterpret_cast<int*>(_buf) << std::endl;)
@@ -590,7 +705,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
         // We need to protect the readLength and read buffer, and the pending requests counter,
         // but we don't have to keep the lock while we decode the rest of the buffer.
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-        int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen);
+        std::size_t bytes_read = rpc::lengthDecode(_buf, rmsgLen);
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;)
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;)
 
@@ -612,7 +727,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
                 << (rmsgLen - leftover) << std::endl;)
             ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
             size_t bytesToRead=rmsgLen - leftover;
-              
+            boost::system::error_code error;
             while(1){
                 size_t dataBytesRead=this->m_socket.read_some(
                         boost::asio::buffer(b, bytesToRead),
@@ -623,10 +738,14 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
                 bytesToRead-=dataBytesRead;
                 b+=dataBytesRead;
             }
-            
+
             if(!error){
                 // read data successfully
-                DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg);
+                if (!decode(currentBuffer->m_pBuffer, rmsgLen, msg)) {
+                    Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
+                    return handleQryError(QRY_COMM_ERROR,
+                            getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);;
+                }
                 DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;)
             }else{
                 Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
@@ -645,7 +764,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
     return QRY_SUCCESS;
 }
 
-status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer, InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
     exec::shared::QueryId qid;
@@ -657,15 +776,15 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
         DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;)
         qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;)
-        
+
         qid.CopyFrom(qr.query_id());
-        
+
         if (qr.has_query_state() &&
                 qr.query_state() != exec::shared::QueryResult_QueryState_RUNNING &&
                 qr.query_state() != exec::shared::QueryResult_QueryState_STARTING) {
             pDrillClientQueryResult=findQueryResult(qid);
-            //Queries that have been cancelled or whose resources are freed before completion 
-            //do not have a DrillClientQueryResult object. We need not handle the terminal message 
+            //Queries that have been cancelled or whose resources are freed before completion
+            //do not have a DrillClientQueryResult object. We need not handle the terminal message
             //in that case since all it does is to free resources (and they have already been freed)
             if(pDrillClientQueryResult!=NULL){
                 //Validate the RPC message
@@ -703,10 +822,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
     return ret;
 }
 
-status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer, InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
-    exec::shared::QueryId qid;
+    ::exec::shared::QueryId qid;
     // Be a good client and send ack as early as possible.
     // Drillbit pushed the query result to the client, the client should send ack
     // whenever it receives the message
@@ -720,7 +839,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
 
-        qid.CopyFrom(qr->query_id());
+        qid = ::exec::shared::QueryId(qr->query_id());
         if(qid.part1()==0){
             DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;)
             delete allocatedBuffer;
@@ -729,13 +848,13 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
 
         pDrillClientQueryResult=findQueryResult(qid);
         if(pDrillClientQueryResult==NULL){
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" 
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query ("
                                  << debugPrintQid(qid) << ")." << std::endl;)
             delete qr;
             delete allocatedBuffer;
             return ret;
         }
-        
+
         //Validate the RPC message
         std::string valErr;
         if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
@@ -765,20 +884,13 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         }
 
         pDrillClientQueryResult->setIsQueryPending(true);
-        pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
         if(pDrillClientQueryResult->m_bIsLastChunk){
             DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
                 <<  "Received last batch. " << std::endl;)
             ret=QRY_NO_MORE_DATA;
         }
         pDrillClientQueryResult->setQueryStatus(ret);
-        if(pResultsListener!=NULL){
-            ret = pResultsListener(pDrillClientQueryResult, pRecordBatch, NULL);
-        }else{
-            //Use a default callback that is called when a record batch is received
-            ret = pDrillClientQueryResult->defaultQueryResultsListener(pDrillClientQueryResult,
-                    pRecordBatch, NULL);
-        }
+        ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL);
     } // release lock
     if(ret==QRY_FAILURE){
         sendCancel(&qid);
@@ -787,31 +899,37 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         pDrillClientQueryResult->setIsQueryPending(false);
         DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
         pDrillClientQueryResult->setQueryStatus(ret);
-        clearMapEntries(pDrillClientQueryResult);
+        removeQueryHandle(pDrillClientQueryResult);
+        removeQueryResult(pDrillClientQueryResult);
         return ret;
     }
     return ret;
 }
 
-status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
-    DrillClientQueryResult* pDrillClientQueryResult=NULL;
+status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
     DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;)
     status_t ret=QRY_SUCCESS;
 
+    // make sure to deallocate buffer
+    boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    std::map<int,DrillClientQueryResult*>::iterator it;
-    for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
-        std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL");
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
+    for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
+        DrillClientQueryResult* pDrillClientQueryResult=it->second;
+        std::string qidString = (pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL");
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId
         << " QueryId: "<< qidString << std::endl;)
     }
     if(msg.m_coord_id==0){
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
         return QRY_SUCCESS;
     }
-    it=this->m_queryIds.find(msg.m_coord_id);
-    if(it!=this->m_queryIds.end()){
-        pDrillClientQueryResult=(*it).second;
+    std::map<int, DrillClientQueryHandle*>::const_iterator it;
+    it=this->m_queryHandles.find(msg.m_coord_id);
+    if(it!=this->m_queryHandles.end()){
+        DrillClientQueryResult* pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
+        if (!pDrillClientQueryResult) {
+            return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+        }
         exec::shared::QueryId *qid = new exec::shared::QueryId;
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
         qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
@@ -820,14 +938,241 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
         //save queryId allocated here so we can free it later
         pDrillClientQueryResult->setQueryId(qid);
     }else{
-        delete allocatedBuffer;
         return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
     }
-    delete allocatedBuffer;
     return ret;
 }
 
-DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){
+/**
+ * Handles a PREPARED_STATEMENT response received from the server.
+ *
+ * Finds the pending handle registered in m_queryHandles under the message's
+ * coordination id, decodes the CreatePreparedStatementResp payload, stores the
+ * prepared statement on the handle and notifies the handle's listener. On
+ * success the pending request count is decremented and, once it reaches zero,
+ * m_cv is signalled.
+ *
+ * @param allocatedBuffer buffer associated with msg; deleted by a scoped guard
+ *                        on every return path of this function.
+ * @param msg             inbound RPC message carrying the response body.
+ * @return QRY_SUCCESS on success or when the coordination id is 0; otherwise
+ *         the value returned by handleQryError().
+ */
+status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Prepared Statement with coordination id:" << msg.m_coord_id << std::endl;)
+    status_t ret=QRY_SUCCESS;
+
+    // make sure to deallocate buffer
+    boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+    if(msg.m_coord_id==0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStatement: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+        return QRY_SUCCESS;
+    }
+    std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+    if(it!=this->m_queryHandles.end()){
+        // m_queryHandles stores handles of several concrete kinds, so the downcast is
+        // checked (as processQueryId does): a response routed to a handle of another
+        // kind is reported as an invalid query id instead of being reinterpreted.
+        DrillClientPrepareHandle* pDrillClientPrepareHandle=dynamic_cast<DrillClientPrepareHandle*>((*it).second);
+        if (!pDrillClientPrepareHandle) {
+            return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+        }
+        exec::user::CreatePreparedStatementResp resp;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Prepared Statement Handle " << msg.m_pbody.size() << std::endl;)
+        if (!resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size())) {
+            return handleQryError(QRY_COMM_ERROR, "Cannot decode prepared statement", pDrillClientPrepareHandle);
+        }
+        if (resp.has_status() && resp.status() != exec::user::OK) {
+            return handleQryError(QRY_FAILED, resp.error(), pDrillClientPrepareHandle);
+        }
+        pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement());
+        pDrillClientPrepareHandle->notifyListener(pDrillClientPrepareHandle, NULL);
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Prepared Statement handle - " << resp.prepared_statement().server_handle().DebugString() << std::endl;)
+    }else{
+        // No handle is registered for this coordination id.
+        return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+    }
+    // The error paths above return early; handleQryError adjusts m_pendingRequests for them.
+    m_pendingRequests--;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStatement: " << m_pendingRequests << " requests pending." << std::endl;)
+    if(m_pendingRequests==0){
+        // signal any waiting client that it can exit because there are no more any query results to arrive.
+        // We keep the heartbeat going though.
+        m_cv.notify_one();
+    }
+    return ret;
+}
+
+/**
+ * Handles a CATALOGS (GetCatalogsResp) response received from the server.
+ *
+ * Finds the pending handle registered in m_queryHandles under the message's
+ * coordination id, decodes the response, rebuilds the handle's m_meta list
+ * from the returned catalogs and notifies the handle's listener. On success
+ * the pending request count is decremented and, once it reaches zero, m_cv is
+ * signalled.
+ *
+ * @param allocatedBuffer buffer associated with msg; deleted by a scoped guard
+ *                        on every return path of this function.
+ * @param msg             inbound RPC message carrying the response body.
+ * @return QRY_SUCCESS on success or when the coordination id is 0; otherwise
+ *         the value returned by handleQryError().
+ */
+status_t DrillClientImpl::processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetCatalogsResp with coordination id:" << msg.m_coord_id << std::endl;)
+    status_t ret=QRY_SUCCESS;
+
+    // make sure to deallocate buffer
+    boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+    if(msg.m_coord_id==0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+        return QRY_SUCCESS;
+    }
+    std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+    if(it!=this->m_queryHandles.end()){
+        // m_queryHandles stores handles of several concrete kinds, so the downcast is
+        // checked (as processQueryId does): a response routed to a handle of another
+        // kind is reported as an invalid query id instead of being reinterpreted.
+        DrillClientCatalogResult* pHandle=dynamic_cast<DrillClientCatalogResult*>((*it).second);
+        if (!pHandle) {
+            return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+        }
+        exec::user::GetCatalogsResp* resp = new exec::user::GetCatalogsResp;
+        // The response is attached to the handle before parsing, so the early error
+        // returns below do not delete it here (presumably the handle owns it - confirm).
+        pHandle->attachMetadataResult(resp);
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetCatalogs result Handle " << msg.m_pbody.size() << std::endl;)
+        if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+            return handleQryError(QRY_COMM_ERROR, "Cannot decode getcatalogs results", pHandle);
+        }
+        if (resp->status() != exec::user::OK) {
+            return handleQryError(QRY_FAILED, resp->error(), pHandle);
+        }
+
+        const ::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>& catalogs = resp->catalogs();
+        pHandle->m_meta.clear();
+        pHandle->m_meta.reserve(resp->catalogs_size());
+
+        // Wrap each protobuf entry; a distinct iterator name avoids shadowing the map iterator above.
+        for(::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>::const_iterator catalogIt = catalogs.begin(); catalogIt != catalogs.end(); ++catalogIt) {
+            meta::DrillCatalogMetadata meta(*catalogIt);
+            pHandle->m_meta.push_back(meta);
+        }
+        pHandle->notifyListener(&pHandle->m_meta, NULL);
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetCatalogs result -  " << resp->catalogs_size() << " catalog(s)" << std::endl;)
+    }else{
+        // No handle is registered for this coordination id.
+        return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+    }
+    // The error paths above return early; handleQryError adjusts m_pendingRequests for them.
+    m_pendingRequests--;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: " << m_pendingRequests << " requests pending." << std::endl;)
+    if(m_pendingRequests==0){
+        // signal any waiting client that it can exit because there are no more any query results to arrive.
+        // We keep the heartbeat going though.
+        m_cv.notify_one();
+    }
+    return ret;
+}
+
+/**
+ * Handles a SCHEMAS (GetSchemasResp) response received from the server.
+ *
+ * Finds the pending handle registered in m_queryHandles under the message's
+ * coordination id, decodes the response, rebuilds the handle's m_meta list
+ * from the returned schemas and notifies the handle's listener. On success
+ * the pending request count is decremented and, once it reaches zero, m_cv is
+ * signalled.
+ *
+ * @param allocatedBuffer buffer associated with msg; deleted by a scoped guard
+ *                        on every return path of this function.
+ * @param msg             inbound RPC message carrying the response body.
+ * @return QRY_SUCCESS on success or when the coordination id is 0; otherwise
+ *         the value returned by handleQryError().
+ */
+status_t DrillClientImpl::processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetSchemaResp with coordination id:" << msg.m_coord_id << std::endl;)
+    status_t ret=QRY_SUCCESS;
+
+    // make sure to deallocate buffer
+    boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+    if(msg.m_coord_id==0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+        return QRY_SUCCESS;
+    }
+    std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+    if(it!=this->m_queryHandles.end()){
+        // m_queryHandles stores handles of several concrete kinds, so the downcast is
+        // checked (as processQueryId does): a response routed to a handle of another
+        // kind is reported as an invalid query id instead of being reinterpreted.
+        DrillClientSchemaResult* pHandle=dynamic_cast<DrillClientSchemaResult*>((*it).second);
+        if (!pHandle) {
+            return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+        }
+        exec::user::GetSchemasResp* resp = new exec::user::GetSchemasResp();
+        // The response is attached to the handle before parsing, so the early error
+        // returns below do not delete it here (presumably the handle owns it - confirm).
+        pHandle->attachMetadataResult(resp);
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetSchemasResp result Handle " << msg.m_pbody.size() << std::endl;)
+        if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+            return handleQryError(QRY_COMM_ERROR, "Cannot decode getschemas results", pHandle);
+        }
+        if (resp->status() != exec::user::OK) {
+            return handleQryError(QRY_FAILED, resp->error(), pHandle);
+        }
+
+        const ::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>& schemas = resp->schemas();
+        pHandle->m_meta.clear();
+        pHandle->m_meta.reserve(resp->schemas_size());
+
+        // Wrap each protobuf entry; a distinct iterator name avoids shadowing the map iterator above.
+        for(::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>::const_iterator schemaIt = schemas.begin(); schemaIt != schemas.end(); ++schemaIt) {
+            meta::DrillSchemaMetadata meta(*schemaIt);
+            pHandle->m_meta.push_back(meta);
+        }
+        pHandle->notifyListener(&pHandle->m_meta, NULL);
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetSchemaResp result - " << resp->schemas_size() << " schema(s)" << std::endl;)
+    }else{
+        // No handle is registered for this coordination id.
+        return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+    }
+    // The error paths above return early; handleQryError adjusts m_pendingRequests for them.
+    m_pendingRequests--;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: " << m_pendingRequests << " requests pending." << std::endl;)
+    if(m_pendingRequests==0){
+        // signal any waiting client that it can exit because there are no more any query results to arrive.
+        // We keep the heartbeat going though.
+        m_cv.notify_one();
+    }
+    return ret;
+}
+
+/**
+ * Handles a TABLES (GetTablesResp) response received from the server.
+ *
+ * Finds the pending handle registered in m_queryHandles under the message's
+ * coordination id, decodes the response, rebuilds the handle's m_meta list
+ * from the returned tables and notifies the handle's listener. On success the
+ * pending request count is decremented and, once it reaches zero, m_cv is
+ * signalled.
+ *
+ * @param allocatedBuffer buffer associated with msg; deleted by a scoped guard
+ *                        on every return path of this function.
+ * @param msg             inbound RPC message carrying the response body.
+ * @return QRY_SUCCESS on success or when the coordination id is 0; otherwise
+ *         the value returned by handleQryError().
+ */
+status_t DrillClientImpl::processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetTablesResp with coordination id:" << msg.m_coord_id << std::endl;)
+    status_t ret=QRY_SUCCESS;
+
+    // make sure to deallocate buffer
+    boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+    if(msg.m_coord_id==0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+        return QRY_SUCCESS;
+    }
+    std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+    if(it!=this->m_queryHandles.end()){
+        // m_queryHandles stores handles of several concrete kinds, so the downcast is
+        // checked (as processQueryId does): a response routed to a handle of another
+        // kind is reported as an invalid query id instead of being reinterpreted.
+        DrillClientTableResult* pHandle=dynamic_cast<DrillClientTableResult*>((*it).second);
+        if (!pHandle) {
+            return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+        }
+        exec::user::GetTablesResp* resp =  new exec::user::GetTablesResp();
+        // The response is attached to the handle before parsing, so the early error
+        // returns below do not delete it here (presumably the handle owns it - confirm).
+        pHandle->attachMetadataResult(resp);
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetTablesResp result Handle " << msg.m_pbody.size() << std::endl;)
+        if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+            return handleQryError(QRY_COMM_ERROR, "Cannot decode gettables results", pHandle);
+        }
+        if (resp->status() != exec::user::OK) {
+            return handleQryError(QRY_FAILED, resp->error(), pHandle);
+        }
+        const ::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>& tables = resp->tables();
+        pHandle->m_meta.clear();
+        pHandle->m_meta.reserve(resp->tables_size());
+
+        // Wrap each protobuf entry; a distinct iterator name avoids shadowing the map iterator above.
+        for(::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>::const_iterator tableIt = tables.begin(); tableIt != tables.end(); ++tableIt) {
+            meta::DrillTableMetadata meta(*tableIt);
+            pHandle->m_meta.push_back(meta);
+        }
+        pHandle->notifyListener(&pHandle->m_meta, NULL);
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetTables result - " << resp->tables_size() << " table(s)" << std::endl;)
+    }else{
+        // No handle is registered for this coordination id.
+        return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+    }
+    // The error paths above return early; handleQryError adjusts m_pendingRequests for them.
+    m_pendingRequests--;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: " << m_pendingRequests << " requests pending." << std::endl;)
+    if(m_pendingRequests==0){
+        // signal any waiting client that it can exit because there are no more any query results to arrive.
+        // We keep the heartbeat going though.
+        m_cv.notify_one();
+    }
+    return ret;
+}
+
+/**
+ * Handles a COLUMNS (GetColumnsResp) response received from the server.
+ *
+ * Finds the pending handle registered in m_queryHandles under the message's
+ * coordination id, decodes the response, rebuilds the handle's m_meta list
+ * from the returned columns and notifies the handle's listener. On success
+ * the pending request count is decremented and, once it reaches zero, m_cv is
+ * signalled.
+ *
+ * @param allocatedBuffer buffer associated with msg; deleted by a scoped guard
+ *                        on every return path of this function.
+ * @param msg             inbound RPC message carrying the response body.
+ * @return QRY_SUCCESS on success or when the coordination id is 0; otherwise
+ *         the value returned by handleQryError().
+ */
+status_t DrillClientImpl::processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetColumnsResp with coordination id:" << msg.m_coord_id << std::endl;)
+    status_t ret=QRY_SUCCESS;
+
+    // make sure to deallocate buffer
+    boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+    if(msg.m_coord_id==0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+        return QRY_SUCCESS;
+    }
+    std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+    if(it!=this->m_queryHandles.end()){
+        // m_queryHandles stores handles of several concrete kinds, so the downcast is
+        // checked (as processQueryId does): a response routed to a handle of another
+        // kind is reported as an invalid query id instead of being reinterpreted.
+        DrillClientColumnResult* pHandle=dynamic_cast<DrillClientColumnResult*>((*it).second);
+        if (!pHandle) {
+            return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+        }
+        exec::user::GetColumnsResp* resp = new exec::user::GetColumnsResp();
+        // The response is attached to the handle before parsing, so the early error
+        // returns below do not delete it here (presumably the handle owns it - confirm).
+        pHandle->attachMetadataResult(resp);
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received GetColumnsResp result Handle " << msg.m_pbody.size() << std::endl;)
+        if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+            return handleQryError(QRY_COMM_ERROR, "Cannot decode getcolumns results", pHandle);
+        }
+        if (resp->status() != exec::user::OK) {
+            return handleQryError(QRY_FAILED, resp->error(), pHandle);
+        }
+        const ::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>& columns = resp->columns();
+        pHandle->m_meta.clear();
+        pHandle->m_meta.reserve(resp->columns_size());
+
+        // Wrap each protobuf entry; a distinct iterator name avoids shadowing the map iterator above.
+        for(::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>::const_iterator columnIt = columns.begin(); columnIt != columns.end(); ++columnIt) {
+            meta::DrillColumnMetadata meta(*columnIt);
+            pHandle->m_meta.push_back(meta);
+        }
+        pHandle->notifyListener(&pHandle->m_meta, NULL);
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetColumnsResp result - " << resp->columns_size() << " column(s)" << std::endl;)
+    }else{
+        // No handle is registered for this coordination id.
+        return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+    }
+    // The error paths above return early; handleQryError adjusts m_pendingRequests for them.
+    m_pendingRequests--;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: " << m_pendingRequests << " requests pending." << std::endl;)
+    if(m_pendingRequests==0){
+        // signal any waiting client that it can exit because there are no more any query results to arrive.
+        // We keep the heartbeat going though.
+        m_cv.notify_one();
+    }
+    return ret;
+}
+
+DrillClientQueryResult* DrillClientImpl::findQueryResult(const exec::shared::QueryId& qid){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;)
     std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
@@ -838,7 +1183,7 @@ DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId&
                 << it->first->part2() << "]\n";)
         }
     }
-    it=this->m_queryResults.find(&qid);
+    it=this->m_queryResults.find(const_cast<exec::shared::QueryId * const>(&qid));
     if(it!=this->m_queryResults.end()){
         pDrillClientQueryResult=(*it).second;
         DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
@@ -925,9 +1270,8 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
 }
 
 void DrillClientImpl::handleRead(ByteBuf_t _buf,
-        const boost::system::error_code& err,
+        const boost::system::error_code& error,
         size_t bytes_transferred) {
-    boost::system::error_code error=err;
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
         <<  reinterpret_cast<int*>(_buf) << std::endl;)
     if(DrillClientConfig::getQueryTimeout() > 0){
@@ -935,120 +1279,153 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
         m_deadlineTimer.cancel();
     }
-    if(!error){
-        InBoundRpcMessage msg;
-        boost::lock_guard<boost::mutex> lock(this->m_prMutex);
+    if (error) {
+        // boost error
+        Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
+        boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
+            "Boost Communication Error: " << error.message() << std::endl;)
+        handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
+        return;
+    }
 
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
-        AllocatedBufferPtr allocatedBuffer=NULL;
+    rpc::InBoundRpcMessage msg;
+    boost::lock_guard<boost::mutex> lockPR(this->m_prMutex);
 
-        if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
-            if(m_pendingRequests!=0){
-                boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-                getNextResult();
-            }
-            return;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
+    AllocatedBufferPtr allocatedBuffer=NULL;
+
+    if(readMsg(_buf, &allocatedBuffer, msg)!=QRY_SUCCESS){
+        delete allocatedBuffer;
+        if(m_pendingRequests!=0){
+            boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+            getNextResult();
         }
+        return;
+    }
 
-        if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away
-            m_pendingRequests--;
+    if(msg.m_mode==exec::rpc::PONG) { //heartbeat response. Throw it away
+        m_pendingRequests--;
+        delete allocatedBuffer;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " <<  std::endl;)
+        if(m_pendingRequests!=0){
+            boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+            getNextResult();
+        }else{
+            boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " <<  std::endl;)
+            m_cv.notify_one();
+        }
+
+        return;
+    }
+
+    if(msg.m_mode == exec::rpc::RESPONSE) {
+        status_t s;
+        switch(msg.m_rpc_type) {
+        case exec::user::QUERY_HANDLE:
+            s = processQueryId(allocatedBuffer, msg);
+            break;
+
+        case exec::user::PREPARED_STATEMENT:
+            s = processPreparedStatement(allocatedBuffer, msg);
+            break;
+
+        case exec::user::CATALOGS:
+            s = processCatalogsResult(allocatedBuffer, msg);
+            break;
+
+        case exec::user::SCHEMAS:
+            s = processSchemasResult(allocatedBuffer, msg);
+            break;
+
+        case exec::user::TABLES:
+            s = processTablesResult(allocatedBuffer, msg);
+            break;
+
+        case exec::user::COLUMNS:
+            s = processColumnsResult(allocatedBuffer, msg);
+            break;
+
+        case exec::user::HANDSHAKE:
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
             delete allocatedBuffer;
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " <<  std::endl;)
-            if(m_pendingRequests!=0){
-                boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-                getNextResult();
-            }else{
-                boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " <<  std::endl;)
-                m_cv.notify_one();
-            }
-            return;
-        }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
-            status_t s = processQueryResult(allocatedBuffer, msg);
-            if(s !=QRY_SUCCESS && s!= QRY_NO_MORE_DATA){
-                if(m_pendingRequests!=0){
-                    boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-                    getNextResult();
-                }
-                return;
-            }
-        }else if(!error && msg.m_rpc_type==exec::user::QUERY_DATA){
-            if(processQueryData(allocatedBuffer, msg)!=QRY_SUCCESS){
-                if(m_pendingRequests!=0){
-                    boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-                    getNextResult();
-                }
-                return;
-            }
-        }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){
-            if(processQueryId(allocatedBuffer, msg)!=QRY_SUCCESS){
-                if(m_pendingRequests!=0){
-                    boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-                    getNextResult();
-                }
-                return;
-            }
-        }else if(!error && msg.m_rpc_type==exec::user::ACK){
+            break;
+
+        case exec::user::ACK:
             // Cancel requests will result in an ACK sent back.
             // Consume silently
+            s = QRY_CANCELED;
             delete allocatedBuffer;
-            if(m_pendingRequests!=0){
-                boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-                getNextResult();
-            }
-            return;
-        }else{
+            break;
+
+        default:
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+                    << "QueryResult returned " << msg.m_rpc_type << std::endl;)
+            delete allocatedBuffer;
+            handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
+        }
+
+        if (m_pendingRequests != 0) {
             boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-            if(error){
-                // We have a socket read error, but we do not know which query this is for.
-                // Signal ALL pending queries that they should stop waiting.
-                delete allocatedBuffer;
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;)
-                handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
-                return;
-            }else{
-                // If not QUERY_RESULT, then we think something serious has gone wrong?
-                // In one case when the client hung, we observed that the server was sending a handshake request to the client
-                // We should properly handle these handshake requests/responses
-                if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){
-                    if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){
-                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
-                        exec::user::UserToBitHandshake u2b;
-                        u2b.set_channel(exec::shared::USER);
-                        u2b.set_rpc_version(DRILL_RPC_VERSION);
-                        u2b.set_support_listening(true);
-                        OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
-                        sendSync(out_msg);
-                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
-                    }else{
-                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
-                    }
-                }else{
-                    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
-                        << "QueryResult returned " << msg.m_rpc_type << std::endl;)
-                    handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
-                }
-                delete allocatedBuffer;
-                return;
+            getNextResult();
+        }
+
+        return;
+    }
+
+    if (msg.has_mode() && msg.m_mode == exec::rpc::REQUEST) {
+        status_t s;
+        switch(msg.m_rpc_type) {
+        case exec::user::QUERY_RESULT:
+            s = processQueryResult(allocatedBuffer, msg);
+            break;
+
+        case exec::user::QUERY_DATA:
+            s = processQueryData(allocatedBuffer, msg);
+            break;
+
+        case exec::user::HANDSHAKE:
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
+            delete allocatedBuffer;
+            // In one case when the client hung, we observed that the server was sending a handshake request to the client
+            // We should properly handle these handshake requests/responses
+            {
+                boost::lock_guard<boost::mutex> lockDC(this->m_dcMutex);
+                exec::user::UserToBitHandshake u2b;
+                u2b.set_channel(exec::shared::USER);
+                u2b.set_rpc_version(DRILL_RPC_VERSION);
+                u2b.set_support_listening(true);
+                rpc::OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
+                sendSync(out_msg);
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
             }
+            break;
+
+        default:
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+                    << "QueryResult returned " << msg.m_rpc_type << std::endl;)
+            delete allocatedBuffer;
+            handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
         }
-        {
+
+        if (m_pendingRequests != 0) {
             boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
             getNextResult();
         }
-    }else{
-        // boost error
-        Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
-        boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
-            "Boost Communication Error: " << error.message() << std::endl;)
-        handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
+
         return;
     }
-    return;
+
+    // If not QUERY_RESULT, then we think something serious has gone wrong?
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+        << "QueryResult returned " << msg.m_rpc_type << " for " << msg.m_mode << std::endl;)
+    handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
+    delete allocatedBuffer;
+
 }
 
-status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valErr){
+status_t DrillClientImpl::validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valErr){
     if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){
         valErr=getMessage(ERR_QRY_RESPFAIL);
         return QRY_FAILURE;
@@ -1060,7 +1437,7 @@ status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shar
     return QRY_SUCCESS;
 }
 
-status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valErr){
+status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valErr){
     if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){
         valErr=getMessage(ERR_QRY_RESPFAIL);
         return QRY_FAILURE;
@@ -1072,10 +1449,10 @@ status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::sh
     return QRY_SUCCESS;
 }
 
-connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
+connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, const std::string& msg){
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
     m_pendingRequests=0;
-    if(!m_queryIds.empty()){
+    if(!m_queryHandles.empty()){
         // set query error only if queries are running
         broadcastError(pErr);
     }else{
@@ -1086,12 +1463,12 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s
     return status;
 }
 
-status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){
+status_t DrillClientImpl::handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle){
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
     // set query error only if queries are running
-    if(pQueryResult!=NULL){
+    if(pQueryHandle!=NULL){
         m_pendingRequests--;
-        pQueryResult->signalError(pErr);
+        pQueryHandle->signalError(pErr);
     }else{
         m_pendingRequests=0;
         broadcastError(pErr);
@@ -1101,27 +1478,27 @@ status_t DrillClientImpl::handleQryError(status_t status, std::string msg, Drill
 
 status_t DrillClientImpl::handleQryError(status_t status,
         const exec::shared::DrillPBError& e,
-        DrillClientQueryResult* pQueryResult){
-    assert(pQueryResult!=NULL);
+        DrillClientQueryHandle* pQueryHandle){ // Reports the server-side error e on one specific handle and returns status unchanged.
+    assert(pQueryHandle!=NULL); // unlike the std::string overload, this overload has no broadcast path for a NULL handle
     DrillClientError* pErr =  DrillClientError::getErrorObject(e);
-    pQueryResult->signalError(pErr);
+    pQueryHandle->signalError(pErr); // presumably the handle takes ownership of pErr (it is not deleted here) - confirm
     m_pendingRequests--;
     return status;
 }
 
 void DrillClientImpl::broadcastError(DrillClientError* pErr){
     if(pErr!=NULL){
-        std::map<int, DrillClientQueryResult*>::iterator iter;
-        if(!m_queryIds.empty()){
-            for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) {
+        std::map<int, DrillClientQueryHandle*>::const_iterator iter;
+        if(!m_queryHandles.empty()){
+            for(iter = m_queryHandles.begin(); iter != m_queryHandles.end(); iter++) {
                 DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg);
                 iter->second->signalError(err);
             }
         }
         delete pErr;
     }
-    // We have an error at the connection level. Cancel the heartbeat. 
-    // And close the connection 
+    // We have an error at the connection level. Cancel the heartbeat.
+    // And close the connection
     m_heartbeatTimer.cancel();
     m_pendingRequests=0;
     m_cv.notify_one();
@@ -1132,7 +1509,7 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){
 // The implementation is similar to handleQryError
 status_t DrillClientImpl::handleTerminatedQryState(
         status_t status,
-        std::string msg,
+        const std::string& msg,
         DrillClientQueryResult* pQueryResult){
     assert(pQueryResult!=NULL);
     if(status==QRY_COMPLETED){
@@ -1145,21 +1522,22 @@ status_t DrillClientImpl::handleTerminatedQryState(
     return status;
 }
 
-
-void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
-    std::map<int, DrillClientQueryResult*>::iterator iter;
+void DrillClientImpl::removeQueryHandle(DrillClientQueryHandle* pQueryHandle){ // Unregisters pQueryHandle from m_queryHandles while holding m_dcMutex.
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    if(!m_queryIds.empty()){
-        for(iter=m_queryIds.begin(); iter!=m_queryIds.end(); iter++) {
-            if(pQueryResult==(DrillClientQueryResult*)iter->second){
-                m_queryIds.erase(iter->first);
+    if(!m_queryHandles.empty()){
+        for(std::map<int, DrillClientQueryHandle*>::const_iterator iter=m_queryHandles.begin(); iter!=m_queryHandles.end(); iter++) { // linear scan: the map is keyed by int, so the handle is searched by value
+            if(pQueryHandle==(DrillClientQueryHandle*)iter->second){
+                m_queryHandles.erase(iter->first); // erase-by-key invalidates iter; safe only because the loop is left immediately below
                 break;
             }
         }
     }
+} // the handle object itself is not deleted here, only its map entry is removed
+
+void DrillClientImpl::removeQueryResult(DrillClientQueryResult* pQueryResult){
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
     if(!m_queryResults.empty()){
-        std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
-        for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
+        for(std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::const_iterator it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
             if(pQueryResult==(DrillClientQueryResult*)it->second){
                 m_queryResults.erase(it->first);
                 break;
@@ -1168,19 +1546,19 @@ void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
     }
 }
 
-void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){
+void DrillClientImpl::sendAck(const rpc::InBoundRpcMessage& msg, bool isOk){
     exec::rpc::Ack ack;
     ack.set_ok(isOk);
-    OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
+    rpc::OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     sendSync(ack_msg);
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
 }
 
-void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
+void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     uint64_t coordId = this->getNextCoordinationId();
-    OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
+    rpc::OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
     sendSync(cancel_msg);
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
 }
@@ -1193,6 +1571,14 @@ void DrillClientImpl::shutdownSocket(){
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
 }
 
+meta::DrillMetadata* DrillClientImpl::getMetadata() {
+    return new meta::DrillMetadata(*this);
+}
+
+void DrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) {
+    delete metadata;
+}
+
 // This COPIES the FieldMetadata definition for the record batch.  ColumnDefs held by this
 // class are used by the async callbacks.
 status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) {
@@ -1254,7 +1640,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
     //ctx; // unused, we already have the this pointer
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;)
     //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
-    if(this->m_bCancel){
+    if(this->isCancelled()){
         if(b!=NULL) delete b;
         return QRY_FAILURE;
     }
@@ -1284,7 +1670,7 @@ RecordBatch*  DrillClientQueryResult::peekNext(){
     //if no more data, return NULL;
     if(!m_bIsQueryPending) return NULL;
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
-    while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
+    while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) {
         this->m_cv.wait(cvLock);
     }
     // READ but not remove first element from queue
@@ -1305,7 +1691,7 @@ RecordBatch*  DrillClientQueryResult::getNext() {
     }
 
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
-    while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){
+    while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending){
         this->m_cv.wait(cvLock);
     }
     // remove first element from queue
@@ -1322,33 +1708,60 @@ void DrillClientQueryResult::waitForData() {
     boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     //if no more data, return NULL;
     if(!m_bIsQueryPending) return;
-    while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
+    while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) {
         this->m_cv.wait(cvLock);
     }
 }
 
-void DrillClientQueryResult::cancel() {
+template<typename Listener, typename Value>
+status_t DrillClientBaseHandle<Listener, Value>::notifyListener(Value v, DrillClientError* pErr){
+	return m_pApplicationListener(getApplicationContext(), v, pErr);
+}
+
+void DrillClientQueryHandle::cancel() {
     this->m_bCancel=true;
 }
 
-void DrillClientQueryResult::signalError(DrillClientError* pErr){
+void DrillClientQueryHandle::signalError(DrillClientError* pErr){
     // Ignore return values from the listener.
     if(pErr!=NULL){
         if(m_pError!=NULL){
             delete m_pError; m_pError=NULL;
         }
         m_pError=pErr;
-        pfnQueryResultsListener pResultsListener=this->m_pResultsListener;
-        if(pResultsListener!=NULL){
-            pResultsListener(this, NULL, pErr);
-        }else{
-            defaultQueryResultsListener(this, NULL, pErr);
-        }
+        // TODO should it be protected by m_cvMutex?
+        m_bHasError=true;
+    }
+    return;
+}
+
+template<typename Listener, typename Value>
+void DrillClientBaseHandle<Listener, Value>::signalError(DrillClientError* pErr){
+    DrillClientQueryHandle::signalError(pErr);
+    // Ignore return values from the listener.
+    if(pErr!=NULL){
+        this->notifyListener(NULL, pErr);
+    }
+}
+
+status_t DrillClientQueryResult::notifyListener(RecordBatch* batch, DrillClientError* pErr) {
+    pfnQueryResultsListener pResultsListener=getApplicationListener();
+    if(pResultsListener!=NULL){
+        return pResultsListener(this, batch, pErr);
+    }else{
+        return defaultQueryResultsListener(this, batch, pErr);
+    }
+}
+
+void DrillClientQueryResult::signalError(DrillClientError* pErr){
+    DrillClientQueryHandle::signalError(pErr);
+    // Ignore return values from the listener.
+    if(pErr!=NULL){
+        this->notifyListener(NULL, pErr);
         {
             boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
             m_bIsQueryPending=false;
             m_bHasData=false;
-            m_bHasError=true;
         }
         //Signal the cv in case there is a client waiting for data already.
         m_cv.notify_one();
@@ -1357,24 +1770,27 @@ void DrillClientQueryResult::signalError(DrillClientError* pErr){
 }
 
 void DrillClientQueryResult::signalComplete(){
-    pfnQueryResultsListener pResultsListener=this->m_pResultsListener;
-    if(pResultsListener!=NULL){
-        pResultsListener(this, NULL, NULL);
-    }else{
-        defaultQueryResultsListener(this, NULL, NULL);
-    }
+    this->notifyListener(NULL, NULL);
     {
         boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
-        m_bIsQueryPending=false;
         m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED);
-        m_bHasError=false;
+        resetError();
     }
     //Signal the cv in case there is a client waiting for data already.
     m_cv.notify_one();
     return;
 }
 
+void DrillClientQueryHandle::clearAndDestroy(){
+    //Tell the parent to remove this from its lists
+    m_client.removeQueryHandle(this);
+
+    if(m_pError!=NULL){
+        delete m_pError; m_pError=NULL;
+    }
+}
 void DrillClientQueryResult::clearAndDestroy(){
+    DrillClientQueryHandle::clearAndDestroy();
     //free memory allocated for FieldMetadata objects saved in m_columnDefs;
     if(!m_columnDefs->empty()){
         for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){
@@ -1385,15 +1801,16 @@ void DrillClientQueryResult::clearAndDestroy(){
     if(this->m_pQueryId!=NULL){
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;)
     }
+
     //Tell the parent to remove this from its lists
-    m_pClient->clearMapEntries(this);
+    this->client().removeQueryResult(this);
 
     //clear query id map entries.
     if(this->m_pQueryId!=NULL){
         delete this->m_pQueryId; this->m_pQueryId=NULL;
     }
     if(!m_recordBatches.empty()){
-        // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_
+        // When multiple queries execute in parallel we sometimes get an empty record batch back from the server _after_
         // the last chunk has been received. We eventually delete it.
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;)
         RecordBatch* pR=NULL;
@@ -1403,11 +1820,32 @@ void DrillClientQueryResult::clearAndDestroy(){
             delete pR;
         }
     }
-    if(m_pError!=NULL){
-        delete m_pError; m_pError=NULL;
+}
+
+status_t DrillClientPrepareHandle::setupPreparedStatement(const exec::user::PreparedStatement& pstmt) {
+    // Get columns schema information
+    const ::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>& columns = pstmt.columns();
+    for(::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) {
+        FieldMetadata* metadata = new FieldMetadata;
+        metadata->set(*it);
+        m_columnDefs->push_back(metadata);
     }
+
+    // Copy server handle
+    this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle());
+    return QRY_SUCCESS;
 }
 
+void DrillClientPrepareHandle::clearAndDestroy(){
+    DrillClientQueryHandle::clearAndDestroy();
+    //free memory allocated for FieldMetadata objects saved in m_columnDefs;
+    if(!m_columnDefs->empty()){
+        for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){
+            delete *it;
+        }
+        m_columnDefs->clear();
+    }
+}
 
 connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
     connectionStatus_t stat = CONN_SUCCESS;
@@ -1418,9 +1856,9 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
     Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
     if(!strcmp(protocol.c_str(), "zk")){
         // Get a list of drillbits
-        ZookeeperImpl zook;
+        ZookeeperClient zook(pathToDrill);
         std::vector<std::string> drillbits;
-        int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+        int err = zook.getAllDrillbits(hostPortStr, drillbits);
         if(!err){
             Utils::shuffle(drillbits);
             // The original shuffled order is maintained if we shuffle first and then add any missing elements
@@ -1432,15 +1870,17 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
                 m_lastConnection++;
                 nextIndex = (m_lastConnection)%(getDrillbitCount());
             }
+
             DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
                     << "(" << (void*)this << ")"
-                    << ": Current counter is: " 
+                    << ": Current counter is: "
                     << m_lastConnection << std::endl;)
-                err=zook.getEndPoint(m_drillbits, nextIndex, e);
+                err=zook.getEndPoint(m_drillbits[nextIndex], e);
             if(!err){
                 host=boost::lexical_cast<std::string>(e.address());
                 port=boost::lexical_cast<std::string>(e.user_port());
             }
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << nextIndex  << ">. Selected " << e.DebugString() << std::endl;)
         }
         if(err){
             return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
@@ -1475,7 +1915,7 @@ connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties*
     connectionStatus_t stat=CONN_FAILURE;
     // Keep a copy of the user properties
     if(props!=NULL){
-        m_pUserProperties = new DrillUserProperties;
+        m_pUserProperties = boost::shared_ptr<DrillUserProperties>(new DrillUserProperties);
         for(size_t i=0; i<props->size(); i++){
             m_pUserProperties->setProperty(
                     props->keyAt(i),
@@ -1486,10 +1926,10 @@ connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties*
     DrillClientImpl* pDrillClientImpl = getOneConnection();
     if(pDrillClientImpl != NULL){
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
-        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
+        stat = pDrillClientImpl->validateHandshake(m_pUserProperties.get());
     }
     else{
-        stat =  handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
+        stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
     }
     return stat;
 }
@@ -1505,16 +1945,52 @@ DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::Query
     return pDrillClientQueryResult;
 }
 
-void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
-    // Nothing to do. If this class ever keeps track of executing queries then it will need 
-    // to implement this call to free any query specific resources the pool might have 
+DrillClientPrepareHandle* PooledDrillClientImpl::PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx){
+    DrillClientPrepareHandle* pDrillClientPrepareHandle = NULL;
+    DrillClientImpl* pDrillClientImpl = NULL;
+    pDrillClientImpl = getOneConnection();
+    if(pDrillClientImpl != NULL){
+        pDrillClientPrepareHandle=pDrillClientImpl->PrepareQuery(plan,listener,listenerCtx);
+        m_queriesExecuted++;
+    }
+    return pDrillClientPrepareHandle;
+}
+
+DrillClientQueryResult* PooledDrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx){
+    DrillClientQueryResult* pDrillClientQueryResult = NULL;
+    DrillClientImpl* pDrillClientImpl = NULL;
+    pDrillClientImpl = getOneConnection();
+    if(pDrillClientImpl != NULL){
+        pDrillClientQueryResult=pDrillClientImpl->ExecuteQuery(pstmt, listener, listenerCtx);
+        m_queriesExecuted++;
+    }
+    return pDrillClientQueryResult;
+}
+
+void PooledDrillClientImpl::freeQueryResources(DrillClientQueryHandle* pQryHandle){
+    // If this class ever keeps track of executing queries then it will need
+    // to implement this call to free any query specific resources the pool might have
     // allocated
-    return;
+
+    pQryHandle->client().freeQueryResources(pQryHandle);
+}
+
+meta::DrillMetadata* PooledDrillClientImpl::getMetadata() {
+    meta::DrillMetadata* metadata = NULL;
+    DrillClientImpl* pDrillClientImpl = getOneConnection();
+    if (pDrillClientImpl != NULL) {
+        metadata = pDrillClientImpl->getMetadata();
+    }
+    return metadata;
+}
+
+void PooledDrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) {
+    metadata->client().freeMetadata(metadata);
 }
 
 bool PooledDrillClientImpl::Active(){
     boost::lock_guard<boost::mutex> lock(m_poolMutex);
-    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+    for(std::vector<DrillClientImpl*>::const_iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
         if((*it)->Active()){
             return true;
         }
@@ -1529,7 +2005,7 @@ void PooledDrillClientImpl::Close() {
         delete *it;
     }
     m_clientConnections.clear();
-    if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;}
+    m_pUserProperties.reset();
     if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
     m_lastConnection=-1;
     m_queriesExecuted=0;
@@ -1592,7 +2068,7 @@ DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
                 if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){
                     boost::lock_guard<boost::mutex> lock(m_poolMutex);
                     pDrillClientImpl=m_clientConnections.back();
-                    ret=pDrillClientImpl->validateHandshake(m_pUserProperties);
+                    ret=pDrillClientImpl->validateHandshake(m_pUserProperties.get());
                     if(ret!=CONN_SUCCESS){
                         delete pDrillClientImpl; pDrillClientImpl=NULL;
                         m_clientConnections.erase(m_clientConnections.end());
@@ -1602,251 +2078,14 @@ DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
             if(ret!=CONN_SUCCESS){
                 break;
             }
-        } // need a new connection 
+        } // need a new connection
     }// while
 
     if(pDrillClientImpl==NULL){
         connectionStatus_t status = CONN_NOTCONNECTED;
-        handleConnError(status, getMessage(status));
+        handleConnError(status, getMessage(ERR_CONN_NOCONN));
     }
     return pDrillClientImpl;
 }
 
-char ZookeeperImpl::s_drillRoot[]="/drill/";
-char ZookeeperImpl::s_defaultCluster[]="drillbits1";
-
-ZookeeperImpl::ZookeeperImpl(){
-    m_pDrillbits=new String_vector;
-    m_bConnecting=true;
-    memset(&m_id, 0, sizeof(m_id));
-}
-
-ZookeeperImpl::~ZookeeperImpl(){
-    delete m_pDrillbits;
-}
-
-ZooLogLevel ZookeeperImpl::getZkLogLevel(){
-    //typedef enum {ZOO_LOG_LEVEL_ERROR=1,
-    //    ZOO_LOG_LEVEL_WARN=2,
-    //    ZOO_LOG_LEVEL_INFO=3,
-    //    ZOO_LOG_LEVEL_DEBUG=4
-    //} ZooLogLevel;
-    switch(DrillClientConfig::getLogLevel()){
-        case LOG_TRACE:
-        case LOG_DEBUG:
-            return ZOO_LOG_LEVEL_DEBUG;
-        case LOG_INFO:
-            return ZOO_LOG_LEVEL_INFO;
-        case LOG_WARNING:
-            return ZOO_LOG_LEVEL_WARN;
-        case LOG_ERROR:
-        case LOG_FATAL:
-        default:
-            return ZOO_LOG_LEVEL_ERROR;
-    }
-    return ZOO_LOG_LEVEL_ERROR;
-}
-
-int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits){
-    uint32_t waitTime=30000; // 10 seconds
-    zoo_set_debug_level(getZkLogLevel());
-    zoo_deterministic_conn_order(1); // enable deterministic order
-    struct String_vector* pDrillbits=NULL;
-    m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0);
-    if(!m_zh) {
-        m_err = getMessage(ERR_CONN_ZKFAIL);
-        zookeeper_close(m_zh);
-        return -1;
-    }else{
-        m_err="";
-        //Wait for the completion handler to signal successful connection
-        boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
-        boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
-        while(this->m_bConnecting) {
-            if(!this->m_cv.timed_wait(bufferLock, timeout)){
-                m_err = getMessage(ERR_CONN_ZKTIMOUT);
-                zookeeper_close(m_zh);
-                return -1;
-            }
-        }
-    }
-    if(m_state!=ZOO_CONNECTED_STATE){
-        zookeeper_close(m_zh);
-        return -1;
-    }
-    int rc = ZOK;
-    if(pathToDrill==NULL || strlen(pathToDrill)==0){
-        m_rootDir=s_drillRoot;
-        m_rootDir += s_defaultCluster;
-    }else{
-        m_rootDir=pathToDrill;
-    }
-
-    pDrillbits = new String_vector;
-    rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits);
-    if(rc!=ZOK){
-        delete pDrillbits;
-        m_err=getMessage(ERR_CONN_ZKERR, rc);
-        zookeeper_close(m_zh);
-        return -1;
-    }
-    if(pDrillbits && pDrillbits->count > 0){
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster (" 
-                << connectStr << "/" << pathToDrill
-                << ")." <<std::endl;)
-            for(int i=0; i<pDrillbits->count; i++){
-                drillbits.push_back(pDrillbits->data[i]);
-            }
-        for(int i=0; i<drillbits.size(); i++){
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;)
-        }
-    }
-    delete pDrillbits;
-    return 0;
-}
-
-int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){
-    int rc = ZOK;
-    exec::DrillServiceInstance drillServiceInstance;
-    if( drillbits.size() >0){
-        // pick the drillbit at 'index'
-        const char * bit=drillbits[index].c_str();
-        std::string s;
-        s=m_rootDir +  std::string("/") + bit;
-        int buffer_len=MAX_CONNECT_STR;
-        char buffer[MAX_CONNECT_STR+1];
-        struct Stat stat;
-        buffer[MAX_CONNECT_STR]=0;
-        rc= zoo_get(m_zh, s.c_str(), 0, buffer,  &buffer_len, &stat);
-        if(rc!=ZOK){
-            m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
-            zookeeper_close(m_zh);
-            return -1;
-        }
-        exec::DrillServiceInstance drillServiceInstance;
-        drillServiceInstance.ParseFromArray(buffer, buffer_len);
-        endpoint=drillServiceInstance.endpoint();
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << ">. Selected " << drillServiceInstance.DebugString() << std::endl;)
-    }else{
-
-        m_err=getMessage(ERR_CONN_ZKNODBIT);
-        zookeeper_close(m_zh);
-        return -1;
-    }
-    return 0;
-}
-
-// Deprecated
-int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){
-    uint32_t waitTime=30000; // 10 seconds
-    zoo_set_debug_level(getZkLogLevel());
-    zoo_deterministic_conn_order(1); // enable deterministic order
-    m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0);
-    if(!m_zh) {
-        m_err = getMessage(ERR_CONN_ZKFAIL);
-        return CONN_FAILURE;
-    }else{
-        m_err="";
-        //Wait for the completion handler to signal successful connection
-        boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
-        boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
-        while(this->m_bConnecting) {
-            if(!this->m_cv.timed_wait(bufferLock, timeout)){
-                m_err = getMessage(ERR_CONN_ZKTIMOUT);
-                return CONN_FAILURE;
-            }
-        }
-    }
-    if(m_state!=ZOO_CONNECTED_STATE){
-        return CONN_FAILURE;
-    }
-    int rc = ZOK;
-    char rootDir[MAX_CONNECT_STR+1];
-    if(pathToDrill==NULL || strlen(pathToDrill)==0){
-        strcpy(rootDir, (char*)s_drillRoot);
-        strcat(rootDir, s_defaultCluster);
-    }else{
-        strncpy(rootDir, pathToDrill, MAX_CONNECT_STR); rootDir[MAX_CONNECT_STR]=0;
-    }
-    rc=zoo_get_children(m_zh, (char*)rootDir, 0, m_pDrillbits);
-    if(rc!=ZOK){
-        m_err=getMessage(ERR_CONN_ZKERR, rc);
-        zookeeper_close(m_zh);
-        return -1;
-    }
-
-    //Let's pick a random drillbit.
-    if(m_pDrillbits && m_pDrillbits->count >0){
-
-        std::vector<std::string> randomDrillbits;
-        for(int i=0; i<m_pDrillbits->count; i++){
-            randomDrillbits.push_back(m_pDrillbits->data[i]);
-        }
-        //Use the same random shuffle as the Java client instead of picking a drillbit at random.
-        //Gives much better randomization when the size of the cluster is small.
-        std::random_shuffle(randomDrillbits.begin(), randomDrillbits.end());
-        const char * bit=randomDrillbits[0].c_str();
-        std::string s;
-
-        s=rootDir +  std::string("/") + bit;
-        int buffer_len=MAX_CONNECT_STR;
-        char buffer[MAX_CONNECT_STR+1];
-        struct Stat stat;
-        buffer[MAX_CONNECT_STR]=0;
-        rc= zoo_get(m_zh, s.c_str(), 0, buffer,  &buffer_len, &stat);
-        if(rc!=ZOK){
-            m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
-            zookeeper_close(m_zh);
-            return -1;
-        }
-        m_drillServiceInstance.ParseFromArray(buffer, buffer_len);
-    }else{
-        m_err=getMessage(ERR_CONN_ZKNODBIT);
-        zookeeper_close(m_zh);
-        return -1;
-    }
-    return 0;
-}
-
-void ZookeeperImpl::close(){
-    zookeeper_close(m_zh);
-}
-
-void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *path, void* context) {
-    //From cli.c
-
-    /* Be careful using zh here rather than zzh - as this may be mt code
-     * the client lib may call the watcher before zookeeper_init returns */
-
-    ZookeeperImpl* self=(ZookeeperImpl*)context;
-    self->m_state=state;
-    if (type == ZOO_SESSION_EVENT) {
-        if (state == ZOO_CONNECTED_STATE) {
-        } else if (state == ZOO_AUTH_FAILED_STATE) {
-            self->m_err= getMessage(ERR_CONN_ZKNOAUTH);
-            zookeeper_close(zzh);
-            self->m_zh=0;
-        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
-            self->m_err= getMessage(ERR_CONN_ZKEXP);
-            zookeeper_close(zzh);
-            self->m_zh=0;
-        }
-    }
-    // signal the cond var
-    {
-        if (state == ZOO_CONNECTED_STATE){
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;)
-        }
-        boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex);
-        self->m_bConnecting=false;
-    }
-    self->m_cv.notify_one();
-}
-
-void ZookeeperImpl:: debugPrint(){
-    if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugStrin

<TRUNCATED>