You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/20 22:24:51 UTC

[15/32] git commit: DRILL-875: Fixes for DRILL-707, DRILL-780, DRILL-835 (Schema change), DRILL-852, DRILL-876, DRILL_877, DRILL-878, DRILL-890

DRILL-875: Fixes for DRILL-707, DRILL-780, DRILL-835 (Schema change), DRILL-852, DRILL-876, DRILL_877, DRILL-878, DRILL-890


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/aaa4db74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/aaa4db74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/aaa4db74

Branch: refs/heads/master
Commit: aaa4db74b215e03ad0e1334cfc18964972d93a3b
Parents: ff39fb8
Author: Parth Chandra <pc...@maprtech.com>
Authored: Fri May 30 11:17:40 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 19 20:29:53 2014 -0700

----------------------------------------------------------------------
 contrib/native/client/CMakeLists.txt            |   25 +-
 .../native/client/example/querySubmitter.cpp    |   75 +-
 contrib/native/client/readme.linux              |   96 +
 .../parquet_scan_union_screen_physical.json     |    2 +-
 .../native/client/resources/simple_plan.json    |   12 +-
 contrib/native/client/scripts/fixProtodefs.sh   |    2 +-
 .../native/client/src/clientlib/CMakeLists.txt  |   15 +-
 .../client/src/clientlib/decimalUtils.cpp       |    2 +-
 .../native/client/src/clientlib/drillClient.cpp |  126 +-
 .../client/src/clientlib/drillClientImpl.cpp    |  539 +++-
 .../client/src/clientlib/drillClientImpl.hpp    |  163 +-
 contrib/native/client/src/clientlib/errmsgs.cpp |    6 +-
 contrib/native/client/src/clientlib/errmsgs.hpp |   44 +-
 contrib/native/client/src/clientlib/logger.cpp  |   69 +
 contrib/native/client/src/clientlib/logger.hpp  |   72 +
 .../native/client/src/clientlib/recordBatch.cpp |  109 +-
 .../native/client/src/clientlib/rpcDecoder.cpp  |    2 +
 .../native/client/src/clientlib/rpcEncoder.cpp  |    3 +
 contrib/native/client/src/clientlib/utils.hpp   |   47 +
 .../native/client/src/include/drill/common.hpp  |   25 +-
 .../client/src/include/drill/drillClient.hpp    |   73 +-
 .../client/src/include/drill/protobuf/User.pb.h |   40 +-
 .../client/src/include/drill/recordBatch.hpp    |  100 +-
 .../native/client/src/protobuf/BitControl.pb.cc |  458 +--
 .../native/client/src/protobuf/BitControl.pb.h  |  370 +--
 .../native/client/src/protobuf/CMakeLists.txt   |   56 +-
 contrib/native/client/src/protobuf/User.pb.cc   |   80 +-
 .../client/src/protobuf/UserBitShared.pb.cc     | 2618 ++++++++++++++-
 .../client/src/protobuf/UserBitShared.pb.h      | 2969 ++++++++++++++----
 29 files changed, 6260 insertions(+), 1938 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt
index a306780..9ac705b 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-cmake_minimum_required(VERSION 2.8)
+cmake_minimum_required(VERSION 2.6)
 
 project(drillclient)
 
@@ -26,20 +26,21 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmakeModules/")
 
 
 # Find Boost
-set(Boost_USE_STATIC_LIBS OFF) 
-set(Boost_USE_MULTITHREADED ON)  
-set(Boost_USE_STATIC_RUNTIME OFF) 
-find_package(Boost REQUIRED COMPONENTS regex system date_time chrono thread log log_setup)
+set(Boost_USE_STATIC_LIBS OFF)
+set(Boost_USE_MULTITHREADED ON)
+set(Boost_USE_STATIC_RUNTIME OFF)
+find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread )
 include_directories(${Boost_INCLUDE_DIRS})
 
-if(CMAKE_COMPILER_IS_GNUCXX) 
-    set(CMAKE_EXE_LINKER_FLAGS "-lrt -lpthread")                                                   
-endif()    
+if(CMAKE_COMPILER_IS_GNUCXX)
+    set(CMAKE_EXE_LINKER_FLAGS "-lrt -lpthread")
+    set(CMAKE_CXX_FLAGS "-fPIC")
+endif()
 
 add_definitions(-DBOOST_ALL_DYN_LINK)
 
 # Find Protobufs
-find_package(Protobuf REQUIRED)
+find_package(Protobuf REQUIRED )
 include_directories(${PROTOBUF_INCLUDE_DIR})
 
 #Find Zookeeper
@@ -50,13 +51,13 @@ find_package(Zookeeper  REQUIRED )
 #
 
 # Preprocess to fix protobuf .proto definitions
-add_subdirectory("${CMAKE_SOURCE_DIR}/src/protobuf") 
+add_subdirectory("${CMAKE_SOURCE_DIR}/src/protobuf")
 # protobuf includes are required by clientlib
 include_directories(${ProtoHeadersDir})
 include_directories(${ProtoIncludesDir})
 
 # Build the Client Library as a shared library
-add_subdirectory("${CMAKE_SOURCE_DIR}/src/clientlib") 
+add_subdirectory("${CMAKE_SOURCE_DIR}/src/clientlib")
 include_directories(${CMAKE_SOURCE_DIR}/src/include ${Zookeeper_INCLUDE_DIRS}  )
 
 # add a DEBUG preprocessor macro
@@ -68,7 +69,7 @@ set_property(
 # Link directory
 link_directories(/usr/local/lib)
 
-add_executable(querySubmitter example/querySubmitter.cpp ) 
+add_executable(querySubmitter example/querySubmitter.cpp )
 
 target_link_libraries(querySubmitter ${Boost_LIBRARIES} ${PROTOBUF_LIBRARY} drillClient protomsgs )
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 96e2c65..9d24e68 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -20,9 +20,23 @@
 #include <iostream>
 #include <stdio.h>
 #include <stdlib.h>
-#include <boost/asio.hpp>
 #include "drill/drillc.hpp"
 
+Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::DrillClientError* err){
+    if(!err){
+        printf("SCHEMA CHANGE DETECTED:\n");
+        for(size_t i=0; i<fields->size(); i++){
+            std::string name= fields->at(i)->getName();
+            printf("%s\t", name.c_str());
+        }
+        printf("\n");
+        return Drill::QRY_SUCCESS ;
+    }else{
+        std::cerr<< "ERROR: " << err->msg << std::endl;
+        return Drill::QRY_FAILURE;
+    }
+}
+
 Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::DrillClientError* err){
     if(!err){
         b->print(std::cout, 0); // print all rows
@@ -79,18 +93,19 @@ void print(const Drill::FieldMetadata* pFieldMetadata, void* buf, size_t sz){
     return;
 }
 
-int nOptions=5;
+int nOptions=6;
 
 struct Option{
     char name[32];
     char desc[128];
     bool required;
-}qsOptions[]= { 
+}qsOptions[]= {
     {"plan", "Plan files separated by semicolons", false},
     {"query", "Query strings, separated by semicolons", false},
     {"type", "Query type [physical|logical|sql]", true},
     {"connectStr", "Connect string", true},
-    {"api", "API type [sync|async]", true}
+    {"api", "API type [sync|async]", true},
+    {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false}
 };
 
 std::map<std::string, std::string> qsOptionValues;
@@ -136,7 +151,7 @@ int parseArgs(int argc, char* argv[]){
             }
         }
     }
-    if(error){ 
+    if(error){
         printUsage();
         exit(1);
     }
@@ -170,7 +185,7 @@ int readPlans(const std::string& planList, std::vector<std::string>& plans){
         std::string plan((std::istreambuf_iterator<char>(f)), (std::istreambuf_iterator<char>()));
         std::cout << "plan:" << plan << std::endl;
         plans.push_back(plan);
-    } 
+    }
     return 0;
 }
 
@@ -201,6 +216,18 @@ bool validate(const std::string& type, const std::string& query, const std::stri
         return true;
 }
 
