You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/20 22:24:51 UTC
[15/32] git commit: DRILL-875: Fixes for DRILL-707, DRILL-780,
DRILL-835 (Schema change), DRILL-852, DRILL-876, DRILL_877, DRILL-878,
DRILL-890
DRILL-875: Fixes for DRILL-707, DRILL-780, DRILL-835 (Schema change), DRILL-852, DRILL-876, DRILL_877, DRILL-878, DRILL-890
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/aaa4db74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/aaa4db74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/aaa4db74
Branch: refs/heads/master
Commit: aaa4db74b215e03ad0e1334cfc18964972d93a3b
Parents: ff39fb8
Author: Parth Chandra <pc...@maprtech.com>
Authored: Fri May 30 11:17:40 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 19 20:29:53 2014 -0700
----------------------------------------------------------------------
contrib/native/client/CMakeLists.txt | 25 +-
.../native/client/example/querySubmitter.cpp | 75 +-
contrib/native/client/readme.linux | 96 +
.../parquet_scan_union_screen_physical.json | 2 +-
.../native/client/resources/simple_plan.json | 12 +-
contrib/native/client/scripts/fixProtodefs.sh | 2 +-
.../native/client/src/clientlib/CMakeLists.txt | 15 +-
.../client/src/clientlib/decimalUtils.cpp | 2 +-
.../native/client/src/clientlib/drillClient.cpp | 126 +-
.../client/src/clientlib/drillClientImpl.cpp | 539 +++-
.../client/src/clientlib/drillClientImpl.hpp | 163 +-
contrib/native/client/src/clientlib/errmsgs.cpp | 6 +-
contrib/native/client/src/clientlib/errmsgs.hpp | 44 +-
contrib/native/client/src/clientlib/logger.cpp | 69 +
contrib/native/client/src/clientlib/logger.hpp | 72 +
.../native/client/src/clientlib/recordBatch.cpp | 109 +-
.../native/client/src/clientlib/rpcDecoder.cpp | 2 +
.../native/client/src/clientlib/rpcEncoder.cpp | 3 +
contrib/native/client/src/clientlib/utils.hpp | 47 +
.../native/client/src/include/drill/common.hpp | 25 +-
.../client/src/include/drill/drillClient.hpp | 73 +-
.../client/src/include/drill/protobuf/User.pb.h | 40 +-
.../client/src/include/drill/recordBatch.hpp | 100 +-
.../native/client/src/protobuf/BitControl.pb.cc | 458 +--
.../native/client/src/protobuf/BitControl.pb.h | 370 +--
.../native/client/src/protobuf/CMakeLists.txt | 56 +-
contrib/native/client/src/protobuf/User.pb.cc | 80 +-
.../client/src/protobuf/UserBitShared.pb.cc | 2618 ++++++++++++++-
.../client/src/protobuf/UserBitShared.pb.h | 2969 ++++++++++++++----
29 files changed, 6260 insertions(+), 1938 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt
index a306780..9ac705b 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-cmake_minimum_required(VERSION 2.8)
+cmake_minimum_required(VERSION 2.6)
project(drillclient)
@@ -26,20 +26,21 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmakeModules/")
# Find Boost
-set(Boost_USE_STATIC_LIBS OFF)
-set(Boost_USE_MULTITHREADED ON)
-set(Boost_USE_STATIC_RUNTIME OFF)
-find_package(Boost REQUIRED COMPONENTS regex system date_time chrono thread log log_setup)
+set(Boost_USE_STATIC_LIBS OFF)
+set(Boost_USE_MULTITHREADED ON)
+set(Boost_USE_STATIC_RUNTIME OFF)
+find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread )
include_directories(${Boost_INCLUDE_DIRS})
-if(CMAKE_COMPILER_IS_GNUCXX)
- set(CMAKE_EXE_LINKER_FLAGS "-lrt -lpthread")
-endif()
+if(CMAKE_COMPILER_IS_GNUCXX)
+ set(CMAKE_EXE_LINKER_FLAGS "-lrt -lpthread")
+ set(CMAKE_CXX_FLAGS "-fPIC")
+endif()
add_definitions(-DBOOST_ALL_DYN_LINK)
# Find Protobufs
-find_package(Protobuf REQUIRED)
+find_package(Protobuf REQUIRED )
include_directories(${PROTOBUF_INCLUDE_DIR})
#Find Zookeeper
@@ -50,13 +51,13 @@ find_package(Zookeeper REQUIRED )
#
# Preprocess to fix protobuf .proto definitions
-add_subdirectory("${CMAKE_SOURCE_DIR}/src/protobuf")
+add_subdirectory("${CMAKE_SOURCE_DIR}/src/protobuf")
# protobuf includes are required by clientlib
include_directories(${ProtoHeadersDir})
include_directories(${ProtoIncludesDir})
# Build the Client Library as a shared library
-add_subdirectory("${CMAKE_SOURCE_DIR}/src/clientlib")
+add_subdirectory("${CMAKE_SOURCE_DIR}/src/clientlib")
include_directories(${CMAKE_SOURCE_DIR}/src/include ${Zookeeper_INCLUDE_DIRS} )
# add a DEBUG preprocessor macro
@@ -68,7 +69,7 @@ set_property(
# Link directory
link_directories(/usr/local/lib)
-add_executable(querySubmitter example/querySubmitter.cpp )
+add_executable(querySubmitter example/querySubmitter.cpp )
target_link_libraries(querySubmitter ${Boost_LIBRARIES} ${PROTOBUF_LIBRARY} drillClient protomsgs )
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 96e2c65..9d24e68 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -20,9 +20,23 @@
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
-#include <boost/asio.hpp>
#include "drill/drillc.hpp"
+Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::DrillClientError* err){
+ if(!err){
+ printf("SCHEMA CHANGE DETECTED:\n");
+ for(size_t i=0; i<fields->size(); i++){
+ std::string name= fields->at(i)->getName();
+ printf("%s\t", name.c_str());
+ }
+ printf("\n");
+ return Drill::QRY_SUCCESS ;
+ }else{
+ std::cerr<< "ERROR: " << err->msg << std::endl;
+ return Drill::QRY_FAILURE;
+ }
+}
+
Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::DrillClientError* err){
if(!err){
b->print(std::cout, 0); // print all rows
@@ -79,18 +93,19 @@ void print(const Drill::FieldMetadata* pFieldMetadata, void* buf, size_t sz){
return;
}
-int nOptions=5;
+int nOptions=6;
struct Option{
char name[32];
char desc[128];
bool required;
-}qsOptions[]= {
+}qsOptions[]= {
{"plan", "Plan files separated by semicolons", false},
{"query", "Query strings, separated by semicolons", false},
{"type", "Query type [physical|logical|sql]", true},
{"connectStr", "Connect string", true},
- {"api", "API type [sync|async]", true}
+ {"api", "API type [sync|async]", true},
+ {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false}
};
std::map<std::string, std::string> qsOptionValues;
@@ -136,7 +151,7 @@ int parseArgs(int argc, char* argv[]){
}
}
}
- if(error){
+ if(error){
printUsage();
exit(1);
}
@@ -170,7 +185,7 @@ int readPlans(const std::string& planList, std::vector<std::string>& plans){
std::string plan((std::istreambuf_iterator<char>(f)), (std::istreambuf_iterator<char>()));
std::cout << "plan:" << plan << std::endl;
plans.push_back(plan);
- }
+ }
return 0;
}
@@ -201,6 +216,18 @@ bool validate(const std::string& type, const std::string& query, const std::stri
return true;
}
+Drill::logLevel_t getLogLevel(const char *s){
+ if(s!=NULL){
+ if(!strcmp(s, "trace")) return Drill::LOG_TRACE;
+ if(!strcmp(s, "debug")) return Drill::LOG_DEBUG;
+ if(!strcmp(s, "info")) return Drill::LOG_INFO;
+ if(!strcmp(s, "warn")) return Drill::LOG_WARNING;
+ if(!strcmp(s, "error")) return Drill::LOG_ERROR;
+ if(!strcmp(s, "fatal")) return Drill::LOG_FATAL;
+ }
+ return Drill::LOG_ERROR;
+}
+
int main(int argc, char* argv[]) {
try {
@@ -213,26 +240,29 @@ int main(int argc, char* argv[]) {
std::string planList=qsOptionValues["plan"];
std::string api=qsOptionValues["api"];
std::string type_str=qsOptionValues["type"];
+ std::string logLevel=qsOptionValues["logLevel"];
- exec::user::QueryType type;
+ exec::shared::QueryType type;
if(!validate(type_str, queryList, planList)){
exit(1);
}
+ Drill::logLevel_t l=getLogLevel(logLevel.c_str());
+
std::vector<std::string> queryInputs;
if(type_str=="sql" ){
readQueries(queryList, queryInputs);
- type=exec::user::SQL;
+ type=exec::shared::SQL;
}else if(type_str=="physical" ){
readPlans(planList, queryInputs);
- type=exec::user::PHYSICAL;
+ type=exec::shared::PHYSICAL;
}else if(type_str == "logical"){
readPlans(planList, queryInputs);
- type=exec::user::LOGICAL;
+ type=exec::shared::LOGICAL;
}else{
readQueries(queryList, queryInputs);
- type=exec::user::SQL;
+ type=exec::shared::SQL;
}
std::vector<std::string>::iterator queryInpIter;
@@ -245,9 +275,9 @@ int main(int argc, char* argv[]) {
Drill::DrillClient client;
// To log to file
- //DrillClient::initLogging("/var/log/drill/", LOG_INFO);
+ //DrillClient::initLogging("/var/log/drill/", l);
// To log to stderr
- Drill::DrillClient::initLogging(NULL, Drill::LOG_INFO);
+ Drill::DrillClient::initLogging(NULL, l);
if(client.connect(connectStr.c_str())!=Drill::CONN_SUCCESS){
std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;
@@ -269,26 +299,27 @@ int main(int argc, char* argv[]) {
// get fields.
row=0;
Drill::RecordIterator* pRecIter=*recordIterIter;
- std::vector<Drill::FieldMetadata*>& fields = pRecIter->getColDefs();
- while((ret=pRecIter->next())==Drill::QRY_SUCCESS){
+ Drill::FieldDefPtr fields= pRecIter->getColDefs();
+ while((ret=pRecIter->next()), ret==Drill::QRY_SUCCESS || ret==Drill::QRY_SUCCESS_WITH_INFO){
+ fields = pRecIter->getColDefs();
row++;
- if(row%4095==0){
- for(size_t i=0; i<fields.size(); i++){
- std::string name= fields[i]->getName();
+ if( (ret==Drill::QRY_SUCCESS_WITH_INFO && pRecIter->hasSchemaChanged() )|| ( row%100==1)){
+ for(size_t i=0; i<fields->size(); i++){
+ std::string name= fields->at(i)->getName();
printf("%s\t", name.c_str());
}
printf("\n");
}
printf("ROW: %ld\t", row);
- for(size_t i=0; i<fields.size(); i++){
+ for(size_t i=0; i<fields->size(); i++){
void* pBuf; size_t sz;
pRecIter->getCol(i, &pBuf, &sz);
- print(fields[i], pBuf, sz);
+ print(fields->at(i), pBuf, sz);
}
printf("\n");
}
if(ret!=Drill::QRY_NO_MORE_DATA){
- std::cerr<< pRecIter->getError();
+ std::cerr<< pRecIter->getError() << std::endl;
}
client.freeQueryIterator(&pRecIter);
}
@@ -296,11 +327,13 @@ int main(int argc, char* argv[]) {
for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
+ client.registerSchemaChangeListener(qryHandle, SchemaListener);
queryHandles.push_back(qryHandle);
}
client.waitForResults();
for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) {
client.freeQueryResources(*queryHandleIter);
+ delete *queryHandleIter;
}
}
client.close();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/readme.linux
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.linux b/contrib/native/client/readme.linux
new file mode 100644
index 0000000..fbdb6e4
--- /dev/null
+++ b/contrib/native/client/readme.linux
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CentOS 6.5 build
+
+Install Prerequisites
+---------------------
+0) Install development tools
+ $>yum groupinstall 'Development Tools'
+
+1) CMAKE 2.8
+ $> yum install cmake28
+
+2.1) Download protobuf 2.5 from :
+ http://rpm.pbone.net/index.php3/stat/4/idpl/23552166/dir/centos_6/com/protobuf-2.5.0-16.1.x86_64.rpm.html
+ http://rpm.pbone.net/index.php3/stat/4/idpl/23552167/dir/centos_6/com/protobuf-compiler-2.5.0-16.1.x86_64.rpm.html
+ http://rpm.pbone.net/index.php3/stat/4/idpl/23552169/dir/centos_6/com/protobuf-devel-2.5.0-16.1.x86_64.rpm.html
+
+2.2) Install Protobufs
+ $> sudo yum install protobuf
+ $> sudo yum install protobuf-compiler
+ $> sudo yum install protobuf-devel
+
+3)
+3.1) Install Zookeeper prerequisites
+ - autoconf 2.59 or greater (should be installed with dev tools)
+ - cppunit 1.10.x or higher
+
+3.1.1) install cppuint
+ $> sudo yum install cppunit
+ $> sudo yum install cppunit-devel
+
+3.2) Download Zookeeper from :
+ - http://apache.mirror.quintex.com/zookeeper/zookeeper-3.4.6/
+ - untar and then follow instructions in ZOOKEEPER_DIR/src/c/README to build and install the client libs
+
+3.3) run autoreconf
+ $> autoreconf -if
+
+3.4) Build Zookeeper libs
+ $> ./configure --enable-debug --with-syncapi --enable-static --enable-shared
+ $> make && sudo make install
+
+4) Install boost. The minumim version required is 1.53, which will probably have to be built from source
+
+ # Remove any previous boost
+ $> sudo yum -y erase boost
+
+ # fetch the boost source rpm and create binary rpms
+ $> wget ftp://ftp.icm.edu.pl/vol/rzm2/linux-fedora-secondary/development/rawhide/source/SRPMS/b/boost-1.53.0-6.fc19.src.rpm
+ $> rpmbuild --rebuild boost-1.53.0-6.fc19.src.rpm
+
+ #install the binary rpms
+ #(Note: the "rpm" utility does not clean up old versions very well.)
+ $> sudo yum -y install ~/rpmbuild/RPMS/x86_64/*
+
+OR
+ Download and build using boost build.
+ See this link for how to build: http://www.boost.org/doc/libs/1_53_0/more/getting_started/unix-variants.html#prepare-to-use-a-boost-library-binary
+
+
+Build drill client
+-------------------
+ $> cd DRILL_DIR/contrib/native/client
+ $> mkdir build
+ $> cd build && cmake28 -G "Unix Makefiles" -D CMAKE_BUILD_TYPE=Debug ..
+ $> make
+
+Test
+----
+Run query submitter from the command line
+ $> querySubmitter query='select * from dfs.`/Users/pchandra/work/data/tpc-h/customer.parquet`' type=sql connectStr=local=10.250.0.146:31010 api=async logLevel=trace
+
+Valgrind
+--------
+Examples to run Valgrind and see the log in Valkyrie
+ $> valgrind --leak-check=yes --xml=yes --xml-file=qs-vg-log-a.xml querySubmitter query='select LINEITEM from dfs.`/Users/pchandra/work/data/tpc-h/customer.parquet`' type=sql connectStr=local=10.250.0.146:31010 api=async logLevel=trace
+ $> valkyrie -l qs-vg-log-a.xml
+
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/resources/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/contrib/native/client/resources/parquet_scan_union_screen_physical.json b/contrib/native/client/resources/parquet_scan_union_screen_physical.json
index e677b15..81a62a3 100644
--- a/contrib/native/client/resources/parquet_scan_union_screen_physical.json
+++ b/contrib/native/client/resources/parquet_scan_union_screen_physical.json
@@ -34,7 +34,7 @@
@id: 3,
child: 2,
pop: "screen"
- }
+ }
]
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/resources/simple_plan.json
----------------------------------------------------------------------
diff --git a/contrib/native/client/resources/simple_plan.json b/contrib/native/client/resources/simple_plan.json
index ad3cdc7..8fede75 100644
--- a/contrib/native/client/resources/simple_plan.json
+++ b/contrib/native/client/resources/simple_plan.json
@@ -13,7 +13,7 @@
cp: {type: "classpath"}
},
query:[
-
+
{
@id:"1",
op: "scan",
@@ -38,7 +38,7 @@
input:"2",
op: "filter",
expr: "donuts.ppu < 1.00"
- },
+ },
{
@id:"4",
input:"3",
@@ -51,7 +51,7 @@
input:"4",
op: "collapsingaggregate",
within: "ppusegment",
- carryovers: ["donuts.ppu"],
+ carryovers: ["donuts.ppu"],
aggregations: [
{ ref: "donuts.typeCount", expr: "count(1)" },
{ ref: "donuts.quantity", expr: "sum(quantity)" },
@@ -65,7 +65,7 @@
orderings: [
{order: "DESC", expr: "donuts.ppu" }
]
- },
+ },
{
@id:"7",
input:"6",
@@ -80,7 +80,7 @@
op: "limit",
first: 0,
last: 100
- },
+ },
{
@id:"9",
input:"8",
@@ -88,7 +88,7 @@
memo: "output sink",
storageengine: "console",
target: {pipe: "STD_OUT"}
- }
+ }
]
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/scripts/fixProtodefs.sh
----------------------------------------------------------------------
diff --git a/contrib/native/client/scripts/fixProtodefs.sh b/contrib/native/client/scripts/fixProtodefs.sh
index 7cb9710..f3ce781 100755
--- a/contrib/native/client/scripts/fixProtodefs.sh
+++ b/contrib/native/client/scripts/fixProtodefs.sh
@@ -43,7 +43,7 @@ main() {
if [ -e ${TARGDIR}/${FNAME} ]
then
- if [ ${SRCDIR}/${FNAME} -nt ${TARGDIR}/${FNAME} ]
+ if [ ${SRCDIR}/${FNAME} -nt ${TARGDIR}/${FNAME} ]
then
fixFile ${FNAME}
fi
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/src/clientlib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt b/contrib/native/client/src/clientlib/CMakeLists.txt
index d07f930..7cd5dfb 100644
--- a/contrib/native/client/src/clientlib/CMakeLists.txt
+++ b/contrib/native/client/src/clientlib/CMakeLists.txt
@@ -18,14 +18,15 @@
# Drill Client library
-set (CLIENTLIB_SRC_FILES
+set (CLIENTLIB_SRC_FILES
${CMAKE_CURRENT_SOURCE_DIR}/decimalUtils.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/drillClient.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/drillClientImpl.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/recordBatch.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/rpcEncoder.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/drillClient.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/drillClientImpl.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/recordBatch.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/rpcEncoder.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/logger.cpp
)
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include )
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/src/clientlib/decimalUtils.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/decimalUtils.cpp b/contrib/native/client/src/clientlib/decimalUtils.cpp
index 39439c5..3885faa 100644
--- a/contrib/native/client/src/clientlib/decimalUtils.cpp
+++ b/contrib/native/client/src/clientlib/decimalUtils.cpp
@@ -60,7 +60,7 @@ DecimalValue getDecimalValueFromByteBuf(SlicedByteBuf& data, size_t startIndex,
bool needsEndiannessSwap = !truncateScale;
// Initialize the BigDecimal, first digit in the ByteBuf has the sign so mask it out
- cpp_int decimalDigits = (needsEndiannessSwap ?
+ cpp_int decimalDigits = (needsEndiannessSwap ?
bswap_32(data.getUint32(startIndex)) & 0x7FFFFFFF :
(data.getUint32(startIndex) & 0x7FFFFFFF));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 9a9b919..10480b4 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -17,17 +17,11 @@
*/
-#include <boost/format.hpp>
-#include <boost/log/expressions.hpp>
-#include <boost/log/sinks/text_file_backend.hpp>
-#include <boost/log/utility/setup/file.hpp>
-#include <boost/log/utility/setup/common_attributes.hpp>
-#include <boost/log/sources/severity_logger.hpp>
-
#include "drill/drillClient.hpp"
#include "drill/recordBatch.hpp"
#include "drillClientImpl.hpp"
#include "errmsgs.hpp"
+#include "logger.hpp"
#include "Types.pb.h"
@@ -50,84 +44,96 @@ DrillClientInitializer::~DrillClientInitializer(){
logLevel_t DrillClientConfig::s_logLevel=LOG_ERROR;
uint64_t DrillClientConfig::s_bufferLimit=-1;
-boost::mutex DrillClientConfig::s_mutex;
+int32_t DrillClientConfig::s_socketTimeout=180;
+boost::mutex DrillClientConfig::s_mutex;
DrillClientConfig::DrillClientConfig(){
initLogging(NULL);
}
+DrillClientConfig::~DrillClientConfig(){
+ Logger::close();
+}
+
void DrillClientConfig::initLogging(const char* path){
- if(path!=NULL){
- std::string f=std::string(path)+"drill_clientlib_%N.log";
- try{
- boost::log::add_file_log
- (
- boost::log::keywords::file_name = f.c_str(),
- boost::log::keywords::rotation_size = 10 * 1024 * 1024,
- boost::log::keywords::time_based_rotation =
- boost::log::sinks::file::rotation_at_time_point(0, 0, 0),
- boost::log::keywords::format = "[%TimeStamp%]: %Message%"
- );
- }catch(std::exception& e){
- // do nothing. Logging will happen to stderr
- BOOST_LOG_TRIVIAL(error) << "Logging could not be initialized. Logging to stderr." ;
- }
- }
- boost::log::add_common_attributes();
- boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel);
+ Logger::init(path);
}
void DrillClientConfig::setLogLevel(logLevel_t l){
- boost::lock_guard<boost::mutex> bufferLock(DrillClientConfig::s_mutex);
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
s_logLevel=l;
- boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel);
+ Logger::s_level=l;
+ //boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel);
}
void DrillClientConfig::setBufferLimit(uint64_t l){
- boost::lock_guard<boost::mutex> bufferLock(DrillClientConfig::s_mutex);
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
s_bufferLimit=l;
}
uint64_t DrillClientConfig::getBufferLimit(){
- boost::lock_guard<boost::mutex> bufferLock(DrillClientConfig::s_mutex);
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
return s_bufferLimit;
}
+void DrillClientConfig::setSocketTimeout(int32_t t){
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+ s_socketTimeout=t;
+}
+
+int32_t DrillClientConfig::getSocketTimeout(){
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+ return s_socketTimeout;
+}
+
logLevel_t DrillClientConfig::getLogLevel(){
- boost::lock_guard<boost::mutex> bufferLock(DrillClientConfig::s_mutex);
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
return s_logLevel;
}
RecordIterator::~RecordIterator(){
if(m_pColDefs!=NULL){
- for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin();
- it!=m_pColDefs->end();
+ for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin();
+ it!=m_pColDefs->end();
++it){
delete *it;
}
}
- delete this->m_pColDefs;
- this->m_pColDefs=NULL;
delete this->m_pQueryResult;
this->m_pQueryResult=NULL;
+ if(this->m_pCurrentRecordBatch!=NULL){
+ DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
+ delete this->m_pCurrentRecordBatch; this->m_pCurrentRecordBatch=NULL;
+ }
}
-std::vector<Drill::FieldMetadata*>& RecordIterator::getColDefs(){
+FieldDefPtr RecordIterator::getColDefs(){
if(m_pQueryResult->hasError()){
return DrillClientQueryResult::s_emptyColDefs;
}
//NOTE: if query is cancelled, return whatever you have. Client applications job to deal with it.
- if(this->m_pColDefs==NULL){
+ if(this->m_pColDefs==NULL || this->hasSchemaChanged()){
if(this->m_pCurrentRecordBatch==NULL){
this->m_pQueryResult->waitForData();
if(m_pQueryResult->hasError()){
return DrillClientQueryResult::s_emptyColDefs;
}
}
- std::vector<Drill::FieldMetadata*>* pColDefs = new std::vector<Drill::FieldMetadata*>;
+ if(this->hasSchemaChanged()){
+ if(m_pColDefs!=NULL){
+ for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin();
+ it!=m_pColDefs->end();
+ ++it){
+ delete *it;
+ }
+ m_pColDefs->clear();
+ //delete m_pColDefs; m_pColDefs=NULL;
+ }
+ }
+ FieldDefPtr pColDefs( new std::vector<Drill::FieldMetadata*>);
{ //lock after we come out of the wait.
boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
- std::vector<Drill::FieldMetadata*>& currentColDefs=DrillClientQueryResult::s_emptyColDefs;
+ boost::shared_ptr< std::vector<Drill::FieldMetadata*> > currentColDefs=DrillClientQueryResult::s_emptyColDefs;
if(this->m_pCurrentRecordBatch!=NULL){
currentColDefs=this->m_pCurrentRecordBatch->getColumnDefs();
}else{
@@ -138,7 +144,7 @@ std::vector<Drill::FieldMetadata*>& RecordIterator::getColDefs(){
currentColDefs=pR->getColumnDefs();
}
}
- for(std::vector<Drill::FieldMetadata*>::iterator it=currentColDefs.begin(); it!=currentColDefs.end(); ++it){
+ for(std::vector<Drill::FieldMetadata*>::iterator it=currentColDefs->begin(); it!=currentColDefs->end(); ++it){
Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
fmd->copy(*(*it));//Yup, that's 2 stars
pColDefs->push_back(fmd);
@@ -146,7 +152,7 @@ std::vector<Drill::FieldMetadata*>& RecordIterator::getColDefs(){
}
this->m_pColDefs = pColDefs;
}
- return *this->m_pColDefs;
+ return this->m_pColDefs;
}
status_t RecordIterator::next(){
@@ -160,12 +166,19 @@ status_t RecordIterator::next(){
if(!this->m_pQueryResult->isCancelled()){
if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){
boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
- delete this->m_pCurrentRecordBatch; //free the previous record batch
+ if(this->m_pCurrentRecordBatch !=NULL){
+ DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
+ delete this->m_pCurrentRecordBatch; //free the previous record batch
+ }
this->m_currentRecord=0;
this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
- BOOST_LOG_TRIVIAL(trace) << "Fetched new Record batch " ;
+ if(this->m_pCurrentRecordBatch != NULL){
+ DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;
+ }else{
+ DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;
+ }
if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){
- BOOST_LOG_TRIVIAL(trace) << "No more data." ;
+ DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;
ret = QRY_NO_MORE_DATA;
}else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){
ret=QRY_SUCCESS_WITH_INFO;
@@ -213,11 +226,16 @@ status_t RecordIterator::cancel(){
return QRY_CANCEL;
}
-void RecordIterator::registerSchemaChangeListener(pfnSchemaListener* l){
- //TODO:
+bool RecordIterator::hasSchemaChanged(){
+ return m_currentRecord==0 && m_pCurrentRecordBatch!=NULL && m_pCurrentRecordBatch->hasSchemaChanged();
}
-std::string& RecordIterator::getError(){
+void RecordIterator::registerSchemaChangeListener(pfnSchemaListener l){
+ assert(m_pQueryResult!=NULL);
+ this->m_pQueryResult->registerSchemaChangeListener(l);
+}
+
+const std::string& RecordIterator::getError(){
return m_pQueryResult->getError()->msg;
}
@@ -243,7 +261,7 @@ connectionStatus_t DrillClient::connect(const char* connectStr ){
ret=this->m_pImpl->connect(connectStr);
if(ret==CONN_SUCCESS)
- ret=this->m_pImpl->ValidateHandShake()?CONN_SUCCESS:CONN_HANDSHAKE_FAILED;
+ ret=this->m_pImpl->validateHandShake()?CONN_SUCCESS:CONN_HANDSHAKE_FAILED;
return ret;
}
@@ -256,13 +274,13 @@ void DrillClient::close() {
this->m_pImpl->Close();
}
-status_t DrillClient::submitQuery(exec::user::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle){
+status_t DrillClient::submitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle){
DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(t, plan, listener, listenerCtx);
*qHandle=(QueryHandle_t)pResult;
- return QRY_SUCCESS;
+ return QRY_SUCCESS;
}
-RecordIterator* DrillClient::submitQuery(exec::user::QueryType t, const std::string& plan, DrillClientError* err){
+RecordIterator* DrillClient::submitQuery(::exec::shared::QueryType t, const std::string& plan, DrillClientError* err){
RecordIterator* pIter=NULL;
DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(t, plan, NULL, NULL);
if(pResult){
@@ -280,6 +298,12 @@ void DrillClient::waitForResults(){
this->m_pImpl->waitForResults();
}
+void DrillClient::registerSchemaChangeListener(QueryHandle_t* handle, pfnSchemaListener l){
+ if(handle!=NULL){
+ ((DrillClientQueryResult*)(*handle))->registerSchemaChangeListener(l);
+ }
+}
+
void DrillClient::freeQueryResources(QueryHandle_t* handle){
delete (DrillClientQueryResult*)(*handle);
*handle=NULL;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/aaa4db74/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 14b4f7d..0767396 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -20,53 +20,73 @@
#include <string.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
-#include <boost/log/trivial.hpp>
#include <zookeeper/zookeeper.h>
#include "drill/drillClient.hpp"
#include "drill/recordBatch.hpp"
#include "drillClientImpl.hpp"
#include "errmsgs.hpp"
+#include "logger.hpp"
#include "rpcEncoder.hpp"
#include "rpcDecoder.hpp"
#include "rpcMessage.hpp"
+#include "utils.hpp"
#include "GeneralRPC.pb.h"
#include "UserBitShared.pb.h"
-#ifdef DEBUG
-#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
-#endif
-
-
namespace Drill{
RpcEncoder DrillClientImpl::s_encoder;
RpcDecoder DrillClientImpl::s_decoder;
std::string debugPrintQid(const exec::shared::QueryId& qid){
- return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] ");
+ return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] ");
}
-void DrillClientImpl::parseConnectStr(const char* connectStr, std::string& protocol, std::string& hostPortStr){
- char u[1024];
- strcpy(u,connectStr);
+void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){
+#if defined _WIN32
+ int32_t timeoutMsecs=timeout*1000;
+ setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs));
+ setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs));
+#else
+ struct timeval tv;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+ int e=0;
+ e=setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+ e=setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
+#endif
+}
+
+
+void DrillClientImpl::parseConnectStr(const char* connectStr,
+ std::string& pathToDrill,
+ std::string& protocol,
+ std::string& hostPortStr){
+ char u[MAX_CONNECT_STR+1];
+ strncpy(u,connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR]=0;
char* z=strtok(u, "=");
- char* c=strtok(NULL, "");
+ char* c=strtok(NULL, "/");
+ char* p=strtok(NULL, "");
+
+ if(p!=NULL) pathToDrill=std::string("/")+p;
protocol=z; hostPortStr=c;
+ return;
}
connectionStatus_t DrillClientImpl::connect(const char* connStr){
- std::string protocol, hostPortStr;
- std::string host;
+ std::string pathToDrill, protocol, hostPortStr;
+ std::string host;
std::string port;
if(!this->m_bIsConnected){
- parseConnectStr(connStr, protocol, hostPortStr);
- if(!strcmp(protocol.c_str(), "jdbc:drill:zk")){
+ parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+ if(!strcmp(protocol.c_str(), "zk")){
ZookeeperImpl zook;
- if(zook.connectToZookeeper(hostPortStr.c_str())!=0){
+ if(zook.connectToZookeeper(hostPortStr.c_str(), pathToDrill.c_str())!=0){
return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
}
zook.debugPrint();
@@ -74,9 +94,9 @@ connectionStatus_t DrillClientImpl::connect(const char* connStr){
host=boost::lexical_cast<std::string>(e.address());
port=boost::lexical_cast<std::string>(e.user_port());
zook.close();
- }else if(!strcmp(protocol.c_str(), "jdbc:drill:local")){
- char tempStr[1024];
- strcpy(tempStr, hostPortStr.c_str());
+ }else if(!strcmp(protocol.c_str(), "local")){
+ char tempStr[MAX_CONNECT_STR+1];
+ strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
host=strtok(tempStr, ":");
port=strtok(NULL, "");
}else{
@@ -97,7 +117,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
tcp::resolver::iterator end;
while (iter != end){
endpoint = *iter++;
- BOOST_LOG_TRIVIAL(trace) << endpoint << std::endl;
+ DRILL_LOG(LOG_TRACE) << endpoint << std::endl;
}
boost::system::error_code ec;
m_socket.connect(endpoint, ec);
@@ -108,59 +128,147 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
}catch(std::exception e){
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
}
+
+ //
+ // We put some OS dependent code here for timing out a socket. Mostly, this appears to
+ // do nothing. Should we leave it in there?
+ //
+ setSocketTimeout(m_socket, DrillClientConfig::getSocketTimeout());
+
return CONN_SUCCESS;
}
-void DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
+connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
DrillClientImpl::s_encoder.Encode(m_wbuf, msg);
- m_socket.write_some(boost::asio::buffer(m_wbuf));
+ boost::system::error_code ec;
+ size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec);
+ if(!ec && s!=0){
+ return CONN_SUCCESS;
+ }else{
+ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, ec.message().c_str()));
+ }
}
-void DrillClientImpl::recvSync(InBoundRpcMessage& msg){
- m_socket.read_some(boost::asio::buffer(m_rbuf));
- uint32_t length = 0;
- int bytes_read = DrillClientImpl::s_decoder.LengthDecode(m_rbuf.data(), &length);
- DrillClientImpl::s_decoder.Decode(m_rbuf.data() + bytes_read, length, msg);
+connectionStatus_t DrillClientImpl::recvHandshake(){
+ if(m_rbuf==NULL){
+ m_rbuf = Utils::allocateBuffer(MAX_SOCK_RD_BUFSIZE);
+ }
+
+ m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getSocketTimeout()));
+ m_deadlineTimer.async_wait(boost::bind(
+ &DrillClientImpl::handleHShakeReadTimeout,
+ this,
+ boost::asio::placeholders::error
+ ));
+ DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer." << std::endl;
+
+ async_read(
+ this->m_socket,
+ boost::asio::buffer(m_rbuf, LEN_PREFIX_BUFLEN),
+ boost::bind(
+ &DrillClientImpl::handleHandshake,
+ this,
+ m_rbuf,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred)
+ );
+ DRILL_LOG(LOG_DEBUG) << "Sent handshake read request to server" << std::endl;
+ m_io_service.run();
+ if(m_rbuf!=NULL){
+ Utils::freeBuffer(m_rbuf); m_rbuf=NULL;
+ }
+ return CONN_SUCCESS;
}
-bool DrillClientImpl::ValidateHandShake(){
- exec::user::UserToBitHandshake u2b;
- exec::user::BitToUserHandshake b2u;
+void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
+ const boost::system::error_code& err,
+ size_t bytes_transferred) {
+ boost::system::error_code error=err;
+ // cancel the timer
+ m_deadlineTimer.cancel();
+ DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
+ if(!error){
+ InBoundRpcMessage msg;
+ uint32_t length = 0;
+ int bytes_read = DrillClientImpl::s_decoder.LengthDecode(m_rbuf, &length);
+ if(length>0){
+ size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
+ ByteBuf_t b=m_rbuf + LEN_PREFIX_BUFLEN;
+ size_t bytesToRead=length - leftover;
+ while(1){
+ size_t dataBytesRead=m_socket.read_some(
+ boost::asio::buffer(b, bytesToRead),
+ error);
+ if(err) break;
+ DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;
+ if(dataBytesRead==bytesToRead) break;
+ bytesToRead-=dataBytesRead;
+ b+=dataBytesRead;
+ }
+ DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
+ }else{
+ handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
+ }
+ exec::user::BitToUserHandshake b2u;
+ b2u.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
+ this->m_handshakeVersion=b2u.rpc_version();
+
+ }else{
+ // boost error
+ handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, error.message().c_str()));
+ return;
+ }
+ return;
+}
+void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & err){
+ // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
+ if(!err){
+ // Check whether the deadline has passed.
+ if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
+ // The deadline has passed.
+ m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
+ DRILL_LOG(LOG_TRACE) << "Deadline timer expired." << std::endl;
+ m_socket.close();
+ }
+ }
+ return;
+}
+
+bool DrillClientImpl::validateHandShake(){
+ exec::user::UserToBitHandshake u2b;
u2b.set_channel(exec::shared::USER);
- u2b.set_rpc_version(1);
+ u2b.set_rpc_version(DRILL_RPC_VERSION);
u2b.set_support_listening(true);
-
{
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
uint64_t coordId = this->getNextCoordinationId();
OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
sendSync(out_msg);
-
- InBoundRpcMessage in_msg;
- recvSync(in_msg);
-
- b2u.ParseFromArray(in_msg.m_pbody.data(), in_msg.m_pbody.size());
}
- // validate handshake
- if (b2u.rpc_version() != u2b.rpc_version()) {
- BOOST_LOG_TRIVIAL(trace) << "Invalid rpc version. Expected << "
- << u2b.rpc_version() << ", actual "<< b2u.rpc_version() << "." ;
- handleConnError(CONN_HANDSHAKE_FAILED,
- getMessage(ERR_CONN_NOHSHAKE, u2b.rpc_version(), b2u.rpc_version()));
+ recvHandshake();
+ this->m_io_service.reset();
+ if(this->m_pError!=NULL){
+ return false;
+ }
+ if(m_handshakeVersion != u2b.rpc_version()) {
+ DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected << "
+ << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;
+ handleConnError(CONN_HANDSHAKE_FAILED,
+ getMessage(ERR_CONN_NOHSHAKE, DRILL_RPC_VERSION, m_handshakeVersion));
return false;
}
return true;
}
-std::vector<Drill::FieldMetadata*> DrillClientQueryResult::s_emptyColDefs;
+FieldDefPtr DrillClientQueryResult::s_emptyColDefs( new (std::vector<Drill::FieldMetadata*>));
-DrillClientQueryResult* DrillClientImpl::SubmitQuery(exec::user::QueryType t,
- const std::string& plan,
- pfnQueryResultsListener l,
+DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t,
+ const std::string& plan,
+ pfnQueryResultsListener l,
void* lCtx){
exec::user::RunQuery query;
query.set_results_mode(exec::user::STREAM_FULL);
@@ -180,16 +288,16 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(exec::user::QueryType t,
bool sendRequest=false;
this->m_queryIds[coordId]=pQuery;
- BOOST_LOG_TRIVIAL(debug) << "Submit Query Request. Coordination id = " << coordId;
+ DRILL_LOG(LOG_DEBUG) << "Submit Query Request. Coordination id = " << coordId << std::endl;
if(m_pendingRequests++==0){
sendRequest=true;
}else{
- BOOST_LOG_TRIVIAL(debug) << "Queueing read request to server" << std::endl;
- BOOST_LOG_TRIVIAL(debug) << "Number of pending requests = " << m_pendingRequests << std::endl;
+ DRILL_LOG(LOG_DEBUG) << "Queueing read request to server" << std::endl;
+ DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;
}
if(sendRequest){
- BOOST_LOG_TRIVIAL(debug) << "Sending read request. Number of pending requests = "
+ DRILL_LOG(LOG_DEBUG) << "Sending read request. Number of pending requests = "
<< m_pendingRequests << std::endl;
getNextResult(); // async wait for results
}
@@ -198,8 +306,8 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(exec::user::QueryType t,
//run this in a new thread
{
if(this->m_pListenerThread==NULL){
- BOOST_LOG_TRIVIAL(debug) << "Starting listener thread." << std::endl;
- this->m_pListenerThread= new boost::thread(boost::bind(&boost::asio::io_service::run,
+ DRILL_LOG(LOG_DEBUG) << "Starting listener thread." << std::endl;
+ this->m_pListenerThread= new boost::thread(boost::bind(&boost::asio::io_service::run,
&this->m_io_service));
}
}
@@ -211,61 +319,72 @@ void DrillClientImpl::getNextResult(){
// This call is always made from within a function where the mutex has already been acquired
//boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- //use free, not delete to free
- ByteBuf_t readBuf = allocateBuffer(LEN_PREFIX_BUFLEN);
- async_read(
+ //use free, not delete to free
+ ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
+
+ m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getSocketTimeout()));
+ m_deadlineTimer.async_wait(boost::bind(
+ &DrillClientImpl::handleReadTimeout,
+ this,
+ boost::asio::placeholders::error
+ ));
+ DRILL_LOG(LOG_TRACE) << "Started new async wait timer." << std::endl;
+
+ async_read(
this->m_socket,
boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
boost::bind(
&DrillClientImpl::handleRead,
this,
readBuf,
- boost::asio::placeholders::error,
+ boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)
);
- BOOST_LOG_TRIVIAL(debug) << "Sent read request to server" << std::endl;
+ DRILL_LOG(LOG_DEBUG) << "Sent read request to server" << std::endl;
}
void DrillClientImpl::waitForResults(){
this->m_pListenerThread->join();
- BOOST_LOG_TRIVIAL(debug) << "Listener thread exited." << std::endl;
+ DRILL_LOG(LOG_DEBUG) << "Listener thread exited." << std::endl;
delete this->m_pListenerThread; this->m_pListenerThread=NULL;
}
-status_t DrillClientImpl::readMsg(ByteBuf_t _buf, InBoundRpcMessage& msg, boost::system::error_code& error){
+status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, InBoundRpcMessage& msg, boost::system::error_code& error){
size_t leftover=0;
uint32_t rmsgLen;
ByteBuf_t currentBuffer;
+ *allocatedBuffer=NULL;
{
- // We need to protect the readLength and read buffer, and the pending requests counter,
+ // We need to protect the readLength and read buffer, and the pending requests counter,
// but we don't have to keep the lock while we decode the rest of the buffer.
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen);
- BOOST_LOG_TRIVIAL(trace) << "len bytes = " << bytes_read << std::endl;
- BOOST_LOG_TRIVIAL(trace) << "rmsgLen = " << rmsgLen << std::endl;
+ DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;
+ DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;
if(rmsgLen>0){
leftover = LEN_PREFIX_BUFLEN - bytes_read;
// Allocate a buffer
- BOOST_LOG_TRIVIAL(trace) << "Allocated and locked buffer." << std::endl;
- currentBuffer=allocateBuffer(rmsgLen);
+ DRILL_LOG(LOG_TRACE) << "Allocated and locked buffer." << std::endl;
+ currentBuffer=Utils::allocateBuffer(rmsgLen);
if(currentBuffer==NULL){
+ Utils::freeBuffer(_buf);
return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
}
+ *allocatedBuffer=currentBuffer;
if(leftover){
memcpy(currentBuffer, _buf + bytes_read, leftover);
}
- freeBuffer(_buf);
- BOOST_LOG_TRIVIAL(trace) << "reading data (rmsgLen - leftover) : "
+ DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
<< (rmsgLen - leftover) << std::endl;
ByteBuf_t b=currentBuffer + leftover;
size_t bytesToRead=rmsgLen - leftover;
while(1){
size_t dataBytesRead=this->m_socket.read_some(
- boost::asio::buffer(b, bytesToRead),
+ boost::asio::buffer(b, bytesToRead),
error);
if(error) break;
- BOOST_LOG_TRIVIAL(trace) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;
+ DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;
if(dataBytesRead==bytesToRead) break;
bytesToRead-=dataBytesRead;
b+=dataBytesRead;
@@ -273,31 +392,34 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, InBoundRpcMessage& msg, boost:
if(!error){
// read data successfully
DrillClientImpl::s_decoder.Decode(currentBuffer, rmsgLen, msg);
- BOOST_LOG_TRIVIAL(trace) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;
+ DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;
}else{
- return handleQryError(QRY_COMM_ERROR,
+ Utils::freeBuffer(_buf);
+ return handleQryError(QRY_COMM_ERROR,
getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
}
}else{
- // got a message with an invalid read length.
+ // got a message with an invalid read length.
+ Utils::freeBuffer(_buf);
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
}
}
+ Utils::freeBuffer(_buf);
return QRY_SUCCESS;
}
-status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
{
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
exec::user::QueryResult* qr = new exec::user::QueryResult; //Record Batch will own this object and free it up.
- BOOST_LOG_TRIVIAL(debug) << "Processing Query Result " << std::endl;
+ DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;
qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- BOOST_LOG_TRIVIAL(trace) << qr->DebugString();
+ DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
- BOOST_LOG_TRIVIAL(debug) << "Searching for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
+ DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
exec::shared::QueryId qid;
qid.CopyFrom(qr->query_id());
@@ -306,39 +428,48 @@ status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
if(it!=this->m_queryResults.end()){
pDrillClientQueryResult=(*it).second;
}else{
- assert(0);
+ assert(0);
//assert might be compiled away in a release build. So return an error to the app.
status_t ret= handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_OUTOFORDER), NULL);
delete qr;
return ret;
}
- BOOST_LOG_TRIVIAL(debug) << "Drill Client Query Result Query Id - " <<
- debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
+ DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
+ debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
<< std::endl;
//Check QueryResult.queryState. QueryResult could have an error.
if(qr->query_state() == exec::user::QueryResult_QueryState_FAILED){
status_t ret=handleQryError(QRY_FAILURE, qr->error(0), pDrillClientQueryResult);
+ Utils::freeBuffer(allocatedBuffer);
delete qr;
return ret;
}
//Validate the RPC message
std::string valErr;
if( (ret=validateMessage(msg, *qr, valErr)) != QRY_SUCCESS){
+ Utils::freeBuffer(allocatedBuffer);
+ delete qr;
return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
}
- //Build Record Batch here
- BOOST_LOG_TRIVIAL(trace) << qr->DebugString();
+ //Build Record Batch here
+ DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
+
+ RecordBatch* pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody);
+ pDrillClientQueryResult->m_numBatches++;
- RecordBatch* pRecordBatch= new RecordBatch(qr, msg.m_dbody);
+ DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;
pRecordBatch->build();
- BOOST_LOG_TRIVIAL(debug) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
+ DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
<< pRecordBatch->getNumRecords() << std::endl;
- BOOST_LOG_TRIVIAL(debug) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
+ DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
<< pRecordBatch->getNumFields() << std::endl;
- BOOST_LOG_TRIVIAL(debug) << debugPrintQid(qr->query_id())<<"recordBatch.isLastChunk "
+ DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.isLastChunk "
<< pRecordBatch->isLastChunk() << std::endl;
+ ret=pDrillClientQueryResult->setupColumnDefs(qr);
+ if(ret==QRY_SUCCESS_WITH_INFO)pRecordBatch->schemaChanged(true);
+
pDrillClientQueryResult->m_bIsQueryPending=true;
pDrillClientQueryResult->m_bIsLastChunk=qr->is_last_chunk();
pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
@@ -346,7 +477,7 @@ status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
ret = pResultsListener(pDrillClientQueryResult, pRecordBatch, NULL);
}else{
//Use a default callback that is called when a record batch is received
- ret = pDrillClientQueryResult->defaultQueryResultsListener(pDrillClientQueryResult,
+ ret = pDrillClientQueryResult->defaultQueryResultsListener(pDrillClientQueryResult,
pRecordBatch, NULL);
}
} // release lock
@@ -357,16 +488,16 @@ status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
m_pendingRequests--;
}
pDrillClientQueryResult->m_bIsQueryPending=false;
- BOOST_LOG_TRIVIAL(debug) << "Client app cancelled query.";
+ DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;
return ret;
}
if(pDrillClientQueryResult->m_bIsLastChunk){
{
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
m_pendingRequests--;
- BOOST_LOG_TRIVIAL(debug) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
+ DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
<< "Received last batch. " << std::endl;
- BOOST_LOG_TRIVIAL(debug) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
+ DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
<< "Pending requests: " << m_pendingRequests <<"." << std::endl;
}
ret=QRY_NO_MORE_DATA;
@@ -377,9 +508,9 @@ status_t DrillClientImpl::processQueryResult(InBoundRpcMessage& msg ){
return ret;
}
-status_t DrillClientImpl::processQueryId(InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
- BOOST_LOG_TRIVIAL(debug) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
+ DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
status_t ret=QRY_SUCCESS;
boost::lock_guard<boost::mutex> lock(m_dcMutex);
@@ -388,45 +519,74 @@ status_t DrillClientImpl::processQueryId(InBoundRpcMessage& msg ){
if(it!=this->m_queryIds.end()){
pDrillClientQueryResult=(*it).second;
exec::shared::QueryId *qid = new exec::shared::QueryId;
- BOOST_LOG_TRIVIAL(trace) << "Received Query Handle" << msg.m_pbody.size();
+ DRILL_LOG(LOG_TRACE) << "Received Query Handle" << msg.m_pbody.size() << std::endl;
qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- BOOST_LOG_TRIVIAL(trace) << qid->DebugString();
+ DRILL_LOG(LOG_TRACE) << qid->DebugString() << std::endl;
m_queryResults[qid]=pDrillClientQueryResult;
//save queryId allocated here so we can free it later
pDrillClientQueryResult->setQueryId(qid);
}else{
+ Utils::freeBuffer(allocatedBuffer);
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
+ Utils::freeBuffer(allocatedBuffer);
return ret;
}
-void DrillClientImpl::handleRead(ByteBuf_t _buf,
- const boost::system::error_code& err,
+void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
+ // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
+ if(!err){
+ // Check whether the deadline has passed.
+ if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
+ // The deadline has passed.
+ handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_TIMOUT), NULL);
+ // There is no longer an active deadline. The expiry is set to positive
+ // infinity so that the timer never expires until a new deadline is set.
+ // Note that at this time, the caller is not in a (async) wait for the timer.
+ m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
+ DRILL_LOG(LOG_TRACE) << "Deadline timer expired." << std::endl;
+ // Cancel all pending async IOs.
+ // The cancel call _MAY_ not work on all platforms. To be a little more reliable we need
+ // to have the BOOST_ASIO_ENABLE_CANCELIO macro (as well as the BOOST_ASIO_DISABLE_IOCP macro?)
+ // defined. To be really sure, we need to close the socket. Closing the socket is a bit
+ // drastic and we will defer that till a later release.
+ m_socket.cancel();
+ }
+ }
+ return;
+}
+
+void DrillClientImpl::handleRead(ByteBuf_t _buf,
+ const boost::system::error_code& err,
size_t bytes_transferred) {
boost::system::error_code error=err;
+ // cancel the timer
+ m_deadlineTimer.cancel();
+ DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
if(!error){
InBoundRpcMessage msg;
- BOOST_LOG_TRIVIAL(trace) << "Getting new message" << std::endl;
+ DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;
+ ByteBuf_t allocatedBuffer=NULL;
- if(readMsg(_buf, msg, error)!=QRY_SUCCESS){
+ if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
return;
- }
+ }
if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
- if(processQueryResult(msg)!=QRY_SUCCESS){
+ if(processQueryResult(allocatedBuffer, msg)!=QRY_SUCCESS){
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
return;
}
- }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){
- if(processQueryId(msg)!=QRY_SUCCESS){
+ }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){
+ if(processQueryId(allocatedBuffer, msg)!=QRY_SUCCESS){
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
@@ -436,15 +596,15 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
}else{
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
if(error){
- // We have a socket read error, but we do not know which query this is for.
+ // We have a socket read error, but we do not know which query this is for.
// Signal ALL pending queries that they should stop waiting.
- BOOST_LOG_TRIVIAL(trace) << "read error: " << error << "\n";
+ DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
}else{
//If not QUERY_RESULT, then we think something serious has gone wrong?
assert(0);
- BOOST_LOG_TRIVIAL(trace) << "QueryResult returned " << msg.m_rpc_type;
+ DRILL_LOG(LOG_TRACE) << "QueryResult returned " << msg.m_rpc_type << std::endl;
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
return;
}
@@ -455,6 +615,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
}
}else{
// boost error
+ Utils::freeBuffer(_buf);
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
@@ -485,6 +646,7 @@ status_t DrillClientImpl::validateMessage(InBoundRpcMessage& msg, exec::user::Qu
connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
m_pendingRequests=0;
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
m_pError=pErr;
broadcastError(this->m_pError);
return status;
@@ -492,19 +654,20 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s
status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+ m_pError=pErr;
if(pQueryResult!=NULL){
m_pendingRequests--;
pQueryResult->signalError(pErr);
}else{
m_pendingRequests=0;
- m_pError=pErr;
broadcastError(this->m_pError);
}
return status;
}
-status_t DrillClientImpl::handleQryError(status_t status,
- const exec::shared::DrillPBError& e,
+status_t DrillClientImpl::handleQryError(status_t status,
+ const exec::shared::DrillPBError& e,
DrillClientQueryResult* pQueryResult){
assert(pQueryResult!=NULL);
this->m_pError = DrillClientError::getErrorObject(e);
@@ -516,8 +679,10 @@ status_t DrillClientImpl::handleQryError(status_t status,
void DrillClientImpl::broadcastError(DrillClientError* pErr){
if(pErr!=NULL){
std::map<int, DrillClientQueryResult*>::iterator iter;
- for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) {
- iter->second->signalError(pErr);
+ if(!m_queryIds.empty()){
+ for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) {
+ iter->second->signalError(pErr);
+ }
}
}
return;
@@ -526,17 +691,21 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){
void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
std::map<int, DrillClientQueryResult*>::iterator iter;
boost::lock_guard<boost::mutex> lock(m_dcMutex);
- for(iter=m_queryIds.begin(); iter!=m_queryIds.end(); iter++) {
- if(pQueryResult==(DrillClientQueryResult*)iter->second){
- m_queryIds.erase(iter->first);
- break;
+ if(!m_queryIds.empty()){
+ for(iter=m_queryIds.begin(); iter!=m_queryIds.end(); iter++) {
+ if(pQueryResult==(DrillClientQueryResult*)iter->second){
+ m_queryIds.erase(iter->first);
+ break;
+ }
}
}
- std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
- for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
- if(pQueryResult==(DrillClientQueryResult*)it->second){
- m_queryResults.erase(it->first);
- break;
+ if(!m_queryResults.empty()){
+ std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
+ for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
+ if(pQueryResult==(DrillClientQueryResult*)it->second){
+ m_queryResults.erase(it->first);
+ break;
+ }
}
}
}
@@ -547,7 +716,7 @@ void DrillClientImpl::sendAck(InBoundRpcMessage& msg){
OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
sendSync(ack_msg);
- BOOST_LOG_TRIVIAL(trace) << "ACK sent" << std::endl;
+ DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;
}
void DrillClientImpl::sendCancel(InBoundRpcMessage& msg){
@@ -556,34 +725,36 @@ void DrillClientImpl::sendCancel(InBoundRpcMessage& msg){
OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::CANCEL_QUERY, msg.m_coord_id, &ack);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
sendSync(ack_msg);
- BOOST_LOG_TRIVIAL(trace) << "CANCEL sent" << std::endl;
+ DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
}
-// This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this
+// This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this
// class are used by the async callbacks.
status_t DrillClientQueryResult::setupColumnDefs(exec::user::QueryResult* pQueryResult) {
bool hasSchemaChanged=false;
+ bool isFirstIter=false;
boost::lock_guard<boost::mutex> schLock(this->m_schemaMutex);
- std::vector<Drill::FieldMetadata*> prevSchema=this->m_columnDefs;
+ FieldDefPtr prevSchema=this->m_columnDefs;
+ isFirstIter=this->m_numBatches==1?true:false;
std::map<std::string, Drill::FieldMetadata*> oldSchema;
- for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema.begin(); it != prevSchema.end(); ++it){
- // the key is the field_name + type
- char type[256];
- sprintf(type, ":%d:%d",(*it)->getMinorType(), (*it)->getDataMode() );
- std::string k= (*it)->getName()+type;
- oldSchema[k]=*it;
+ if(!m_columnDefs->empty()){
+ for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema->begin(); it != prevSchema->end(); ++it){
+ // the key is the field_name + type
+ char type[256];
+ sprintf(type, ":%d:%d",(*it)->getMinorType(), (*it)->getDataMode() );
+ std::string k= (*it)->getName()+type;
+ oldSchema[k]=*it;
+ }
}
-
- m_columnDefs.clear();
+ m_columnDefs->clear();
size_t numFields=pQueryResult->def().field_size();
for(size_t i=0; i<numFields; i++){
- //TODO: free this??
Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
fmd->set(pQueryResult->def().field(i));
- this->m_columnDefs.push_back(fmd);
+ this->m_columnDefs->push_back(fmd);
- //Look for changes in the vector and trigger a Schema change event if necessary.
+ //Look for changes in the vector and trigger a Schema change event if necessary.
//If vectors are different, then call the schema change listener.
char type[256];
sprintf(type, ":%d:%d",fmd->getMinorType(), fmd->getDataMode() );
@@ -601,22 +772,27 @@ status_t DrillClientQueryResult::setupColumnDefs(exec::user::QueryResult* pQuery
}
//free memory allocated for FieldMetadata objects saved in previous columnDefs;
- for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema.begin(); it != prevSchema.end(); ++it){
- delete *it;
+ if(!prevSchema->empty()){
+ for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema->begin(); it != prevSchema->end(); ++it){
+ delete *it;
+ }
}
- prevSchema.clear();
- this->m_bHasSchemaChanged=hasSchemaChanged;
- if(hasSchemaChanged){
- //TODO: invoke schema change Listener
+ prevSchema->clear();
+ this->m_bHasSchemaChanged=hasSchemaChanged&&!isFirstIter;
+ if(this->m_bHasSchemaChanged){
+ //invoke schema change Listener
+ if(m_pSchemaListener!=NULL){
+ m_pSchemaListener(this, m_columnDefs, NULL);
+ }
}
- return hasSchemaChanged?QRY_SUCCESS_WITH_INFO:QRY_SUCCESS;
+ return this->m_bHasSchemaChanged?QRY_SUCCESS_WITH_INFO:QRY_SUCCESS;
}
-status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
- RecordBatch* b,
+status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
+ RecordBatch* b,
DrillClientError* err) {
//ctx; // unused, we already have the this pointer
- BOOST_LOG_TRIVIAL(trace) << "Query result listener called" << std::endl;
+ DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;
//check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
if(this->m_bCancel){
return QRY_FAILURE;
@@ -625,7 +801,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
// signal the cond var
{
#ifdef DEBUG
- BOOST_LOG_TRIVIAL(debug)<<debugPrintQid(b->getQueryResult()->query_id())
+ DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
<< "Query result listener saved result to queue." << std::endl;
#endif
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
@@ -644,7 +820,7 @@ RecordBatch* DrillClientQueryResult::peekNext() {
//if no more data, return NULL;
if(!m_bIsQueryPending) return NULL;
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
- BOOST_LOG_TRIVIAL(trace) << "Synchronous read waiting for data." << std::endl;
+ DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
while(!this->m_bHasData && !m_bHasError) {
this->m_cv.wait(cvLock);
}
@@ -656,11 +832,17 @@ RecordBatch* DrillClientQueryResult::peekNext() {
RecordBatch* DrillClientQueryResult::getNext() {
RecordBatch* pRecordBatch=NULL;
//if no more data, return NULL;
- if(!m_bIsQueryPending) return NULL;
+ if(!m_bIsQueryPending){
+ DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;
+ if(!m_recordBatches.empty()){
+ DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;
+ }
+ return NULL;
+ }
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
- BOOST_LOG_TRIVIAL(trace) << "Synchronous read waiting for data." << std::endl;
- while(!this->m_bHasData && !m_bHasError) {
+ DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+ while(!this->m_bHasData && !m_bHasError){
this->m_cv.wait(cvLock);
}
// remove first element from queue
@@ -709,33 +891,49 @@ void DrillClientQueryResult::signalError(DrillClientError* pErr){
}
void DrillClientQueryResult::clearAndDestroy(){
- if(this->m_pQueryId!=NULL){
- delete this->m_pQueryId; this->m_pQueryId=NULL;
- }
//free memory allocated for FieldMetadata objects saved in m_columnDefs;
- for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs.begin(); it != m_columnDefs.end(); ++it){
- delete *it;
+ if(!m_columnDefs->empty()){
+ for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){
+ delete *it;
+ }
+ m_columnDefs->clear();
}
- m_columnDefs.clear();
//Tell the parent to remove this from it's lists
m_pClient->clearMapEntries(this);
+
+ //clear query id map entries.
+ if(this->m_pQueryId!=NULL){
+ delete this->m_pQueryId; this->m_pQueryId=NULL;
+ }
+ if(!m_recordBatches.empty()){
+ // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the servrer _after_
+ // the last chunk has been received. We eventually delete it.
+ DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;
+ RecordBatch* pR=NULL;
+ while(!m_recordBatches.empty()){
+ pR=m_recordBatches.front();
+ m_recordBatches.pop();
+ delete pR;
+ }
+ }
}
-char ZookeeperImpl::s_drillRoot[]="/drill/drillbits1";
+char ZookeeperImpl::s_drillRoot[]="/drill/";
+char ZookeeperImpl::s_defaultCluster[]="drillbits1";
-ZookeeperImpl::ZookeeperImpl(){
+ZookeeperImpl::ZookeeperImpl(){
m_pDrillbits=new String_vector;
srand (time(NULL));
m_bConnecting=true;
memset(&m_id, 0, sizeof(m_id));
}
-ZookeeperImpl::~ZookeeperImpl(){
+ZookeeperImpl::~ZookeeperImpl(){
delete m_pDrillbits;
}
ZooLogLevel ZookeeperImpl::getZkLogLevel(){
- //typedef enum {ZOO_LOG_LEVEL_ERROR=1,
+ //typedef enum {ZOO_LOG_LEVEL_ERROR=1,
// ZOO_LOG_LEVEL_WARN=2,
// ZOO_LOG_LEVEL_INFO=3,
// ZOO_LOG_LEVEL_DEBUG=4
@@ -752,11 +950,11 @@ ZooLogLevel ZookeeperImpl::getZkLogLevel(){
case LOG_FATAL:
default:
return ZOO_LOG_LEVEL_ERROR;
- }
+ }
return ZOO_LOG_LEVEL_ERROR;
}
-int ZookeeperImpl::connectToZookeeper(const char* connectStr){
+int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){
uint32_t waitTime=30000; // 10 seconds
zoo_set_debug_level(getZkLogLevel());
zoo_deterministic_conn_order(1); // enable deterministic order
@@ -780,24 +978,31 @@ int ZookeeperImpl::connectToZookeeper(const char* connectStr){
return CONN_FAILURE;
}
int rc = ZOK;
- rc=zoo_get_children(m_zh, (char*)s_drillRoot, 0, m_pDrillbits);
+ char rootDir[MAX_CONNECT_STR+1];
+ if(pathToDrill==NULL || strlen(pathToDrill)==0){
+ strcpy(rootDir, (char*)s_drillRoot);
+ strcat(rootDir, s_defaultCluster);
+ }else{
+ strncpy(rootDir, pathToDrill, MAX_CONNECT_STR); rootDir[MAX_CONNECT_STR]=0;
+ }
+ rc=zoo_get_children(m_zh, (char*)rootDir, 0, m_pDrillbits);
if(rc!=ZOK){
m_err=getMessage(ERR_CONN_ZKERR, rc);
zookeeper_close(m_zh);
return -1;
}
-
//Let's pick a random drillbit.
if(m_pDrillbits && m_pDrillbits->count >0){
int r=rand()%(this->m_pDrillbits->count);
assert(r<this->m_pDrillbits->count);
char * bit=this->m_pDrillbits->data[r];
std::string s;
- s=s_drillRoot + std::string("/") + bit;
- int buffer_len=1024;
- char buffer[1024];
+ s=rootDir + std::string("/") + bit;
+ int buffer_len=MAX_CONNECT_STR;
+ char buffer[MAX_CONNECT_STR+1];
struct Stat stat;
+ buffer[MAX_CONNECT_STR]=0;
rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat);
if(rc!=ZOK){
m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
@@ -840,7 +1045,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
// signal the cond var
{
if (state == ZOO_CONNECTED_STATE){
- BOOST_LOG_TRIVIAL(trace) << "Connected to Zookeeper." << std::endl;
+ DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;
}
boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex);
self->m_bConnecting=false;
@@ -850,7 +1055,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
void ZookeeperImpl:: debugPrint(){
if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
- BOOST_LOG_TRIVIAL(trace) << m_drillServiceInstance.DebugString();
+ DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;
}
}