You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/11/01 20:29:59 UTC

[14/15] drill git commit: DRILL-4420: C++ API for metadata access and prepared statements

DRILL-4420: C++ API for metadata access and prepared statements

Add support to the C++ client for metadata querying and prepared
statement requests.

Part of the metadata API, add methods to query for server capabilities.
As of now, this interface is not backed up by any RPC exchange so
the information is pretty much static, and match Drill 1.8.0
current capabilities.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/166c4ce7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/166c4ce7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/166c4ce7

Branch: refs/heads/master
Commit: 166c4ce7600b5571249a6748dd57383479313e2e
Parents: 3a35a42
Author: Laurent Goujon <la...@dremio.com>
Authored: Mon Aug 15 14:17:35 2016 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue Nov 1 11:29:58 2016 -0700

----------------------------------------------------------------------
 contrib/native/client/CMakeLists.txt            |    9 +-
 .../native/client/example/querySubmitter.cpp    |   24 +-
 .../native/client/src/clientlib/CMakeLists.txt  |    8 +-
 .../client/src/clientlib/collectionsImpl.hpp    |  123 ++
 .../native/client/src/clientlib/drillClient.cpp |  237 ++--
 .../client/src/clientlib/drillClientImpl.cpp    | 1315 +++++++++++-------
 .../client/src/clientlib/drillClientImpl.hpp    |  392 ++++--
 contrib/native/client/src/clientlib/env.h.in    |    9 +
 contrib/native/client/src/clientlib/errmsgs.cpp |    2 +-
 .../native/client/src/clientlib/fieldmeta.cpp   |  406 ++++++
 .../native/client/src/clientlib/metadata.cpp    |  748 ++++++++++
 .../native/client/src/clientlib/metadata.hpp    |  288 ++++
 .../native/client/src/clientlib/recordBatch.cpp |   12 +-
 .../native/client/src/clientlib/rpcDecoder.cpp  |  153 --
 .../native/client/src/clientlib/rpcDecoder.hpp  |   38 -
 .../native/client/src/clientlib/rpcEncoder.cpp  |  109 --
 .../native/client/src/clientlib/rpcEncoder.hpp  |   55 -
 .../native/client/src/clientlib/rpcMessage.cpp  |  241 ++++
 .../native/client/src/clientlib/rpcMessage.hpp  |   15 +-
 contrib/native/client/src/clientlib/utils.cpp   |    7 +
 contrib/native/client/src/clientlib/utils.hpp   |    1 -
 .../native/client/src/clientlib/y2038/time64.c  |   18 +-
 .../client/src/clientlib/zookeeperClient.cpp    |  168 +++
 .../client/src/clientlib/zookeeperClient.hpp    |   71 +
 .../client/src/include/drill/collections.hpp    |  179 +++
 .../native/client/src/include/drill/common.hpp  |   18 +
 .../client/src/include/drill/drillClient.hpp    |  998 ++++++++++++-
 .../native/client/src/include/drill/drillc.hpp  |    2 +
 .../client/src/include/drill/fieldmeta.hpp      |  122 ++
 .../src/include/drill/preparedStatement.hpp     |   38 +
 .../client/src/include/drill/recordBatch.hpp    |   35 +-
 contrib/native/client/src/test/CMakeLists.txt   |    1 +
 .../native/client/src/test/CollectionsTest.cpp  |  215 +++
 protocol/src/main/protobuf/User.proto           |    2 +-
 34 files changed, 4819 insertions(+), 1240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt
index e61eb9c..7e22ce8 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -57,10 +57,9 @@ project(drillclient
         )
 
 message("Project Dir = ${PROJECT_SOURCE_DIR}")
-message("Project version = ${PROJECT_VERSION} ")
+message("Project Version = ${PROJECT_VERSION} ")
 message("Source Dir = ${CMAKE_SOURCE_DIR} ")
 
-cmake_policy(SET CMP0043 NEW)
 
 set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmakeModules/")
 
@@ -71,6 +70,12 @@ execute_process(
     OUTPUT_VARIABLE GIT_COMMIT_PROP
     OUTPUT_STRIP_TRAILING_WHITESPACE
     )