+Drill::logLevel_t getLogLevel(const char *s){
+    if(s!=NULL){
+        if(!strcmp(s, "trace")) return Drill::LOG_TRACE;
+        if(!strcmp(s, "debug")) return Drill::LOG_DEBUG;
+        if(!strcmp(s, "info")) return Drill::LOG_INFO;
+        if(!strcmp(s, "warn")) return Drill::LOG_WARNING;
+        if(!strcmp(s, "error")) return Drill::LOG_ERROR;
+        if(!strcmp(s, "fatal")) return Drill::LOG_FATAL;
+    }
+    return Drill::LOG_ERROR;
+}
+
 int main(int argc, char* argv[]) {
     try {
 
@@ -213,26 +240,29 @@ int main(int argc, char* argv[]) {
         std::string planList=qsOptionValues["plan"];
         std::string api=qsOptionValues["api"];
         std::string type_str=qsOptionValues["type"];
+        std::string logLevel=qsOptionValues["logLevel"];
 
-        exec::user::QueryType type;
+        exec::shared::QueryType type;
 
         if(!validate(type_str, queryList, planList)){
             exit(1);
         }
 
+        Drill::logLevel_t l=getLogLevel(logLevel.c_str());
+
         std::vector<std::string> queryInputs;
         if(type_str=="sql" ){
             readQueries(queryList, queryInputs);
-            type=exec::user::SQL;
+            type=exec::shared::SQL;
         }else if(type_str=="physical" ){
             readPlans(planList, queryInputs);
-            type=exec::user::PHYSICAL;
+            type=exec::shared::PHYSICAL;
         }else if(type_str == "logical"){
             readPlans(planList, queryInputs);
-            type=exec::user::LOGICAL;
+            type=exec::shared::LOGICAL;
         }else{
             readQueries(queryList, queryInputs);
-            type=exec::user::SQL;
+            type=exec::shared::SQL;
         }
 
         std::vector<std::string>::iterator queryInpIter;
@@ -245,9 +275,9 @@ int main(int argc, char* argv[]) {
 
         Drill::DrillClient client;
         // To log to file
-        //DrillClient::initLogging("/var/log/drill/", LOG_INFO);
+        //DrillClient::initLogging("/var/log/drill/", l);
         // To log to stderr
-        Drill::DrillClient::initLogging(NULL, Drill::LOG_INFO);
+        Drill::DrillClient::initLogging(NULL, l);
 
         if(client.connect(connectStr.c_str())!=Drill::CONN_SUCCESS){
             std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;
@@ -269,26 +299,27 @@ int main(int argc, char* argv[]) {
                 // get fields.
                 row=0;
                 Drill::RecordIterator* pRecIter=*recordIterIter;
-                std::vector<Drill::FieldMetadata*>& fields = pRecIter->getColDefs();
-                while((ret=pRecIter->next())==Drill::QRY_SUCCESS){
+                Drill::FieldDefPtr fields= pRecIter->getColDefs();
+                while((ret=pRecIter->next()), ret==Drill::QRY_SUCCESS || ret==Drill::QRY_SUCCESS_WITH_INFO){
+                    fields = pRecIter->getColDefs();
                     row++;
-                    if(row%4095==0){
-                        for(size_t i=0; i<fields.size(); i++){
-                            std::string name= fields[i]->getName();
+                    if( (ret==Drill::QRY_SUCCESS_WITH_INFO  && pRecIter->hasSchemaChanged() )|| ( row%100==1)){
+                        for(size_t i=0; i<fields->size(); i++){
+                            std::string name= fields->at(i)->getName();
                             printf("%s\t", name.c_str());
                         }
                         printf("\n");
                     }
                     printf("ROW: %ld\t", row);
-                    for(size_t i=0; i<fields.size(); i++){
+                    for(size_t i=0; i<fields->size(); i++){
                         void* pBuf; size_t sz;
                         pRecIter->getCol(i, &pBuf, &sz);
-                        print(fields[i], pBuf, sz);
+                        print(fields->at(i), pBuf, sz);
                     }
                     printf("\n");
                 }
                 if(ret!=Drill::QRY_NO_MORE_DATA){
-                    std::cerr<< pRecIter->getError();
+                    std::cerr<< pRecIter->getError() << std::endl;
                 }
                 client.freeQueryIterator(&pRecIter);
             }
@@ -296,11 +327,13 @@ int main(int argc, char* argv[]) {
             for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
                 Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
                 client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
+                client.registerSchemaChangeListener(qryHandle, SchemaListener);
                 queryHandles.push_back(qryHandle);
             }
             client.waitForResults();
             for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) {
                 client.freeQueryResources(*queryHandleIter);
+                delete *queryHandleIter;
             }
         }
         client.close();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/readme.linux
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.linux b/contrib/native/client/readme.linux
new file mode 100644
index 0000000..fbdb6e4
--- /dev/null
+++ b/contrib/native/client/readme.linux
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CentOS 6.5 build
+
+Install Prerequisites
+---------------------
+0) Install development tools
+    $>yum groupinstall 'Development Tools'
+
+1) CMAKE 2.8
+    $> yum install cmake28
+
+2.1) Download protobuf 2.5 from :
+    http://rpm.pbone.net/index.php3/stat/4/idpl/23552166/dir/centos_6/com/protobuf-2.5.0-16.1.x86_64.rpm.html
+    http://rpm.pbone.net/index.php3/stat/4/idpl/23552167/dir/centos_6/com/protobuf-compiler-2.5.0-16.1.x86_64.rpm.html
+    http://rpm.pbone.net/index.php3/stat/4/idpl/23552169/dir/centos_6/com/protobuf-devel-2.5.0-16.1.x86_64.rpm.html
+
+2.2) Install Protobufs
+    $> sudo yum install protobuf
+    $> sudo yum install protobuf-compiler
+    $> sudo yum install protobuf-devel
+
+3)
+3.1) Install Zookeeper prerequisites
+    - autoconf 2.59 or greater (should be installed with dev tools)
+    - cppunit 1.10.x or higher
+
+3.1.1) install cppuint
+    $> sudo yum install cppunit
+    $> sudo yum install cppunit-devel
+
+3.2) Download Zookeeper from :
+    - http://apache.mirror.quintex.com/zookeeper/zookeeper-3.4.6/
+    - untar and then follow instructions in ZOOKEEPER_DIR/src/c/README to build and install the client libs
+
+3.3) run autoreconf
+    $> autoreconf -if
+
+3.4) Build Zookeeper libs
+    $> ./configure --enable-debug --with-syncapi --enable-static --enable-shared
+    $> make && sudo make install
+
+4) Install boost. The minumim version required is 1.53, which will probably have to be built from source
+
+    # Remove any previous boost
+    $> sudo yum -y erase boost
+
+    # fetch the boost source rpm and create binary rpms
+    $> wget ftp://ftp.icm.edu.pl/vol/rzm2/linux-fedora-secondary/development/rawhide/source/SRPMS/b/boost-1.53.0-6.fc19.src.rpm
+    $> rpmbuild --rebuild boost-1.53.0-6.fc19.src.rpm
+
+    #install the binary rpms
+    #(Note: the "rpm" utility does not clean up old versions very well.)
+    $> sudo yum -y install ~/rpmbuild/RPMS/x86_64/*
+
+OR 
+    Download and build using boost build. 
+    See this link for how to build: http://www.boost.org/doc/libs/1_53_0/more/getting_started/unix-variants.html#prepare-to-use-a-boost-library-binary 
+    
+
+Build drill client
+-------------------
+    $> cd DRILL_DIR/contrib/native/client
+    $> mkdir build
+    $> cd build && cmake28 -G "Unix Makefiles" -D CMAKE_BUILD_TYPE=Debug ..
+    $> make
+
+Test
+----
+Run query submitter from the command line
+    $> querySubmitter query='select * from dfs.`/Users/pchandra/work/data/tpc-h/customer.parquet`' type=sql connectStr=local=10.250.0.146:31010 api=async logLevel=trace
+
+Valgrind
+--------
+Examples to run Valgrind and see the log in Valkyrie
+    $> valgrind --leak-check=yes --xml=yes --xml-file=qs-vg-log-a.xml querySubmitter query='select LINEITEM from dfs.`/Users/pchandra/work/data/tpc-h/customer.parquet`' type=sql connectStr=local=10.250.0.146:31010 api=async logLevel=trace
+    $> valkyrie -l qs-vg-log-a.xml
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/resources/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/contrib/native/client/resources/parquet_scan_union_screen_physical.json b/contrib/native/client/resources/parquet_scan_union_screen_physical.json
index e677b15..81a62a3 100644
--- a/contrib/native/client/resources/parquet_scan_union_screen_physical.json
+++ b/contrib/native/client/resources/parquet_scan_union_screen_physical.json
@@ -34,7 +34,7 @@
         @id: 3,
         child: 2,
         pop: "screen"
-      } 
+      }
     ]
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/resources/simple_plan.json
----------------------------------------------------------------------
diff --git a/contrib/native/client/resources/simple_plan.json b/contrib/native/client/resources/simple_plan.json
index ad3cdc7..8fede75 100644
--- a/contrib/native/client/resources/simple_plan.json
+++ b/contrib/native/client/resources/simple_plan.json
@@ -13,7 +13,7 @@
     cp: {type: "classpath"}
   },
   query:[
-    
+
                 {
                   @id:"1",
                   op: "scan",
@@ -38,7 +38,7 @@
                   input:"2",
                   op: "filter",
                   expr: "donuts.ppu < 1.00"
-                }, 
+                },
                 {
                   @id:"4",
                   input:"3",
@@ -51,7 +51,7 @@
                   input:"4",
                   op: "collapsingaggregate",
                   within: "ppusegment",
-                  carryovers: ["donuts.ppu"], 
+                  carryovers: ["donuts.ppu"],
                   aggregations: [
                     { ref: "donuts.typeCount",  expr: "count(1)" },
                     { ref: "donuts.quantity",  expr: "sum(quantity)" },
@@ -65,7 +65,7 @@
                   orderings: [
                     {order: "DESC", expr: "donuts.ppu" }
                   ]
-                }, 
+                },
                 {
                   @id:"7",
                   input:"6",
@@ -80,7 +80,7 @@
                   op: "limit",
           first: 0,
           last: 100
-        }, 
+        },
                 {
                   @id:"9",
                   input:"8",
@@ -88,7 +88,7 @@
                   memo: "output sink",
                   storageengine: "console",
                   target: {pipe: "STD_OUT"}
-                }      
+                }
   ]
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/scripts/fixProtodefs.sh
----------------------------------------------------------------------
diff --git a/contrib/native/client/scripts/fixProtodefs.sh b/contrib/native/client/scripts/fixProtodefs.sh
index 7cb9710..f3ce781 100755
--- a/contrib/native/client/scripts/fixProtodefs.sh
+++ b/contrib/native/client/scripts/fixProtodefs.sh
@@ -43,7 +43,7 @@ main() {
 
         if [ -e ${TARGDIR}/${FNAME} ]
         then
-            if [ ${SRCDIR}/${FNAME} -nt ${TARGDIR}/${FNAME} ] 
+            if [ ${SRCDIR}/${FNAME} -nt ${TARGDIR}/${FNAME} ]
             then
                 fixFile ${FNAME}
             fi

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/src/clientlib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt b/contrib/native/client/src/clientlib/CMakeLists.txt
index d07f930..7cd5dfb 100644
--- a/contrib/native/client/src/clientlib/CMakeLists.txt
+++ b/contrib/native/client/src/clientlib/CMakeLists.txt
@@ -18,14 +18,15 @@
 
 # Drill Client library
 
-set (CLIENTLIB_SRC_FILES 
+set (CLIENTLIB_SRC_FILES
     ${CMAKE_CURRENT_SOURCE_DIR}/decimalUtils.cpp
-    ${CMAKE_CURRENT_SOURCE_DIR}/drillClient.cpp 
-    ${CMAKE_CURRENT_SOURCE_DIR}/drillClientImpl.cpp 
-    ${CMAKE_CURRENT_SOURCE_DIR}/recordBatch.cpp 
-    ${CMAKE_CURRENT_SOURCE_DIR}/rpcEncoder.cpp 
-    ${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp 
-    ${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp 
+    ${CMAKE_CURRENT_SOURCE_DIR}/drillClient.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/drillClientImpl.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/recordBatch.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/rpcEncoder.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/logger.cpp
     )
 
 include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include )

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/src/clientlib/decimalUtils.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/decimalUtils.cpp b/contrib/native/client/src/clientlib/decimalUtils.cpp
index 39439c5..3885faa 100644
--- a/contrib/native/client/src/clientlib/decimalUtils.cpp
+++ b/contrib/native/client/src/clientlib/decimalUtils.cpp
@@ -60,7 +60,7 @@ DecimalValue getDecimalValueFromByteBuf(SlicedByteBuf& data, size_t startIndex,
     bool needsEndiannessSwap = !truncateScale;
 
     // Initialize the BigDecimal, first digit in the ByteBuf has the sign so mask it out
-    cpp_int decimalDigits = (needsEndiannessSwap ? 
+    cpp_int decimalDigits = (needsEndiannessSwap ?
             bswap_32(data.getUint32(startIndex)) & 0x7FFFFFFF :
             (data.getUint32(startIndex) & 0x7FFFFFFF));
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 9a9b919..10480b4 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -17,17 +17,11 @@
  */
 
 
