You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/01/07 02:56:26 UTC
[1/4] drill git commit: DRILL-1533: C++ Drill Client always sets hasSchemaChanged to true for every new record batch
Repository: drill
Updated Branches:
refs/heads/master b491cdb37 -> 363d30b54
DRILL-1533: C++ Drill Client always sets hasSchemaChanged to true for every new record batch
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/07f276d6
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/07f276d6
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/07f276d6
Branch: refs/heads/master
Commit: 07f276d686c99bc6485661ece0425c1a187aa802
Parents: b491cdb
Author: norrislee <no...@hotmail.com>
Authored: Mon Jan 5 15:28:34 2015 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 6 09:00:14 2015 -0800
----------------------------------------------------------------------
.../client/src/clientlib/drillClientImpl.cpp | 51 +++++++++-----------
1 file changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/07f276d6/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 23dc407..5b390ef 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -923,49 +923,44 @@ status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryResult* pQue
bool isFirstIter=false;
boost::lock_guard<boost::mutex> schLock(this->m_schemaMutex);
- FieldDefPtr prevSchema=this->m_columnDefs;
isFirstIter=this->m_numBatches==1?true:false;
std::map<std::string, Drill::FieldMetadata*> oldSchema;
if(!m_columnDefs->empty()){
- for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema->begin(); it != prevSchema->end(); ++it){
+ for(std::vector<Drill::FieldMetadata*>::iterator it = this->m_columnDefs->begin(); it != this->m_columnDefs->end(); ++it){
// the key is the field_name + type
char type[256];
sprintf(type, ":%d:%d",(*it)->getMinorType(), (*it)->getDataMode() );
std::string k= (*it)->getName()+type;
oldSchema[k]=*it;
+ delete *it;
}
}
m_columnDefs->clear();
size_t numFields=pQueryResult->def().field_size();
- for(size_t i=0; i<numFields; i++){
- Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
- fmd->set(pQueryResult->def().field(i));
- this->m_columnDefs->push_back(fmd);
-
- //Look for changes in the vector and trigger a Schema change event if necessary.
- //If vectors are different, then call the schema change listener.
- char type[256];
- sprintf(type, ":%d:%d",fmd->getMinorType(), fmd->getDataMode() );
- std::string k= fmd->getName()+type;
- std::map<std::string, Drill::FieldMetadata*>::iterator iter=oldSchema.find(k);
- if(iter==oldSchema.end()){
- // not found
- hasSchemaChanged=true;
- }else{
- oldSchema.erase(iter);
+ if (numFields > 0){
+ for(size_t i=0; i<numFields; i++){
+ Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
+ fmd->set(pQueryResult->def().field(i));
+ this->m_columnDefs->push_back(fmd);
+
+ //Look for changes in the vector and trigger a Schema change event if necessary.
+ //If vectors are different, then call the schema change listener.
+ char type[256];
+ sprintf(type, ":%d:%d",fmd->getMinorType(), fmd->getDataMode() );
+ std::string k= fmd->getName()+type;
+ std::map<std::string, Drill::FieldMetadata*>::iterator iter=oldSchema.find(k);
+ if(iter==oldSchema.end()){
+ // not found
+ hasSchemaChanged=true;
+ }else{
+ oldSchema.erase(iter);
+ }
}
- }
- if(oldSchema.size()>0){
- hasSchemaChanged=true;
- }
-
- //free memory allocated for FieldMetadata objects saved in previous columnDefs;
- if(!prevSchema->empty()){
- for(std::vector<Drill::FieldMetadata*>::iterator it = prevSchema->begin(); it != prevSchema->end(); ++it){
- delete *it;
+ if(oldSchema.size()>0){
+ hasSchemaChanged=true;
+ oldSchema.clear();
}
}
- prevSchema->clear();
this->m_bHasSchemaChanged=hasSchemaChanged&&!isFirstIter;
if(this->m_bHasSchemaChanged){
//invoke schema change Listener
[3/4] drill git commit: DRILL-1498 Drill Client to handle spurious results and handshake messages
Posted by pa...@apache.org.
DRILL-1498 Drill Client to handle spurious results and handshake messages
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4304b25e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4304b25e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4304b25e
Branch: refs/heads/master
Commit: 4304b25e8246970163966e5790231d7ffdbd308f
Parents: 621527d
Author: norrislee <no...@hotmail.com>
Authored: Tue Jan 6 14:44:00 2015 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 6 15:31:26 2015 -0800
----------------------------------------------------------------------
.../client/src/clientlib/drillClientImpl.cpp | 41 ++++++++++++++++++--
.../native/client/src/clientlib/rpcDecoder.cpp | 2 +
.../native/client/src/clientlib/rpcMessage.hpp | 4 ++
3 files changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 5b390ef..6f4a2ca 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -478,6 +478,17 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
qid.CopyFrom(qr->query_id());
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;
+ if(m_queryResults.size() != 0){
+ for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
+ << it->first->part2() << "]\n";
+ }
+ }
+ if(qid.part1()==0){
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: QID=0. Ignore and return QRY_SUCCESS." << std::endl;
+ return QRY_SUCCESS;
+ }
it=this->m_queryResults.find(&qid);
if(it!=this->m_queryResults.end()){
pDrillClientQueryResult=(*it).second;
@@ -610,6 +621,13 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
boost::lock_guard<boost::mutex> lock(m_dcMutex);
std::map<int,DrillClientQueryResult*>::iterator it;
+ for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
+ DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: " << it->first << std::endl;
+ }
+ if(msg.m_coord_id==0){
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;
+ return QRY_SUCCESS;
+ }
it=this->m_queryIds.find(msg.m_coord_id);
if(it!=this->m_queryIds.end()){
pDrillClientQueryResult=(*it).second;
@@ -755,10 +773,27 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
}else{
- //If not QUERY_RESULT, then we think something serious has gone wrong?
- assert(0);
- DRILL_LOG(LOG_TRACE) << "QueryResult returned " << msg.m_rpc_type << std::endl;
+ // If not QUERY_RESULT, then we think something serious has gone wrong?
+ // In one case when the client hung, we observed that the server was sending a handshake request to the client
+ // We should properly handle these handshake requests/responses
+ if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){
+ if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";
+ exec::user::UserToBitHandshake u2b;
+ u2b.set_channel(exec::shared::USER);
+ u2b.set_rpc_version(DRILL_RPC_VERSION);
+ u2b.set_support_listening(true);
+ OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
+ sendSync(out_msg);
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";
+ }else{
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";
+ }
+ }else{
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+ << "QueryResult returned " << msg.m_rpc_type << std::endl;
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
+ }
return;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/rpcDecoder.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcDecoder.cpp b/contrib/native/client/src/clientlib/rpcDecoder.cpp
index c1001fd..d3cf50c 100644
--- a/contrib/native/client/src/clientlib/rpcDecoder.cpp
+++ b/contrib/native/client/src/clientlib/rpcDecoder.cpp
@@ -84,8 +84,10 @@ int RpcDecoder::Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
int header_limit = cis->PushLimit(header_length);
header.ParseFromCodedStream(cis);
cis->PopLimit(header_limit);
+ msg.m_has_mode = header.has_mode();
msg.m_mode = header.mode();
msg.m_coord_id = header.coordination_id();
+ msg.m_has_rpc_type = header.has_rpc_type();
msg.m_rpc_type = header.rpc_type();
//if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex());
http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/rpcMessage.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp b/contrib/native/client/src/clientlib/rpcMessage.hpp
index fa92c42..6696971 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.hpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.hpp
@@ -33,6 +33,10 @@ class InBoundRpcMessage {
int m_coord_id;
DataBuf m_pbody;
ByteBuf_t m_dbody;
+ bool m_has_mode;
+ bool m_has_rpc_type;
+ bool has_mode() { return m_has_mode; };
+ bool has_rpc_type() { return m_has_rpc_type; };
};
class OutBoundRpcMessage {
[2/4] drill git commit: DRILL-1776: C++ Client. Add interface to get application context.
Posted by pa...@apache.org.
DRILL-1776: C++ Client. Add interface to get application context.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/621527d5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/621527d5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/621527d5
Branch: refs/heads/master
Commit: 621527d580e447c38952fa9046703811388b670c
Parents: 07f276d
Author: Xiao Meng <xi...@gmail.com>
Authored: Mon Nov 24 14:41:55 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 6 09:07:42 2015 -0800
----------------------------------------------------------------------
contrib/native/client/src/clientlib/drillClient.cpp | 3 +++
contrib/native/client/src/include/drill/drillClient.hpp | 4 ++++
2 files changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/621527d5/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 70058ec..a8cfe8c 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -294,6 +294,9 @@ RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string&
return pIter;
}
+void* DrillClient::getApplicationContext(QueryHandle_t handle){
+ return ((DrillClientQueryResult*)handle)->getListenerContext();
+}
std::string& DrillClient::getError(){
return m_pImpl->getError()->msg;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/621527d5/contrib/native/client/src/include/drill/drillClient.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index 65c07d6..490c823 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -238,6 +238,10 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
* back. The listener callback will return the handle in the ctx parameter.
*/
status_t submitQuery(Drill::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle);
+ /*
+ * Get the application context from query handle
+ */
+ static void* getApplicationContext(QueryHandle_t handle);
/*
* Submit a query asynchronously and wait for results to be returned through an iterator that returns
[4/4] drill git commit: DRILL-1361: C++ Client needs a better error message when the handshake fails.
Posted by pa...@apache.org.
DRILL-1361: C++ Client needs a better error message when the handshake fails.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/363d30b5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/363d30b5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/363d30b5
Branch: refs/heads/master
Commit: 363d30b54255840daa46959e1f7d8b2a779fa692
Parents: 4304b25
Author: Parth Chandra <pc...@maprtech.com>
Authored: Fri Jan 2 15:22:31 2015 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 6 17:30:46 2015 -0800
----------------------------------------------------------------------
contrib/native/client/src/clientlib/drillClientImpl.cpp | 7 ++++++-
contrib/native/client/src/clientlib/errmsgs.cpp | 2 +-
2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/363d30b5/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 6f4a2ca..84aa6cd 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -230,7 +230,12 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
}else{
// boost error
- handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, error.message().c_str()));
+ if(error==boost::asio::error::eof){ // Server broke off the connection
+ handleConnError(CONN_HANDSHAKE_FAILED,
+ getMessage(ERR_CONN_NOHSHAKE, DRILL_RPC_VERSION, m_handshakeVersion));
+ }else{
+ handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, error.message().c_str()));
+ }
return;
}
return;
http://git-wip-us.apache.org/repos/asf/drill/blob/363d30b5/contrib/native/client/src/clientlib/errmsgs.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp
index e09bda1..a5e7217 100644
--- a/contrib/native/client/src/clientlib/errmsgs.cpp
+++ b/contrib/native/client/src/clientlib/errmsgs.cpp
@@ -29,7 +29,7 @@ static Drill::ErrorMessages errorMessages[]={
{ERR_CONN_FAILURE, ERR_CATEGORY_CONN, 0, "Connection failure. Host:%s port:%s. Error: %s."},
{ERR_CONN_EXCEPT, ERR_CATEGORY_CONN, 0, "Socket connection failure with the following exception: %s."},
{ERR_CONN_UNKPROTO, ERR_CATEGORY_CONN, 0, "Unknown protocol: %s."},
- {ERR_CONN_RDFAIL, ERR_CATEGORY_CONN, 0, "A socket read failed with error: %s."},
+ {ERR_CONN_RDFAIL, ERR_CATEGORY_CONN, 0, "Connection failed with error: %s."},
{ERR_CONN_WFAIL, ERR_CATEGORY_CONN, 0, "Synchronous socket write failed with error: %s."},
{ERR_CONN_ZOOKEEPER, ERR_CATEGORY_CONN, 0, "Zookeeper error. %s"},
{ERR_CONN_NOHSHAKE, ERR_CATEGORY_CONN, 0, "Handshake failed: Expected RPC version %d, got %d."},