+execute_process(
+    COMMAND git log -1 --format="%H"
+    WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
+    OUTPUT_VARIABLE GIT_SHA_PROP
+    OUTPUT_STRIP_TRAILING_WHITESPACE
+    )
 STRING(REPLACE . " " GIT_COMMIT_PROP "${GIT_COMMIT_PROP}")
 STRING(REPLACE \" "" GIT_COMMIT_PROP "${GIT_COMMIT_PROP}")
 set(GIT_COMMIT_PROP "\"${GIT_COMMIT_PROP}\"")

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index d507d1b..306db56 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -316,8 +316,8 @@ int main(int argc, char* argv[]) {
         std::vector<Drill::RecordIterator*> recordIterators;
         std::vector<Drill::RecordIterator*>::iterator recordIterIter;
 
-        std::vector<Drill::QueryHandle_t*> queryHandles;
-        std::vector<Drill::QueryHandle_t*>::iterator queryHandleIter;
+        std::vector<Drill::QueryHandle_t> queryHandles;
+        std::vector<Drill::QueryHandle_t>::iterator queryHandleIter;
 
         Drill::DrillClient client;
 #if defined _WIN32 || defined _WIN64
@@ -327,7 +327,7 @@ int main(int argc, char* argv[]) {
 		strcpy(logpathPrefix,tempPath);
 		strcat(logpathPrefix, "\\drillclient");
 #else
-		char* logpathPrefix = "/var/log/drill/drillclient";
+		const char* logpathPrefix = "/var/log/drill/drillclient";
 #endif
 		// To log to file
         Drill::DrillClient::initLogging(logpathPrefix, l);
@@ -411,27 +411,25 @@ int main(int argc, char* argv[]) {
         }else{
             if(bSyncSend){
                 for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
-                    Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
-                    client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
-                    client.registerSchemaChangeListener(qryHandle, SchemaListener);
+                    Drill::QueryHandle_t qryHandle;
+                    client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, &qryHandle);
+                    client.registerSchemaChangeListener(&qryHandle, SchemaListener);
                     
                     client.waitForResults();
 
-                    client.freeQueryResources(qryHandle);
-                    delete qryHandle;
+                    client.freeQueryResources(&qryHandle);
                 }
 
             }else{
                 for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
-                    Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
-                    client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
-                    client.registerSchemaChangeListener(qryHandle, SchemaListener);
+                    Drill::QueryHandle_t qryHandle;
+                    client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, &qryHandle);
+                    client.registerSchemaChangeListener(&qryHandle, SchemaListener);
                     queryHandles.push_back(qryHandle);
                 }
                 client.waitForResults();
                 for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) {
-                    client.freeQueryResources(*queryHandleIter);
-                    delete *queryHandleIter;
+                    client.freeQueryResources(&*queryHandleIter);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt b/contrib/native/client/src/clientlib/CMakeLists.txt
index a2e7052..68326e2 100644
--- a/contrib/native/client/src/clientlib/CMakeLists.txt
+++ b/contrib/native/client/src/clientlib/CMakeLists.txt
@@ -22,12 +22,14 @@ set (CLIENTLIB_SRC_FILES
     ${CMAKE_CURRENT_SOURCE_DIR}/decimalUtils.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/drillClient.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/drillClientImpl.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/fieldmeta.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/metadata.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/recordBatch.cpp
-    ${CMAKE_CURRENT_SOURCE_DIR}/rpcEncoder.cpp
-    ${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/rpcMessage.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/logger.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/zookeeperClient.cpp
     )
 
 include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include )
@@ -43,7 +45,7 @@ set_property(
 
 if(MSVC)
     set(CMAKE_CXX_FLAGS "/EHsc")
-    add_definitions(-DDRILL_CLIENT_EXPORTS)
+    add_definitions(-DDRILL_CLIENT_EXPORTS -D_SCL_SECURE_NO_WARNINGS)
 endif()
 
 add_library(drillClient SHARED ${CLIENTLIB_SRC_FILES} )

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/collectionsImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/collectionsImpl.hpp b/contrib/native/client/src/clientlib/collectionsImpl.hpp
new file mode 100644
index 0000000..be1b54f
--- /dev/null
+++ b/contrib/native/client/src/clientlib/collectionsImpl.hpp
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef DRILL_COLLECTIONSIMPL_H
+#define DRILL_COLLECTIONSIMPL_H
+
+#include <iterator>
+#include <drill/collections.hpp>
+
+namespace Drill {
+namespace impl {
+template<typename T, typename Iterator>
+class DrillContainerIterator: public DrillIteratorImpl<T> {
+public:
+	typedef DrillContainerIterator<T, Iterator> type;
+	typedef DrillIteratorImpl<T> supertype;
+	typedef typename supertype::iterator iterator;
+	typedef typename iterator::value_type value_type;
+	typedef typename iterator::reference reference;
+	typedef typename iterator::pointer pointer;
+
+	DrillContainerIterator(Iterator it): supertype(), m_it(it) {};
+
+	operator typename DrillIteratorImpl<const T>::iterator_ptr() const { return typename DrillIteratorImpl<const T>::iterator_ptr(new DrillContainerIterator<const T, Iterator>(m_it)); }
+
+	reference operator*() const { return m_it.operator *();}
+	pointer   operator->() const { return m_it.operator->(); }
+
+	iterator& operator++() { m_it++; return *this; }
+
+	bool operator==(const iterator& x) const {
+		const type& other(dynamic_cast<const type&>(x));
+		return m_it == other.m_it;
+	}
+
+	bool operator!=(const iterator& x) const { return !(*this==x); }
+
+private:
+	Iterator m_it;
+};
+
+template<typename T, typename Container>
+class DrillContainerCollection: public DrillCollectionImpl<T> {
+public:
+	typedef DrillCollectionImpl<T> supertype;
+	typedef typename supertype::value_type value_type;
+	typedef typename supertype::iterator iterator;
+	typedef typename supertype::const_iterator const_iterator;
+
+	typedef typename supertype::iterator_ptr iterator_ptr;
+	typedef typename supertype::const_iterator_ptr const_iterator_ptr;
+
+	DrillContainerCollection(): supertype(), m_container() {};
+
+	Container& operator*() { return &m_container; }
+	const Container& operator*() const { return &m_container; }
+	Container* operator->() { return &m_container; }
+	const Container* operator->() const { return &m_container; }
+
+	iterator_ptr begin() { return iterator_ptr(new IteratorImpl(m_container.begin())); }
+	const_iterator_ptr begin() const { return const_iterator_ptr(new ConstIteratorImpl(m_container.begin())); }
+	iterator_ptr end() { return iterator_ptr(new IteratorImpl(m_container.end())); }
+	const_iterator_ptr end() const { return const_iterator_ptr(new ConstIteratorImpl(m_container.end())); }
+
+private:
+	typedef DrillContainerIterator<value_type, typename Container::iterator> IteratorImpl;
+	typedef DrillContainerIterator<const value_type, typename Container::const_iterator> ConstIteratorImpl;
+
+	Container m_container;
+};
+} /* namespace impl */
+
+
+/**
+ * Drill collection backed up by a vector
+ * Offer a view over a collection of Iface instances,
+ * where concrete implementation of Iface is T
+ */
+template<typename Iface, typename T>
+class DrillVector: public DrillCollection<Iface> {
+public:
+	DrillVector(): DrillCollection<Iface>(ImplPtr(new Impl())) {};
+
+	void clear() {
+		Impl& impl = static_cast<Impl&>(**this);
+		impl->clear();
+	}
+
+	void push_back( const T& value ) {
+		Impl& impl = static_cast<Impl&>(**this);
+		impl->push_back(value);
+	}
+
+	void reserve(std::size_t new_cap) {
+		Impl& impl = static_cast<Impl&>(**this);
+		impl->reserve(new_cap);
+	}
+
+
+private:
+	typedef impl::DrillContainerCollection<Iface, std::vector<T> > Impl;
+	typedef boost::shared_ptr<Impl> ImplPtr;
+};
+}
+
+
+
+#endif /* DRILL_COLLECTIONSIMPL_H */

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 1251058..20a466e 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -20,11 +20,11 @@
 #include <boost/assign.hpp>
 #include "drill/common.hpp"
 #include "drill/drillClient.hpp"
+#include "drill/fieldmeta.hpp"
 #include "drill/recordBatch.hpp"
 #include "drillClientImpl.hpp"
 #include "errmsgs.hpp"
 #include "logger.hpp"
-
 #include "Types.pb.h"
 
 namespace Drill{
@@ -173,83 +173,86 @@ FieldDefPtr RecordIterator::getColDefs(){
     if(m_pQueryResult->hasError()){
         return DrillClientQueryResult::s_emptyColDefs;
     }
+
+    if (this->m_pColDefs != NULL && !this->hasSchemaChanged()) {
+    	return this->m_pColDefs;
+    }
+
     //NOTE: if query is cancelled, return whatever you have. Client applications job to deal with it.
-    if(this->m_pColDefs==NULL || this->hasSchemaChanged()){
-        if(this->m_pCurrentRecordBatch==NULL){
-            this->m_pQueryResult->waitForData();
-            if(m_pQueryResult->hasError()){
-                return DrillClientQueryResult::s_emptyColDefs;
-            }
-        }
-        if(this->hasSchemaChanged()){
-            if(m_pColDefs!=NULL){
-                for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin();
-                        it!=m_pColDefs->end();
-                        ++it){
-                    delete *it;
-                }
-                m_pColDefs->clear();
-                //delete m_pColDefs; m_pColDefs=NULL;
-            }
-        }
-        FieldDefPtr pColDefs(  new std::vector<Drill::FieldMetadata*>);
-        {   //lock after we come out of the  wait.
-            boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
-            boost::shared_ptr< std::vector<Drill::FieldMetadata*> >  currentColDefs=DrillClientQueryResult::s_emptyColDefs;
-            if(this->m_pCurrentRecordBatch!=NULL){
-                currentColDefs=this->m_pCurrentRecordBatch->getColumnDefs();
-            }else{
-                // This is reached only when the first results have been received but
-                // the getNext call has not been made to retrieve the record batch
-                RecordBatch* pR=this->m_pQueryResult->peekNext();
-                if(pR!=NULL){
-                    currentColDefs=pR->getColumnDefs();
-                }
-            }
-            for(std::vector<Drill::FieldMetadata*>::iterator it=currentColDefs->begin(); it!=currentColDefs->end(); ++it){
-                Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
-                fmd->copy(*(*it));//Yup, that's 2 stars
-                pColDefs->push_back(fmd);
-            }
-        }
-        this->m_pColDefs = pColDefs;
+    if(this->m_pCurrentRecordBatch==NULL){
+    	this->m_pQueryResult->waitForData();
+    	if(m_pQueryResult->hasError()){
+    		return DrillClientQueryResult::s_emptyColDefs;
+    	}
+    }
+    if(this->hasSchemaChanged()){
+    	if(m_pColDefs!=NULL){
+    		for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin();
+    				it!=m_pColDefs->end();
+    				++it){
+    			delete *it;
+    		}
+    		m_pColDefs->clear();
+    		//delete m_pColDefs; m_pColDefs=NULL;
+    	}
+    }
+    FieldDefPtr pColDefs(  new std::vector<Drill::FieldMetadata*>);
+    {   //lock after we come out of the  wait.
+    	boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
+    	boost::shared_ptr< std::vector<Drill::FieldMetadata*> >  currentColDefs=DrillClientQueryResult::s_emptyColDefs;
+    	if(this->m_pCurrentRecordBatch!=NULL){
+    		currentColDefs=this->m_pCurrentRecordBatch->getColumnDefs();
+    	}else{
+    		// This is reached only when the first results have been received but
+    		// the getNext call has not been made to retrieve the record batch
+    		RecordBatch* pR=this->m_pQueryResult->peekNext();
+    		if(pR!=NULL){
+    			currentColDefs=pR->getColumnDefs();
+    		}
+    	}
+    	for(std::vector<Drill::FieldMetadata*>::const_iterator it=currentColDefs->begin(); it!=currentColDefs->end(); ++it){
+    		Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
+    		fmd->copy(*(*it));//Yup, that's 2 stars
+    		pColDefs->push_back(fmd);
+    	}
     }
+    this->m_pColDefs = pColDefs;
     return this->m_pColDefs;
 }
 
 status_t RecordIterator::next(){
     status_t ret=QRY_SUCCESS;
     this->m_currentRecord++;
+    if(this->m_pQueryResult->isCancelled()){
+    	return QRY_CANCEL;
+    }
 
-    if(!this->m_pQueryResult->isCancelled()){
-        if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){
-            boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
-            if(this->m_pCurrentRecordBatch !=NULL){
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;)
-                delete this->m_pCurrentRecordBatch; //free the previous record batch
-                this->m_pCurrentRecordBatch=NULL;
-            }
-            this->m_currentRecord=0;
-            this->m_pQueryResult->waitForData();
-            if(m_pQueryResult->hasError()){
-                return m_pQueryResult->getErrorStatus();
-            }
-            this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
-            if(this->m_pCurrentRecordBatch != NULL){
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;)
-            }else{
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;)
-            }
-            if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;)
-                ret = QRY_NO_MORE_DATA;
-            }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){
-                ret=QRY_SUCCESS_WITH_INFO;
-            }
-        }
-    }else{
-        ret=QRY_CANCEL;
+    if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){
+    	boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
+    	if(this->m_pCurrentRecordBatch !=NULL){
+    		DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;)
+                		delete this->m_pCurrentRecordBatch; //free the previous record batch
+    		this->m_pCurrentRecordBatch=NULL;
+    	}
+    	this->m_currentRecord=0;
+    	this->m_pQueryResult->waitForData();
+    	if(m_pQueryResult->hasError()){
+    		return m_pQueryResult->getErrorStatus();
+    	}
+    	this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
+    	if(this->m_pCurrentRecordBatch != NULL){
+    		DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;)
+    	}else{
+    		DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;)
+    	}
+    	if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){
+    		DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;)
+                		ret = QRY_NO_MORE_DATA;
+    	}else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){
+    		ret=QRY_SUCCESS_WITH_INFO;
+    	}
     }