-#include <boost/format.hpp>
-#include <boost/log/expressions.hpp>
-#include <boost/log/sinks/text_file_backend.hpp>
-#include <boost/log/utility/setup/file.hpp>
-#include <boost/log/utility/setup/common_attributes.hpp>
-#include <boost/log/sources/severity_logger.hpp>
-
 #include "drill/drillClient.hpp"
 #include "drill/recordBatch.hpp"
 #include "drillClientImpl.hpp"
 #include "errmsgs.hpp"
+#include "logger.hpp"
 
 #include "Types.pb.h"
 
@@ -50,84 +44,96 @@ DrillClientInitializer::~DrillClientInitializer(){
 
 logLevel_t DrillClientConfig::s_logLevel=LOG_ERROR;
 uint64_t DrillClientConfig::s_bufferLimit=-1;
-boost::mutex DrillClientConfig::s_mutex; 
+int32_t DrillClientConfig::s_socketTimeout=180;
+boost::mutex DrillClientConfig::s_mutex;
 
 DrillClientConfig::DrillClientConfig(){
     initLogging(NULL);
 }
 
+DrillClientConfig::~DrillClientConfig(){
+    Logger::close();
+}
+
 void DrillClientConfig::initLogging(const char* path){
-    if(path!=NULL){
-        std::string f=std::string(path)+"drill_clientlib_%N.log";
-        try{
-            boost::log::add_file_log
-                (
-                 boost::log::keywords::file_name = f.c_str(),
-                 boost::log::keywords::rotation_size = 10 * 1024 * 1024,
-                 boost::log::keywords::time_based_rotation = 
-                 boost::log::sinks::file::rotation_at_time_point(0, 0, 0),
-                 boost::log::keywords::format = "[%TimeStamp%]: %Message%"
-                );
-        }catch(std::exception& e){
-            // do nothing. Logging will happen to stderr
-            BOOST_LOG_TRIVIAL(error) << "Logging could not be initialized. Logging to stderr." ;
-        }
-    }
-    boost::log::add_common_attributes();
-    boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel);
+    Logger::init(path);
 }
 
 void DrillClientConfig::setLogLevel(logLevel_t l){
-    boost::lock_guard<boost::mutex> bufferLock(DrillClientConfig::s_mutex);
+    boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
     s_logLevel=l;
-    boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel);
+    Logger::s_level=l;
+    //boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel);
 }
 
 void DrillClientConfig::setBufferLimit(uint64_t l){
-    boost::lock_guard<boost::mutex> bufferLock(DrillClientConfig::s_mutex);
+    boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
     s_bufferLimit=l;
 }
 
 uint64_t DrillClientConfig::getBufferLimit(){
-    boost::lock_guard<boost::mutex> bufferLock(DrillClientConfig::s_mutex);
+    boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
     return s_bufferLimit;
 }
 
+void DrillClientConfig::setSocketTimeout(int32_t t){
+    boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+    s_socketTimeout=t;
+}
+
+int32_t DrillClientConfig::getSocketTimeout(){
+    boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+    return s_socketTimeout;
+}
+
 logLevel_t DrillClientConfig::getLogLevel(){
-    boost::lock_guard<boost::mutex> bufferLock(DrillClientConfig::s_mutex);
+    boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
     return s_logLevel;
 }
 
 RecordIterator::~RecordIterator(){
     if(m_pColDefs!=NULL){
-        for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin(); 
-                it!=m_pColDefs->end(); 
+        for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin();
+                it!=m_pColDefs->end();
                 ++it){
             delete *it;
         }
     }
-    delete this->m_pColDefs;
-    this->m_pColDefs=NULL;
     delete this->m_pQueryResult;
     this->m_pQueryResult=NULL;
+    if(this->m_pCurrentRecordBatch!=NULL){
+        DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
+        delete this->m_pCurrentRecordBatch; this->m_pCurrentRecordBatch=NULL;
+    }
 }
 