+
     return ret;
 }
 
@@ -258,30 +261,28 @@ status_t RecordIterator::getCol(size_t i, void** b, size_t* sz){
     //TODO: check fields out of bounds without calling getColDefs
     //if(i>=getColDefs().size()) return QRY_OUT_OF_BOUNDS;
     //return raw byte buffer
-    if(!this->m_pQueryResult->isCancelled()){
-        const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector();
-        if(!pVector->isNull(this->m_currentRecord)){
-            *b=pVector->getRaw(this->m_currentRecord);
-            *sz=pVector->getSize(this->m_currentRecord);
-        }else{
-            *b=NULL;
-            *sz=0;
-
-        }
-        return QRY_SUCCESS;
+    if(this->m_pQueryResult->isCancelled()){
+    	return QRY_CANCEL;
+    }
+    const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector();
+    if(!pVector->isNull(this->m_currentRecord)){
+    	*b=pVector->getRaw(this->m_currentRecord);
+    	*sz=pVector->getSize(this->m_currentRecord);
     }else{
-        return QRY_CANCEL;
+    	*b=NULL;
+    	*sz=0;
     }
+    return QRY_SUCCESS;
 }
 
 /* true if ith column in the current record is NULL. */
 bool RecordIterator::isNull(size_t i){
-    if(!this->m_pQueryResult->isCancelled()){
-        const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector();
-        return pVector->isNull(this->m_currentRecord);
-    }else{
-        return false;
+    if(this->m_pQueryResult->isCancelled()){
+    	return false;
     }
+
+    const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector();
+    return pVector->isNull(this->m_currentRecord);
 }
 
 status_t RecordIterator::cancel(){
@@ -329,19 +330,15 @@ DrillClient::~DrillClient(){
 }
 
 connectionStatus_t DrillClient::connect(const char* connectStr, const char* defaultSchema){
-    connectionStatus_t ret=CONN_SUCCESS;
-    ret=this->m_pImpl->connect(connectStr);
     DrillUserProperties props;
     std::string schema(defaultSchema);
     props.setProperty(USERPROP_SCHEMA,  schema);
-    if(ret==CONN_SUCCESS){
-        if(defaultSchema!=NULL){
-            ret=this->m_pImpl->validateHandshake(&props);
-        }else{
-            ret=this->m_pImpl->validateHandshake(NULL);
-        }
+    if (defaultSchema != NULL) {
+    	return connect(connectStr, static_cast<DrillUserProperties*>(NULL));
+    }
+    else {
+    	return connect(connectStr, &props);
     }
-    return ret;
 }
 
 connectionStatus_t DrillClient::connect(const char* connectStr, DrillUserProperties* properties){
@@ -366,14 +363,12 @@ void DrillClient::close() {
 }
 
 status_t DrillClient::submitQuery(Drill::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle){
-
     ::exec::shared::QueryType castedType = static_cast< ::exec::shared::QueryType> (t);
     DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, plan, listener, listenerCtx);
+    *qHandle=static_cast<QueryHandle_t>(pResult);
     if(pResult==NULL){
-        *qHandle=NULL;
         return (status_t)this->m_pImpl->getError()->status;
     }
-    *qHandle=(QueryHandle_t)pResult;
     return QRY_SUCCESS;
 }
 
@@ -387,14 +382,32 @@ RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string&
     return pIter;
 }
 
+status_t DrillClient::prepareQuery(const std::string& sql, pfnPreparedStatementListener listener, void* listenerCtx, QueryHandle_t* qHandle) {
+	DrillClientPrepareHandle* pResult=this->m_pImpl->PrepareQuery(sql, listener, listenerCtx);
+	*qHandle=static_cast<QueryHandle_t>(pResult);
+	if(pResult==NULL){
+		return static_cast<status_t>(this->m_pImpl->getError()->status);
+	}
+	return QRY_SUCCESS;
+}
+
+status_t DrillClient::executeQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle) {
+	DrillClientQueryResult* pResult=this->m_pImpl->ExecuteQuery(pstmt, listener, listenerCtx);
+	*qHandle=static_cast<QueryHandle_t>(pResult);
+	if(pResult==NULL){
+		return static_cast<status_t>(this->m_pImpl->getError()->status);
+	}
+	return QRY_SUCCESS;
+}
+
 void* DrillClient::getApplicationContext(QueryHandle_t handle){
     assert(handle!=NULL);
-    return ((DrillClientQueryResult*)handle)->getListenerContext();
+    return (static_cast<DrillClientQueryHandle*>(handle))->getApplicationContext();
 }
 
 status_t DrillClient::getQueryStatus(QueryHandle_t handle){
     assert(handle!=NULL);
-    return ((DrillClientQueryResult*)handle)->getQueryStatus();
+    return static_cast<DrillClientQueryHandle*>(handle)->getQueryStatus();
 }
 
 std::string& DrillClient::getError(){
@@ -402,7 +415,7 @@ std::string& DrillClient::getError(){
 }
 
 const std::string& DrillClient::getError(QueryHandle_t handle){
-    return ((DrillClientQueryResult*)handle)->getError()->msg;
+    return static_cast<DrillClientQueryHandle*>(handle)->getError()->msg;
 }
 
 void DrillClient::waitForResults(){
@@ -410,13 +423,23 @@ void DrillClient::waitForResults(){
 }
 
 void DrillClient::registerSchemaChangeListener(QueryHandle_t* handle, pfnSchemaListener l){
-    if(handle!=NULL){
-        ((DrillClientQueryResult*)(*handle))->registerSchemaChangeListener(l);
+	if (!handle) {
+		return;
+	}
+
+	// Let's ensure that handle is really an instance of DrillClientQueryResult
+	// by using dynamic_cast to verify. Since void is not a class, we first have
+	// to static_cast to a DrillClientQueryHandle
+	DrillClientQueryHandle* pHandle = static_cast<DrillClientQueryHandle*>(*handle);
+	DrillClientQueryResult* result = dynamic_cast<DrillClientQueryResult*>(pHandle);
+
+	if (result) {
+        result->registerSchemaChangeListener(l);
     }
 }
 
 void DrillClient::freeQueryResources(QueryHandle_t* handle){
-    delete (DrillClientQueryResult*)(*handle);
+	this->m_pImpl->freeQueryResources(static_cast<DrillClientQueryHandle*>(*handle));
     *handle=NULL;
 }
 
@@ -424,4 +447,12 @@ void DrillClient::freeRecordBatch(RecordBatch* pRecordBatch){
     delete pRecordBatch;
 }
 
+Metadata* DrillClient::getMetadata() {
+    return this->m_pImpl->getMetadata();
+}
+
+void DrillClient::freeMetadata(Metadata** metadata) {
+    this->m_pImpl->freeMetadata(static_cast<meta::DrillMetadata*>(*metadata));
+    *metadata = NULL;
+}
 } // namespace Drill