-std::vector<Drill::FieldMetadata*>&  RecordIterator::getColDefs(){
+FieldDefPtr RecordIterator::getColDefs(){
     if(m_pQueryResult->hasError()){
         return DrillClientQueryResult::s_emptyColDefs;
     }
     //NOTE: if query is cancelled, return whatever you have. Client applications job to deal with it.
-    if(this->m_pColDefs==NULL){
+    if(this->m_pColDefs==NULL || this->hasSchemaChanged()){
         if(this->m_pCurrentRecordBatch==NULL){
             this->m_pQueryResult->waitForData();
             if(m_pQueryResult->hasError()){
                 return DrillClientQueryResult::s_emptyColDefs;
             }
         }
-        std::vector<Drill::FieldMetadata*>* pColDefs = new std::vector<Drill::FieldMetadata*>;
+        if(this->hasSchemaChanged()){
+            if(m_pColDefs!=NULL){
+                for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin();
+                        it!=m_pColDefs->end();
+                        ++it){
+                    delete *it;
+                }
+                m_pColDefs->clear();
+                //delete m_pColDefs; m_pColDefs=NULL;
+            }
+        }
+        FieldDefPtr pColDefs(  new std::vector<Drill::FieldMetadata*>);
         {   //lock after we come out of the  wait.
             boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
-            std::vector<Drill::FieldMetadata*>&  currentColDefs=DrillClientQueryResult::s_emptyColDefs;
+            boost::shared_ptr< std::vector<Drill::FieldMetadata*> >  currentColDefs=DrillClientQueryResult::s_emptyColDefs;
             if(this->m_pCurrentRecordBatch!=NULL){
                 currentColDefs=this->m_pCurrentRecordBatch->getColumnDefs();
             }else{
@@ -138,7 +144,7 @@ std::vector<Drill::FieldMetadata*>&  RecordIterator::getColDefs(){
                     currentColDefs=pR->getColumnDefs();
                 }
             }
-            for(std::vector<Drill::FieldMetadata*>::iterator it=currentColDefs.begin(); it!=currentColDefs.end(); ++it){
+            for(std::vector<Drill::FieldMetadata*>::iterator it=currentColDefs->begin(); it!=currentColDefs->end(); ++it){
                 Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
                 fmd->copy(*(*it));//Yup, that's 2 stars
                 pColDefs->push_back(fmd);
@@ -146,7 +152,7 @@ std::vector<Drill::FieldMetadata*>&  RecordIterator::getColDefs(){
         }
         this->m_pColDefs = pColDefs;
     }
-    return *this->m_pColDefs;
+    return this->m_pColDefs;
 }
 
 status_t RecordIterator::next(){
@@ -160,12 +166,19 @@ status_t RecordIterator::next(){
     if(!this->m_pQueryResult->isCancelled()){
         if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){
             boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
-            delete this->m_pCurrentRecordBatch; //free the previous record batch
+            if(this->m_pCurrentRecordBatch !=NULL){
+                DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
+                delete this->m_pCurrentRecordBatch; //free the previous record batch
+            }
             this->m_currentRecord=0;
             this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
-            BOOST_LOG_TRIVIAL(trace) << "Fetched new Record batch " ;
+            if(this->m_pCurrentRecordBatch != NULL){
+                DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;
+            }else{
+                DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;
+            }
             if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){
-                BOOST_LOG_TRIVIAL(trace) << "No more data." ;
+                DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;
                 ret = QRY_NO_MORE_DATA;
             }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){
                 ret=QRY_SUCCESS_WITH_INFO;
@@ -213,11 +226,16 @@ status_t RecordIterator::cancel(){
     return QRY_CANCEL;
 }
 
-void RecordIterator::registerSchemaChangeListener(pfnSchemaListener* l){
-    //TODO:
+bool RecordIterator::hasSchemaChanged(){
+    return m_currentRecord==0 && m_pCurrentRecordBatch!=NULL && m_pCurrentRecordBatch->hasSchemaChanged();
 }
 
-std::string& RecordIterator::getError(){
+void RecordIterator::registerSchemaChangeListener(pfnSchemaListener l){
+    assert(m_pQueryResult!=NULL);
+    this->m_pQueryResult->registerSchemaChangeListener(l);
+}
+
+const std::string& RecordIterator::getError(){
     return m_pQueryResult->getError()->msg;
 }
 
@@ -243,7 +261,7 @@ connectionStatus_t DrillClient::connect(const char* connectStr ){
     ret=this->m_pImpl->connect(connectStr);
 
     if(ret==CONN_SUCCESS)
-        ret=this->m_pImpl->ValidateHandShake()?CONN_SUCCESS:CONN_HANDSHAKE_FAILED;
+        ret=this->m_pImpl->validateHandShake()?CONN_SUCCESS:CONN_HANDSHAKE_FAILED;
     return ret;
 
 }
@@ -256,13 +274,13 @@ void DrillClient::close() {
     this->m_pImpl->Close();
 }
 
-status_t DrillClient::submitQuery(exec::user::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle){
+status_t DrillClient::submitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle){
     DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(t, plan, listener, listenerCtx);
     *qHandle=(QueryHandle_t)pResult;
-    return QRY_SUCCESS; 
+    return QRY_SUCCESS;
 }
 
-RecordIterator* DrillClient::submitQuery(exec::user::QueryType t, const std::string& plan, DrillClientError* err){
+RecordIterator* DrillClient::submitQuery(::exec::shared::QueryType t, const std::string& plan, DrillClientError* err){
     RecordIterator* pIter=NULL;
     DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(t, plan, NULL, NULL);
     if(pResult){
@@ -280,6 +298,12 @@ void DrillClient::waitForResults(){
     this->m_pImpl->waitForResults();
 }
 
+void DrillClient::registerSchemaChangeListener(QueryHandle_t* handle, pfnSchemaListener l){
+    if(handle!=NULL){
+        ((DrillClientQueryResult*)(*handle))->registerSchemaChangeListener(l);
+    }
+}
+
 void DrillClient::freeQueryResources(QueryHandle_t* handle){
     delete (DrillClientQueryResult*)(*handle);
     *handle=NULL;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 14b4f7d..0767396 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -20,53 +20,73 @@
 #include <string.h>
 #include <boost/asio.hpp>
 #include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/thread.hpp>
-#include <boost/log/trivial.hpp>
 #include <zookeeper/zookeeper.h>
 
 #include "drill/drillClient.hpp"
 #include "drill/recordBatch.hpp"
 #include "drillClientImpl.hpp"
 #include "errmsgs.hpp"
+#include "logger.hpp"
 #include "rpcEncoder.hpp"
 #include "rpcDecoder.hpp"
 #include "rpcMessage.hpp"
+#include "utils.hpp"
 
 #include "GeneralRPC.pb.h"
 #include "UserBitShared.pb.h"
 
-#ifdef DEBUG
-#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
-#endif
-
-
 namespace Drill{
 
 RpcEncoder DrillClientImpl::s_encoder;
 RpcDecoder DrillClientImpl::s_decoder;
 
 std::string debugPrintQid(const exec::shared::QueryId& qid){
-    return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] "); 
+    return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] ");
 }
 
-void DrillClientImpl::parseConnectStr(const char* connectStr, std::string& protocol, std::string& hostPortStr){
-    char u[1024];
-    strcpy(u,connectStr);
+void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){
+#if defined _WIN32
+    int32_t timeoutMsecs=timeout*1000;
+    setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs));
+    setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs));
+#else
+    struct timeval tv;
+    tv.tv_sec  = timeout;
+    tv.tv_usec = 0;
+    int e=0;
+    e=setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+    e=setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
+#endif
+}
+
+
+void DrillClientImpl::parseConnectStr(const char* connectStr,
+        std::string& pathToDrill,
+        std::string& protocol,
+        std::string& hostPortStr){
+    char u[MAX_CONNECT_STR+1];
+    strncpy(u,connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR]=0;
     char* z=strtok(u, "=");
-    char* c=strtok(NULL, "");
+    char* c=strtok(NULL, "/");
+    char* p=strtok(NULL, "");
+
+    if(p!=NULL) pathToDrill=std::string("/")+p;
     protocol=z; hostPortStr=c;
+    return;
 }
 
 connectionStatus_t DrillClientImpl::connect(const char* connStr){
-    std::string protocol, hostPortStr;
-    std::string host; 
+    std::string pathToDrill, protocol, hostPortStr;
+    std::string host;
     std::string port;
     if(!this->m_bIsConnected){
-        parseConnectStr(connStr, protocol, hostPortStr);
-        if(!strcmp(protocol.c_str(), "jdbc:drill:zk")){
+        parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+        if(!strcmp(protocol.c_str(), "zk")){
             ZookeeperImpl zook;
-            if(zook.connectToZookeeper(hostPortStr.c_str())!=0){
+            if(zook.connectToZookeeper(hostPortStr.c_str(), pathToDrill.c_str())!=0){
                 return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
             }
             zook.debugPrint();
@@ -74,9 +94,9 @@ connectionStatus_t DrillClientImpl::connect(const char* connStr){
             host=boost::lexical_cast<std::string>(e.address());
             port=boost::lexical_cast<std::string>(e.user_port());
             zook.close();
-        }else if(!strcmp(protocol.c_str(), "jdbc:drill:local")){
-            char tempStr[1024];
-            strcpy(tempStr, hostPortStr.c_str());
+        }else if(!strcmp(protocol.c_str(), "local")){
+            char tempStr[MAX_CONNECT_STR+1];
+            strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
             host=strtok(tempStr, ":");
             port=strtok(NULL, "");
         }else{
@@ -97,7 +117,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
         tcp::resolver::iterator end;
         while (iter != end){
             endpoint = *iter++;
-            BOOST_LOG_TRIVIAL(trace) << endpoint << std::endl;
+            DRILL_LOG(LOG_TRACE) << endpoint << std::endl;
         }
         boost::system::error_code ec;
         m_socket.connect(endpoint, ec);
@@ -108,59 +128,147 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
     }catch(std::exception e){
         return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
     }
+
+    //
+    // We put some OS dependent code here for timing out a socket. Mostly, this appears to
+    // do nothing. Should we leave it in there?
+    //
+    setSocketTimeout(m_socket, DrillClientConfig::getSocketTimeout());
+
     return CONN_SUCCESS;
 }
 
-void DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
+connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
     DrillClientImpl::s_encoder.Encode(m_wbuf, msg);
-    m_socket.write_some(boost::asio::buffer(m_wbuf));
+    boost::system::error_code ec;
+    size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec);
+    if(!ec && s!=0){
+    return CONN_SUCCESS;
+    }else{
+        return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, ec.message().c_str()));
+    }
 }
 
-void DrillClientImpl::recvSync(InBoundRpcMessage& msg){
-    m_socket.read_some(boost::asio::buffer(m_rbuf));
-    uint32_t length = 0;
-    int bytes_read = DrillClientImpl::s_decoder.LengthDecode(m_rbuf.data(), &length);
-    DrillClientImpl::s_decoder.Decode(m_rbuf.data() + bytes_read, length, msg);
+connectionStatus_t DrillClientImpl::recvHandshake(){
+    if(m_rbuf==NULL){
+        m_rbuf = Utils::allocateBuffer(MAX_SOCK_RD_BUFSIZE);
+    }
+
+    m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getSocketTimeout()));
+    m_deadlineTimer.async_wait(boost::bind(
+                &DrillClientImpl::handleHShakeReadTimeout,
+                this,
+                boost::asio::placeholders::error
+                ));
+    DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer."  << std::endl;
+
+    async_read(
+            this->m_socket,
+            boost::asio::buffer(m_rbuf, LEN_PREFIX_BUFLEN),
+            boost::bind(
+                &DrillClientImpl::handleHandshake,
+                this,
+                m_rbuf,
+                boost::asio::placeholders::error,
+                boost::asio::placeholders::bytes_transferred)
+            );
+    DRILL_LOG(LOG_DEBUG) << "Sent handshake read request to server" << std::endl;
+    m_io_service.run();
+    if(m_rbuf!=NULL){
+        Utils::freeBuffer(m_rbuf); m_rbuf=NULL;
+    }
+    return CONN_SUCCESS;
 }
 
-bool DrillClientImpl::ValidateHandShake(){
-    exec::user::UserToBitHandshake u2b;
-    exec::user::BitToUserHandshake b2u;
+void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
+        const boost::system::error_code& err,
+        size_t bytes_transferred) {
+    boost::system::error_code error=err;
+    // cancel the timer
+    m_deadlineTimer.cancel();
+    DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled."  << std::endl;
+    if(!error){
+        InBoundRpcMessage msg;
+        uint32_t length = 0;
+        int bytes_read = DrillClientImpl::s_decoder.LengthDecode(m_rbuf, &length);
+        if(length>0){
+            size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
+            ByteBuf_t b=m_rbuf + LEN_PREFIX_BUFLEN;
+            size_t bytesToRead=length - leftover;
+            while(1){
+                size_t dataBytesRead=m_socket.read_some(
+                        boost::asio::buffer(b, bytesToRead),
+                        error);
+                if(err) break;
+                DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;
+                if(dataBytesRead==bytesToRead) break;
+                bytesToRead-=dataBytesRead;
+                b+=dataBytesRead;
+            }
+            DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
+        }else{
+            handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
+        }
+        exec::user::BitToUserHandshake b2u;
+        b2u.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
+        this->m_handshakeVersion=b2u.rpc_version();
+
+    }else{
+        // boost error
+        handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, error.message().c_str()));
+        return;
+    }
+    return;
+}
 
+void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & err){
+    // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
+    if(!err){
+        // Check whether the deadline has passed.
+        if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
+            // The deadline has passed.
+            m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
+            DRILL_LOG(LOG_TRACE) << "Deadline timer expired."  << std::endl;
+            m_socket.close();
+        }
+    }
+    return;
+}
+
+bool DrillClientImpl::validateHandShake(){
+    exec::user::UserToBitHandshake u2b;
     u2b.set_channel(exec::shared::USER);
-    u2b.set_rpc_version(1);
+    u2b.set_rpc_version(DRILL_RPC_VERSION);
     u2b.set_support_listening(true);
-
     {
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         uint64_t coordId = this->getNextCoordinationId();
 
         OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
         sendSync(out_msg);
-
-        InBoundRpcMessage in_msg;
-        recvSync(in_msg);
-
-        b2u.ParseFromArray(in_msg.m_pbody.data(), in_msg.m_pbody.size());
     }
 
-    // validate handshake
-    if (b2u.rpc_version() != u2b.rpc_version()) {
-        BOOST_LOG_TRIVIAL(trace) << "Invalid rpc version.  Expected << " 
-            << u2b.rpc_version() << ", actual "<< b2u.rpc_version() << "." ;
-        handleConnError(CONN_HANDSHAKE_FAILED, 
-                getMessage(ERR_CONN_NOHSHAKE, u2b.rpc_version(), b2u.rpc_version()));
+    recvHandshake();
+    this->m_io_service.reset();
+    if(this->m_pError!=NULL){
+        return false;
+    }
+    if(m_handshakeVersion != u2b.rpc_version()) {
+        DRILL_LOG(LOG_TRACE) << "Invalid rpc version.  Expected << "
+            << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;
+        handleConnError(CONN_HANDSHAKE_FAILED,
+                getMessage(ERR_CONN_NOHSHAKE, DRILL_RPC_VERSION, m_handshakeVersion));
         return false;
     }
     return true;
 }
 
 
-std::vector<Drill::FieldMetadata*> DrillClientQueryResult::s_emptyColDefs;
+FieldDefPtr DrillClientQueryResult::s_emptyColDefs( new (std::vector<Drill::FieldMetadata*>));
 
-DrillClientQueryResult* DrillClientImpl::SubmitQuery(exec::user::QueryType t, 
-        const std::string& plan, 
-        pfnQueryResultsListener l, 
+DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t,
+        const std::string& plan,
+        pfnQueryResultsListener l,
         void* lCtx){
     exec::user::RunQuery query;
     query.set_results_mode(exec::user::STREAM_FULL);
@@ -180,16 +288,16 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(exec::user::QueryType t,
         bool sendRequest=false;
         this->m_queryIds[coordId]=pQuery;
 
-        BOOST_LOG_TRIVIAL(debug)  << "Submit Query Request. Coordination id = " << coordId;
+        DRILL_LOG(LOG_DEBUG)  << "Submit Query Request. Coordination id = " << coordId << std::endl;
 
         if(m_pendingRequests++==0){
             sendRequest=true;
         }else{
-            BOOST_LOG_TRIVIAL(debug) << "Queueing read request to server" << std::endl;
-            BOOST_LOG_TRIVIAL(debug) << "Number of pending requests = " << m_pendingRequests << std::endl;
+            DRILL_LOG(LOG_DEBUG) << "Queueing read request to server" << std::endl;
+            DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;
         }
         if(sendRequest){
-            BOOST_LOG_TRIVIAL(debug) << "Sending read request. Number of pending requests = " 
+            DRILL_LOG(LOG_DEBUG) << "Sending read request. Number of pending requests = "
                 << m_pendingRequests << std::endl;
             getNextResult(); // async wait for results
         }
@@ -198,8 +306,8 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(exec::user::QueryType t,
     //run this in a new thread
     {
         if(this->m_pListenerThread==NULL){
-            BOOST_LOG_TRIVIAL(debug) << "Starting listener thread." << std::endl;
-            this->m_pListenerThread= new boost::thread(boost::bind(&boost::asio::io_service::run, 
+            DRILL_LOG(LOG_DEBUG) << "Starting listener thread." << std::endl;
+            this->m_pListenerThread= new boost::thread(boost::bind(&boost::asio::io_service::run,
                         &this->m_io_service));
         }
     }
@@ -211,61 +319,72 @@ void DrillClientImpl::getNextResult(){
     // This call is always made from within a function where the mutex has already been acquired
     //boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
 
-    //use free, not delete to free 
-    ByteBuf_t readBuf = allocateBuffer(LEN_PREFIX_BUFLEN);
-    async_read( 
+    //use free, not delete to free
+    ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
+
+    m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getSocketTimeout()));
+    m_deadlineTimer.async_wait(boost::bind(
+                &DrillClientImpl::handleReadTimeout,
+                this,
+                boost::asio::placeholders::error
+                ));
+    DRILL_LOG(LOG_TRACE) << "Started new async wait timer."  << std::endl;
+
+    async_read(
             this->m_socket,
             boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
             boost::bind(
                 &DrillClientImpl::handleRead,
                 this,
                 readBuf,
-                boost::asio::placeholders::error, 
+                boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred)
             );
-    BOOST_LOG_TRIVIAL(debug) << "Sent read request to server" << std::endl;
+    DRILL_LOG(LOG_DEBUG) << "Sent read request to server" << std::endl;
 }
 
 void DrillClientImpl::waitForResults(){
     this->m_pListenerThread->join();
-    BOOST_LOG_TRIVIAL(debug) << "Listener thread exited." << std::endl;
+    DRILL_LOG(LOG_DEBUG) << "Listener thread exited." << std::endl;
     delete this->m_pListenerThread; this->m_pListenerThread=NULL;
 }
 
-status_t DrillClientImpl::readMsg(ByteBuf_t _buf, InBoundRpcMessage& msg, boost::system::error_code& error){
+status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, InBoundRpcMessage& msg, boost::system::error_code& error){
     size_t leftover=0;
     uint32_t rmsgLen;
     ByteBuf_t currentBuffer;
+    *allocatedBuffer=NULL;
     {
-        // We need to protect the readLength and read buffer, and the pending requests counter, 
+        // We need to protect the readLength and read buffer, and the pending requests counter,
         // but we don't have to keep the lock while we decode the rest of the buffer.
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen);
-        BOOST_LOG_TRIVIAL(trace) << "len bytes = " << bytes_read << std::endl;
-        BOOST_LOG_TRIVIAL(trace) << "rmsgLen = " << rmsgLen << std::endl;
+        DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;
+        DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;
 
         if(rmsgLen>0){
             leftover = LEN_PREFIX_BUFLEN - bytes_read;
             // Allocate a buffer
-            BOOST_LOG_TRIVIAL(trace) << "Allocated and locked buffer." << std::endl;
-            currentBuffer=allocateBuffer(rmsgLen);
+            DRILL_LOG(LOG_TRACE) << "Allocated and locked buffer." << std::endl;
+            currentBuffer=Utils::allocateBuffer(rmsgLen);
             if(currentBuffer==NULL){
+                Utils::freeBuffer(_buf);
                 return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
             }
+            *allocatedBuffer=currentBuffer;
             if(leftover){
                 memcpy(currentBuffer, _buf + bytes_read, leftover);
             }
-            freeBuffer(_buf);
-            BOOST_LOG_TRIVIAL(trace) << "reading data (rmsgLen - leftover) : " 
+            DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
                 << (rmsgLen - leftover) << std::endl;
             ByteBuf_t b=currentBuffer + leftover;
             size_t bytesToRead=rmsgLen - leftover;
             while(1){
                 size_t dataBytesRead=this->m_socket.read_some(
-                        boost::asio::buffer(b, bytesToRead), 
+                        boost::asio::buffer(b, bytesToRead),
                         error);
                 if(error) break;
-                BOOST_LOG_TRIVIAL(trace) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;
+                DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;
                 if(dataBytesRead==bytesToRead) break;
                 bytesToRead-=dataBytesRead;
                 b+=dataBytesRead;
@@ -273,31 +392,34 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, InBoundRpcMessage& msg, boost:
             if(!error){
                 // read data successfully
                 DrillClientImpl::s_decoder.Decode(currentBuffer, rmsgLen, msg);
-                BOOST_LOG_TRIVIAL(trace) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;
+                DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;
             }else{
-                return handleQryError(QRY_COMM_ERROR, 
+                Utils::freeBuffer(_buf);
+                return handleQryError(QRY_COMM_ERROR,
                         getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
             }
         }else{
-            // got a message with an invalid read length. 
+            // got a message with an invalid read length.
+            Utils::freeBuffer(_buf);
             return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
         }
     }
+    Utils::freeBuffer(_buf);
     return QRY_SUCCESS;
 }
 
-status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
     {
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         exec::user::QueryResult* qr = new exec::user::QueryResult; //Record Batch will own this object and free it up.
 
-        BOOST_LOG_TRIVIAL(debug) << "Processing Query Result " << std::endl;
+        DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;
         qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        BOOST_LOG_TRIVIAL(trace) << qr->DebugString();
+        DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
 
-        BOOST_LOG_TRIVIAL(debug) << "Searching for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
+        DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
 
         exec::shared::QueryId qid;
         qid.CopyFrom(qr->query_id());
@@ -306,39 +428,48 @@ status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
         if(it!=this->m_queryResults.end()){
             pDrillClientQueryResult=(*it).second;
         }else{
-            assert(0); 
+            assert(0);
             //assert might be compiled away in a release build. So return an error to the app.
             status_t ret= handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_OUTOFORDER), NULL);
             delete qr;
             return ret;
         }
-        BOOST_LOG_TRIVIAL(debug) << "Drill Client Query Result Query Id - " << 
-            debugPrintQid(*pDrillClientQueryResult->m_pQueryId) 
+        DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
+            debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
             << std::endl;
         //Check QueryResult.queryState. QueryResult could have an error.
         if(qr->query_state() == exec::user::QueryResult_QueryState_FAILED){
             status_t ret=handleQryError(QRY_FAILURE, qr->error(0), pDrillClientQueryResult);
+            Utils::freeBuffer(allocatedBuffer);
             delete qr;
             return ret;
         }
         //Validate the RPC message
         std::string valErr;
         if( (ret=validateMessage(msg, *qr, valErr)) != QRY_SUCCESS){
+            Utils::freeBuffer(allocatedBuffer);
+            delete qr;
             return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
         }
 
-        //Build Record Batch here 
-        BOOST_LOG_TRIVIAL(trace) << qr->DebugString();
+        //Build Record Batch here
+        DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
+
+        RecordBatch* pRecordBatch= new RecordBatch(qr, allocatedBuffer,  msg.m_dbody);
+        pDrillClientQueryResult->m_numBatches++;
 
-        RecordBatch* pRecordBatch= new RecordBatch(qr, msg.m_dbody);
+        DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;
         pRecordBatch->build();
-        BOOST_LOG_TRIVIAL(debug) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords " 
+        DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
             << pRecordBatch->getNumRecords()  << std::endl;
-        BOOST_LOG_TRIVIAL(debug) << debugPrintQid(qr->query_id())<<"recordBatch.numFields " 
+        DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
             << pRecordBatch->getNumFields()  << std::endl;
-        BOOST_LOG_TRIVIAL(debug) << debugPrintQid(qr->query_id())<<"recordBatch.isLastChunk " 
+        DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.isLastChunk "
             << pRecordBatch->isLastChunk()  << std::endl;
 
+        ret=pDrillClientQueryResult->setupColumnDefs(qr);
+        if(ret==QRY_SUCCESS_WITH_INFO)pRecordBatch->schemaChanged(true);
+
         pDrillClientQueryResult->m_bIsQueryPending=true;
         pDrillClientQueryResult->m_bIsLastChunk=qr->is_last_chunk();
         pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
@@ -346,7 +477,7 @@ status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
             ret = pResultsListener(pDrillClientQueryResult, pRecordBatch, NULL);
         }else{
             //Use a default callback that is called when a record batch is received
-            ret = pDrillClientQueryResult->defaultQueryResultsListener(pDrillClientQueryResult, 
+            ret = pDrillClientQueryResult->defaultQueryResultsListener(pDrillClientQueryResult,
                     pRecordBatch, NULL);
         }
     } // release lock
@@ -357,16 +488,16 @@ status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
             m_pendingRequests--;
         }
         pDrillClientQueryResult->m_bIsQueryPending=false;
-        BOOST_LOG_TRIVIAL(debug) << "Client app cancelled query.";
+        DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;
         return ret;
     }
     if(pDrillClientQueryResult->m_bIsLastChunk){
         {
             boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
             m_pendingRequests--;
-            BOOST_LOG_TRIVIAL(debug) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) 
+            DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
                 <<  "Received last batch. " << std::endl;
-            BOOST_LOG_TRIVIAL(debug) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) 
+            DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
                 << "Pending requests: " << m_pendingRequests <<"." << std::endl;
         }
         ret=QRY_NO_MORE_DATA;
@@ -377,9 +508,9 @@ status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
     return ret;
 }
 
-status_t DrillClientImpl::processQueryId(InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
-    BOOST_LOG_TRIVIAL(debug) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
+    DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
     status_t ret=QRY_SUCCESS;
 
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
@@ -388,45 +519,74 @@ status_t DrillClientImpl::processQueryId(InBoundRpcMessage& msg ){
     if(it!=this->m_queryIds.end()){
         pDrillClientQueryResult=(*it).second;
         exec::shared::QueryId *qid = new exec::shared::QueryId;
-        BOOST_LOG_TRIVIAL(trace)  << "Received Query Handle" << msg.m_pbody.size();
+        DRILL_LOG(LOG_TRACE)  << "Received Query Handle" << msg.m_pbody.size() << std::endl;
         qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        BOOST_LOG_TRIVIAL(trace) << qid->DebugString();
+        DRILL_LOG(LOG_TRACE) << qid->DebugString() << std::endl;
         m_queryResults[qid]=pDrillClientQueryResult;
         //save queryId allocated here so we can free it later
         pDrillClientQueryResult->setQueryId(qid);
     }else{
+        Utils::freeBuffer(allocatedBuffer);
         return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
     }
+    Utils::freeBuffer(allocatedBuffer);
     return ret;
 }
 
-void DrillClientImpl::handleRead(ByteBuf_t _buf, 
-        const boost::system::error_code& err, 
+void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
+    // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
+    if(!err){
+        // Check whether the deadline has passed.
+        if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
+            // The deadline has passed.
+            handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_TIMOUT), NULL);
+            // There is no longer an active deadline. The expiry is set to positive
+            // infinity so that the timer never expires until a new deadline is set.
+            // Note that at this time, the caller is not in a (async) wait for the timer.
+            m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
+            DRILL_LOG(LOG_TRACE) << "Deadline timer expired."  << std::endl;
+            // Cancel all pending async IOs.
+            // The cancel call _MAY_ not work on all platforms. To be a little more reliable we need
+            // to have the BOOST_ASIO_ENABLE_CANCELIO macro (as well as the BOOST_ASIO_DISABLE_IOCP macro?)
+            // defined. To be really sure, we need to close the socket. Closing the socket is a bit
+            // drastic and we will defer that till a later release.
+            m_socket.cancel();
+        }
+    }
+    return;
+}
+
+void DrillClientImpl::handleRead(ByteBuf_t _buf,
+        const boost::system::error_code& err,
         size_t bytes_transferred) {
     boost::system::error_code error=err;
+    // cancel the timer
+    m_deadlineTimer.cancel();
+    DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled."  << std::endl;
     if(!error){
         InBoundRpcMessage msg;
 
-        BOOST_LOG_TRIVIAL(trace) << "Getting new message" << std::endl;
+        DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;
+        ByteBuf_t allocatedBuffer=NULL;
 
-        if(readMsg(_buf, msg, error)!=QRY_SUCCESS){
+        if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
             if(m_pendingRequests!=0){
                 boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
                 getNextResult();
             }
             return;
-        } 
+        }
 
         if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
-            if(processQueryResult(msg)!=QRY_SUCCESS){
+            if(processQueryResult(allocatedBuffer, msg)!=QRY_SUCCESS){
                 if(m_pendingRequests!=0){
                     boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
                     getNextResult();
                 }
                 return;
             }
-        }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){ 
-            if(processQueryId(msg)!=QRY_SUCCESS){
+        }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){
+            if(processQueryId(allocatedBuffer, msg)!=QRY_SUCCESS){
                 if(m_pendingRequests!=0){
                     boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
                     getNextResult();
@@ -436,15 +596,15 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         }else{
             boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
             if(error){
-                // We have a socket read error, but we do not know which query this is for. 
+                // We have a socket read error, but we do not know which query this is for.
                 // Signal ALL pending queries that they should stop waiting.
-                BOOST_LOG_TRIVIAL(trace) << "read error: " << error << "\n";
+                DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;
                 handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
                 return;
             }else{
                 //If not QUERY_RESULT, then we think something serious has gone wrong?
                 assert(0);
-                BOOST_LOG_TRIVIAL(trace) << "QueryResult returned " << msg.m_rpc_type;
+                DRILL_LOG(LOG_TRACE) << "QueryResult returned " << msg.m_rpc_type << std::endl;
                 handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
                 return;
             }
@@ -455,6 +615,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         }
     }else{
         // boost error
+        Utils::freeBuffer(_buf);
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
         return;
@@ -485,6 +646,7 @@ status_t DrillClientImpl::validateMessage(InBoundRpcMessage& msg, exec::user::Qu
 connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
     m_pendingRequests=0;
+    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
     m_pError=pErr;
     broadcastError(this->m_pError);
     return status;
@@ -492,19 +654,20 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s
 
 status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
+    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+    m_pError=pErr;
     if(pQueryResult!=NULL){
         m_pendingRequests--;
         pQueryResult->signalError(pErr);
     }else{
         m_pendingRequests=0;
-        m_pError=pErr;
         broadcastError(this->m_pError);
     }
     return status;
 }
 
-status_t DrillClientImpl::handleQryError(status_t status, 
-        const exec::shared::DrillPBError& e, 
+status_t DrillClientImpl::handleQryError(status_t status,
+        const exec::shared::DrillPBError& e,
         DrillClientQueryResult* pQueryResult){
     assert(pQueryResult!=NULL);
     this->m_pError = DrillClientError::getErrorObject(e);
@@ -516,8 +679,10 @@ status_t DrillClientImpl::handleQryError(status_t status,
 void DrillClientImpl::broadcastError(DrillClientError* pErr){
     if(pErr!=NULL){
         std::map<int, DrillClientQueryResult*>::iterator iter;
-        for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) {
-            iter->second->signalError(pErr);
+        if(!m_queryIds.empty()){
+            for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) {
+                iter->second->signalError(pErr);
+            }
         }
     }
     return;
@@ -526,17 +691,21 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){
 void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
     std::map<int, DrillClientQueryResult*>::iterator iter;
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    for(iter=m_queryIds.begin(); iter!=m_queryIds.end(); iter++) {
-        if(pQueryResult==(DrillClientQueryResult*)iter->second){
-            m_queryIds.erase(iter->first);
-            break;
+    if(!m_queryIds.empty()){
+        for(iter=m_queryIds.begin(); iter!=m_queryIds.end(); iter++) {
+            if(pQueryResult==(DrillClientQueryResult*)iter->second){
+                m_queryIds.erase(iter->first);
+                break;
+            }
         }
     }
-    std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
-    for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
-        if(pQueryResult==(DrillClientQueryResult*)it->second){
-            m_queryResults.erase(it->first);
-            break;
+    if(!m_queryResults.empty()){
+        std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
+        for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
+            if(pQueryResult==(DrillClientQueryResult*)it->second){
+                m_queryResults.erase(it->first);
+                break;
+            }
         }
     }
 }
@@ -547,7 +716,7 @@ void DrillClientImpl::sendAck(InBoundRpcMessage& msg){
     OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     sendSync(ack_msg);
-    BOOST_LOG_TRIVIAL(trace) << "ACK sent" << std::endl;
+    DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;
 }
 
 void DrillClientImpl::sendCancel(InBoundRpcMessage& msg){
@@ -556,34 +725,36 @@ void DrillClientImpl::sendCancel(InBoundRpcMessage& msg){
     OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::CANCEL_QUERY, msg.m_coord_id, &ack);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     sendSync(ack_msg);
-    BOOST_LOG_TRIVIAL(trace) << "CANCEL sent" << std::endl;
+    DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
 }
 
-// This COPIES the FieldMetadata definition for the record batch.  ColumnDefs held by this 
+// This COPIES the FieldMetadata definition for the record batch.  ColumnDefs held by this
 // class are used by the async callbacks.
 status_t DrillClientQueryResult::setupColumnDefs(exec::user::QueryResult* pQueryResult) {
     bool hasSchemaChanged=false;
+    bool isFirstIter=false;
     boost::lock_guard<boost::mutex> schLock(this->m_schemaMutex);
 
-    std::vector<Drill::FieldMetadata*> prevSchema=this->m_columnDefs;
+    FieldDefPtr prevSchema=this->m_columnDefs;
+    isFirstIter=this->m_numBatches==1?true:false;
     std::map<std::string, Drill::FieldMetadata*> oldSchema;
-    for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema.begin(); it != prevSchema.end(); ++it){
-        // the key is the field_name + type
-        char type[256];
-        sprintf(type, ":%d:%d",(*it)->getMinorType(), (*it)->getDataMode() );
-        std::string k= (*it)->getName()+type;
-        oldSchema[k]=*it;
+    if(!m_columnDefs->empty()){
+        for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema->begin(); it != prevSchema->end(); ++it){
+            // the key is the field_name + type
+            char type[256];
+            sprintf(type, ":%d:%d",(*it)->getMinorType(), (*it)->getDataMode() );
+            std::string k= (*it)->getName()+type;
+            oldSchema[k]=*it;
+        }
     }
-
-    m_columnDefs.clear();
+    m_columnDefs->clear();
     size_t numFields=pQueryResult->def().field_size();
     for(size_t i=0; i<numFields; i++){
-        //TODO: free this??
         Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
         fmd->set(pQueryResult->def().field(i));
-        this->m_columnDefs.push_back(fmd);
+        this->m_columnDefs->push_back(fmd);
 
-        //Look for changes in the vector and trigger a Schema change event if necessary. 
+        //Look for changes in the vector and trigger a Schema change event if necessary.
         //If vectors are different, then call the schema change listener.
         char type[256];
         sprintf(type, ":%d:%d",fmd->getMinorType(), fmd->getDataMode() );
@@ -601,22 +772,27 @@ status_t DrillClientQueryResult::setupColumnDefs(exec::user::QueryResult* pQuery
     }
 
     //free memory allocated for FieldMetadata objects saved in previous columnDefs;
-    for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema.begin(); it != prevSchema.end(); ++it){
-        delete *it;    
+    if(!prevSchema->empty()){
+        for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema->begin(); it != prevSchema->end(); ++it){
+            delete *it;
+        }
     }
-    prevSchema.clear();
-    this->m_bHasSchemaChanged=hasSchemaChanged;
-    if(hasSchemaChanged){
-        //TODO: invoke schema change Listener
+    prevSchema->clear();
+    this->m_bHasSchemaChanged=hasSchemaChanged&&!isFirstIter;
+    if(this->m_bHasSchemaChanged){
+        //invoke schema change Listener
+        if(m_pSchemaListener!=NULL){
+            m_pSchemaListener(this, m_columnDefs, NULL);
+        }
     }
-    return hasSchemaChanged?QRY_SUCCESS_WITH_INFO:QRY_SUCCESS;
+    return this->m_bHasSchemaChanged?QRY_SUCCESS_WITH_INFO:QRY_SUCCESS;
 }
 
-status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,  
-        RecordBatch* b, 
+status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
+        RecordBatch* b,
         DrillClientError* err) {
     //ctx; // unused, we already have the this pointer
-    BOOST_LOG_TRIVIAL(trace) << "Query result listener called" << std::endl;
+    DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;
     //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
     if(this->m_bCancel){
         return QRY_FAILURE;
@@ -625,7 +801,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
         // signal the cond var
         {
             #ifdef DEBUG
-            BOOST_LOG_TRIVIAL(debug)<<debugPrintQid(b->getQueryResult()->query_id())  
+            DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
                 << "Query result listener saved result to queue." << std::endl;
             #endif
             boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
@@ -644,7 +820,7 @@ RecordBatch*  DrillClientQueryResult::peekNext() {
     //if no more data, return NULL;
     if(!m_bIsQueryPending) return NULL;
     boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
-    BOOST_LOG_TRIVIAL(trace) << "Synchronous read waiting for data." << std::endl;
+    DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
     while(!this->m_bHasData && !m_bHasError) {
         this->m_cv.wait(cvLock);
     }
@@ -656,11 +832,17 @@ RecordBatch*  DrillClientQueryResult::peekNext() {
 RecordBatch*  DrillClientQueryResult::getNext() {
     RecordBatch* pRecordBatch=NULL;
     //if no more data, return NULL;
-    if(!m_bIsQueryPending) return NULL;
+    if(!m_bIsQueryPending){
+        DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;
+        if(!m_recordBatches.empty()){
+            DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;
+        }
+        return NULL;
+    }
 
     boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
-    BOOST_LOG_TRIVIAL(trace) << "Synchronous read waiting for data." << std::endl;
-    while(!this->m_bHasData && !m_bHasError) {
+    DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+    while(!this->m_bHasData && !m_bHasError){
         this->m_cv.wait(cvLock);
     }
     // remove first element from queue
@@ -709,33 +891,49 @@ void DrillClientQueryResult::signalError(DrillClientError* pErr){
 }
 
 void DrillClientQueryResult::clearAndDestroy(){
-    if(this->m_pQueryId!=NULL){
-        delete this->m_pQueryId; this->m_pQueryId=NULL;
-    }
     //free memory allocated for FieldMetadata objects saved in m_columnDefs;
-    for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs.begin(); it != m_columnDefs.end(); ++it){
-        delete *it;    
+    if(!m_columnDefs->empty()){
+        for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){
+            delete *it;
+        }
+        m_columnDefs->clear();
     }
-    m_columnDefs.clear();
     //Tell the parent to remove this from it's lists
     m_pClient->clearMapEntries(this);
+
+    //clear query id map entries.
+    if(this->m_pQueryId!=NULL){
+        delete this->m_pQueryId; this->m_pQueryId=NULL;
+    }
+    if(!m_recordBatches.empty()){
+        // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the servrer _after_
+        // the last chunk has been received. We eventually delete it.
+        DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;
+        RecordBatch* pR=NULL;
+        while(!m_recordBatches.empty()){
+            pR=m_recordBatches.front();
+            m_recordBatches.pop();
+            delete pR;
+        }
+    }
 }
 
-char ZookeeperImpl::s_drillRoot[]="/drill/drillbits1";
+char ZookeeperImpl::s_drillRoot[]="/drill/";
+char ZookeeperImpl::s_defaultCluster[]="drillbits1";
 
-ZookeeperImpl::ZookeeperImpl(){ 
+ZookeeperImpl::ZookeeperImpl(){
     m_pDrillbits=new String_vector;
     srand (time(NULL));
     m_bConnecting=true;
     memset(&m_id, 0, sizeof(m_id));
 }
 
-ZookeeperImpl::~ZookeeperImpl(){ 
+ZookeeperImpl::~ZookeeperImpl(){
     delete m_pDrillbits;
 }
 
 ZooLogLevel ZookeeperImpl::getZkLogLevel(){
-    //typedef enum {ZOO_LOG_LEVEL_ERROR=1, 
+    //typedef enum {ZOO_LOG_LEVEL_ERROR=1,
     //    ZOO_LOG_LEVEL_WARN=2,
     //    ZOO_LOG_LEVEL_INFO=3,
     //    ZOO_LOG_LEVEL_DEBUG=4
@@ -752,11 +950,11 @@ ZooLogLevel ZookeeperImpl::getZkLogLevel(){
         case LOG_FATAL:
         default:
             return ZOO_LOG_LEVEL_ERROR;
-    } 
+    }
     return ZOO_LOG_LEVEL_ERROR;
 }
 
-int ZookeeperImpl::connectToZookeeper(const char* connectStr){
+int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){
     uint32_t waitTime=30000; // 10 seconds
     zoo_set_debug_level(getZkLogLevel());
     zoo_deterministic_conn_order(1); // enable deterministic order
@@ -780,24 +978,31 @@ int ZookeeperImpl::connectToZookeeper(const char* connectStr){
         return CONN_FAILURE;
     }
     int rc = ZOK;
-    rc=zoo_get_children(m_zh, (char*)s_drillRoot, 0, m_pDrillbits);
+    char rootDir[MAX_CONNECT_STR+1];
+    if(pathToDrill==NULL || strlen(pathToDrill)==0){
+        strcpy(rootDir, (char*)s_drillRoot);
+        strcat(rootDir, s_defaultCluster);
+    }else{
+        strncpy(rootDir, pathToDrill, MAX_CONNECT_STR); rootDir[MAX_CONNECT_STR]=0;
+    }
+    rc=zoo_get_children(m_zh, (char*)rootDir, 0, m_pDrillbits);
     if(rc!=ZOK){
         m_err=getMessage(ERR_CONN_ZKERR, rc);
         zookeeper_close(m_zh);
         return -1;
     }
 
-
     //Let's pick a random drillbit.
     if(m_pDrillbits && m_pDrillbits->count >0){
         int r=rand()%(this->m_pDrillbits->count);
         assert(r<this->m_pDrillbits->count);
         char * bit=this->m_pDrillbits->data[r];
         std::string s;
-        s=s_drillRoot +  std::string("/") + bit;
-        int buffer_len=1024;
-        char buffer[1024];
+        s=rootDir +  std::string("/") + bit;
+        int buffer_len=MAX_CONNECT_STR;
+        char buffer[MAX_CONNECT_STR+1];
         struct Stat stat;
+        buffer[MAX_CONNECT_STR]=0;
         rc= zoo_get(m_zh, s.c_str(), 0, buffer,  &buffer_len, &stat);
         if(rc!=ZOK){
             m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
@@ -840,7 +1045,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
     // signal the cond var
     {
         if (state == ZOO_CONNECTED_STATE){
-            BOOST_LOG_TRIVIAL(trace) << "Connected to Zookeeper." << std::endl;
+            DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;
         }
         boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex);
         self->m_bConnecting=false;
@@ -850,7 +1055,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
 
 void ZookeeperImpl:: debugPrint(){
     if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
-        BOOST_LOG_TRIVIAL(trace) << m_drillServiceInstance.DebugString();
+        DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;
     }
 }