Posted to dev@drill.apache.org by parthchandra <gi...@git.apache.org> on 2016/02/29 04:24:46 UTC

[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

GitHub user parthchandra opened a pull request:

    https://github.com/apache/drill/pull/396

    DRILL-4313: Improve method of picking a random drillbit from the C++ client

    Submitting as multiple commits to make review easier
    Changes -
    1) Made the logging thread safe.
    2) Use Boost random number generation instead of the C rand() function.
    3) Include git properties in the build and print the build commit id in the log.
    4) Add an interface to get the client error based on the query handle. The previous API always returned the last error that occurred, which could clobber error messages when multiple errors were encountered.
    5) Allow pooled client connections. Every query submitted uses a different connection from the pool. The pool is filled lazily. Pooling is disabled by default; it can be enabled (and the pool size defined) by setting environment variable(s).
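
For readers skimming the thread, the "shuffle the list initially, then round robin" selection can be sketched as below. This is a minimal illustration, not the code in the PR: EndpointPicker and its members are hypothetical names, and the standard library's <random> stands in for Boost.Random so that the sketch compiles without Boost.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <random>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not the PR's Utils class): shuffles the endpoint
// list once, then hands out endpoints in round-robin order.
class EndpointPicker {
public:
    explicit EndpointPicker(std::vector<std::string> endpoints,
                            unsigned seed = std::random_device{}())
        : m_endpoints(std::move(endpoints)), m_next(0) {
        std::mt19937 gen(seed);
        // One shuffle up front spreads different clients across drillbits.
        std::shuffle(m_endpoints.begin(), m_endpoints.end(), gen);
    }

    // Returns the next endpoint, or an empty string if the list is empty.
    // The counter is guarded so concurrent callers get distinct slots.
    std::string next() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_endpoints.empty()) return std::string();
        const std::size_t idx = m_next % m_endpoints.size();
        assert(idx < m_endpoints.size());
        ++m_next;
        return m_endpoints[idx];
    }

private:
    std::vector<std::string> m_endpoints;
    std::size_t m_next;
    std::mutex m_mutex;
};
```

Each call to next() walks the shuffled list in order and wraps around, so consecutive connections land on different drillbits while the order still differs from client to client.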


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/parthchandra/incubator-drill DRILL-4313_2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #396
    
----
commit a03ae1ec30ed28a1e15ffc8162ab560fdd544411
Author: Parth Chandra <pa...@apache.org>
Date:   2016-02-12T23:42:53Z

    DRILL-4313: C++ Client - Thread safe Logging

commit b494dc184578853167a3f6330c92beb475b74835
Author: Parth Chandra <pa...@apache.org>
Date:   2016-02-12T22:03:37Z

    DRILL-4313: Update random drill bit selection. Shuffle the list initially, then round robin.
    Add Utility methods to get random numbers and to shuffle and add vectors.
    Whitespace cleanup

commit 629142a295ecb4c79f37f31d5dd34d4b8a889638
Author: Parth Chandra <pa...@apache.org>
Date:   2016-02-18T23:53:35Z

    DRILL-4313: C++ client - Add Git properties to build and print to log.

commit 273cfc4bac96e4b65f66e6a7eea84642d50ecf50
Author: Parth Chandra <pa...@apache.org>
Date:   2016-02-22T19:21:58Z

    DRILL-4313: - Add interface to get error based on query handle.

commit f399792c2ca48e5d2c41b3daba4a2a3274992fcc
Author: Parth Chandra <pa...@apache.org>
Date:   2016-02-13T06:38:51Z

    DRILL-4313: Add support for Pooled connections.
    Allows switching between pooled and unpooled connections based on
    environment variables
    
    Conflicts:
    	contrib/native/client/src/clientlib/utils.hpp

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on the pull request:

    https://github.com/apache/drill/pull/396#issuecomment-192797744
  
    Updated the PR to address review comments.



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra closed the pull request at:

    https://github.com/apache/drill/pull/396



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by sudheeshkatkam <gi...@git.apache.org>.
Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55112200
  
    --- Diff: contrib/native/client/example/pooledConnections.cpp ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include <fstream>
    +#include <iostream>
    +#include <stdio.h>
    +#include <stdlib.h>
    +#include <boost/thread.hpp>
    +#include "drill/drillc.hpp"
    +
    +int nOptions=5;
    +
    +struct Option{
    +    char name[32];
    +    char desc[128];
    +    bool required;
    +}qsOptions[]= {
    +    {"query", "Query strings, separated by semicolons", true},
    +    {"connectStr", "Connect string", true},
    +    {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
    +    {"numConnections", "Number of simultaneous connections", true},
    +    {"numIterations", "Number of iterations to run. Each query is sent to each connection this many times", true}
    +};
    +
    +std::map<std::string, std::string> qsOptionValues;
    +
    +const char* exceptionInject="alter session set `drill.exec.testing.controls` = '{ \"injections\" : [{ \"type\":\"exception\",\"siteClass\":\"org.apache.drill.exec.work.fragment.FragmentExecutor\",\"desc\":\"fragment-execution\",\"nSkip\":0,\"nFire\":1,\"exceptionClass\":\"java.lang.OutOfMemoryError\"}]}'";
    --- End diff --
    
    not valid; missing injection site in FragmentExecutor?



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by sudheeshkatkam <gi...@git.apache.org>.
Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55112213
  
    --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
    @@ -1392,6 +1387,198 @@ void DrillClientQueryResult::clearAndDestroy(){
         }
     }
     
    +
    +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
    +    connectionStatus_t stat = CONN_SUCCESS;
    +    std::string pathToDrill, protocol, hostPortStr;
    +    std::string host;
    +    std::string port;
    +    m_connectStr=connStr;
    +    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
    +    if(!strcmp(protocol.c_str(), "zk")){
    +        // Get a list of drillbits
    +        ZookeeperImpl zook;
    +        std::vector<std::string> drillbits;
    +        int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
    +        if(!err){
    +            Utils::shuffle(drillbits);
    +            // The original shuffled order is maintained if we shuffle first and then add any missing elements
    +            Utils::add(m_drillbits, drillbits);
    +            exec::DrillbitEndpoint e;
    +            size_t nextIndex=0;
    +            boost::lock_guard<boost::mutex> cLock(m_cMutex);
    +            m_lastConnection++;
    +            nextIndex = (m_lastConnection)%(getDrillbitCount());
    +            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
    +                    << "(" << (void*)this << ")"
    +                    << ": Current counter is: " 
    +                    << m_lastConnection << std::endl;)
    +                err=zook.getEndPoint(m_drillbits, nextIndex, e);
    +            if(!err){
    +                host=boost::lexical_cast<std::string>(e.address());
    +                port=boost::lexical_cast<std::string>(e.user_port());
    +            }
    +        }
    +        if(err){
    +            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
    +        }
    +        zook.close();
    +        m_bIsDirectConnection=false;
    +    }else if(!strcmp(protocol.c_str(), "local")){
    +        char tempStr[MAX_CONNECT_STR+1];
    +        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
    +        host=strtok(tempStr, ":");
    +        port=strtok(NULL, "");
    +        m_bIsDirectConnection=true;
    +    }else{
    +        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
    +    }
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
    +        DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
    +    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
    +    if(stat == CONN_SUCCESS){
    +        m_clientConnections.push_back(pDrillClientImpl);
    +    }else{
    +        DrillClientError* pErr = pDrillClientImpl->getError();
    +        handleConnError((connectionStatus_t)pErr->status, pErr->msg);
    +        delete pDrillClientImpl;
    +    }
    +    return stat;
    +}
    +
    +connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
    +    // Assume there is one valid connection to at least one drillbit
    +    connectionStatus_t stat=CONN_FAILURE;
    +    // Keep a copy of the user properties
    +    if(props!=NULL){
    +        m_pUserProperties = new DrillUserProperties;
    +        for(size_t i=0; i<props->size(); i++){
    +            m_pUserProperties->setProperty(
    +                    props->keyAt(i),
    +                    props->valueAt(i)
    +                    );
    +        }
    +    }
    +    DrillClientImpl* pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
    +        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
    +    }
    +    else{
    +        stat =  handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
    +    }
    +    return stat;
    +}
    +
    +DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
    +    DrillClientQueryResult* pDrillClientQueryResult = NULL;
    +    DrillClientImpl* pDrillClientImpl = NULL;
    +    pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
    +        m_queriesExecuted++;
    +    }
    +    return pDrillClientQueryResult;
    +}
    +
    +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
    +    // Nothing to do. If this class ever keeps track of executing queries then it will need 
    +    // to implement this call to free any query specific resources the pool might have 
    +    // allocated
    +    return;
    +}
    +
    +bool PooledDrillClientImpl::Active(){
    +    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        if((*it)->Active()){
    +            return true;
    +        }
    +    }
    +    return false;
    +}
    +
    +void PooledDrillClientImpl::Close() {
    +    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        (*it)->Close();
    --- End diff --
    
    Can this throw an exception?



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by sudheeshkatkam <gi...@git.apache.org>.
Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55112202
  
    --- Diff: contrib/native/client/example/pooledConnections.cpp ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include <fstream>
    +#include <iostream>
    +#include <stdio.h>
    +#include <stdlib.h>
    +#include <boost/thread.hpp>
    +#include "drill/drillc.hpp"
    +
    +int nOptions=5;
    +
    +struct Option{
    +    char name[32];
    +    char desc[128];
    +    bool required;
    +}qsOptions[]= {
    +    {"query", "Query strings, separated by semicolons", true},
    +    {"connectStr", "Connect string", true},
    +    {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
    +    {"numConnections", "Number of simultaneous connections", true},
    +    {"numIterations", "Number of iterations to run. Each query is sent to each connection this many times", true}
    +};
    +
    +std::map<std::string, std::string> qsOptionValues;
    +
    +const char* exceptionInject="alter session set `drill.exec.testing.controls` = '{ \"injections\" : [{ \"type\":\"exception\",\"siteClass\":\"org.apache.drill.exec.work.fragment.FragmentExecutor\",\"desc\":\"fragment-execution\",\"nSkip\":0,\"nFire\":1,\"exceptionClass\":\"java.lang.OutOfMemoryError\"}]}'";
    +
    +Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::DrillClientError* err){
    +    if(!err){
    +        printf("SCHEMA CHANGE DETECTED:\n");
    +        for(size_t i=0; i<fields->size(); i++){
    +            std::string name= fields->at(i)->getName();
    +            printf("%s\t", name.c_str());
    +        }
    +        printf("\n");
    +        return Drill::QRY_SUCCESS ;
    +    }else{
    +        std::cerr<< "ERROR: " << err->msg << std::endl;
    +        return Drill::QRY_FAILURE;
    +    }
    +}
    +
    +boost::mutex listenerMutex;
    +Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::DrillClientError* err){
    +    boost::lock_guard<boost::mutex> listenerLock(listenerMutex);
    +    if(!err){
    +        if(b!=NULL){
    +            b->print(std::cout, 0); // print all rows
    +            std::cout << "DATA RECEIVED ..." << std::endl;
    +            delete b; // we're done with this batch, we can delete it
    +            return Drill::QRY_FAILURE;
    +        }else{
    +            std::cout << "Query Complete." << std::endl;
    +            return Drill::QRY_SUCCESS;
    +		}
    +    }else{
    +        assert(b==NULL);
    +        switch(err->status) {
    +            case Drill::QRY_COMPLETED:
    +            case Drill::QRY_CANCELED:
    +                std::cerr<< "INFO: " << err->msg << std::endl;
    +                return Drill::QRY_SUCCESS;
    --- End diff --
    
    Confusing, since the query succeeds when err != NULL?



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55117425
  
    --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
    @@ -1392,6 +1387,198 @@ void DrillClientQueryResult::clearAndDestroy(){
         }
     }
     
    +
    +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
    +    connectionStatus_t stat = CONN_SUCCESS;
    +    std::string pathToDrill, protocol, hostPortStr;
    +    std::string host;
    +    std::string port;
    +    m_connectStr=connStr;
    +    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
    +    if(!strcmp(protocol.c_str(), "zk")){
    +        // Get a list of drillbits
    +        ZookeeperImpl zook;
    +        std::vector<std::string> drillbits;
    +        int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
    +        if(!err){
    +            Utils::shuffle(drillbits);
    +            // The original shuffled order is maintained if we shuffle first and then add any missing elements
    +            Utils::add(m_drillbits, drillbits);
    +            exec::DrillbitEndpoint e;
    +            size_t nextIndex=0;
    +            boost::lock_guard<boost::mutex> cLock(m_cMutex);
    +            m_lastConnection++;
    +            nextIndex = (m_lastConnection)%(getDrillbitCount());
    +            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
    +                    << "(" << (void*)this << ")"
    +                    << ": Current counter is: " 
    +                    << m_lastConnection << std::endl;)
    +                err=zook.getEndPoint(m_drillbits, nextIndex, e);
    +            if(!err){
    +                host=boost::lexical_cast<std::string>(e.address());
    +                port=boost::lexical_cast<std::string>(e.user_port());
    +            }
    +        }
    +        if(err){
    +            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
    +        }
    +        zook.close();
    +        m_bIsDirectConnection=false;
    +    }else if(!strcmp(protocol.c_str(), "local")){
    +        char tempStr[MAX_CONNECT_STR+1];
    +        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
    +        host=strtok(tempStr, ":");
    +        port=strtok(NULL, "");
    +        m_bIsDirectConnection=true;
    +    }else{
    +        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
    +    }
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
    +        DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
    +    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
    +    if(stat == CONN_SUCCESS){
    +        m_clientConnections.push_back(pDrillClientImpl);
    +    }else{
    +        DrillClientError* pErr = pDrillClientImpl->getError();
    +        handleConnError((connectionStatus_t)pErr->status, pErr->msg);
    +        delete pDrillClientImpl;
    +    }
    +    return stat;
    +}
    +
    +connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
    +    // Assume there is one valid connection to at least one drillbit
    +    connectionStatus_t stat=CONN_FAILURE;
    +    // Keep a copy of the user properties
    +    if(props!=NULL){
    +        m_pUserProperties = new DrillUserProperties;
    +        for(size_t i=0; i<props->size(); i++){
    +            m_pUserProperties->setProperty(
    +                    props->keyAt(i),
    +                    props->valueAt(i)
    +                    );
    +        }
    +    }
    +    DrillClientImpl* pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
    +        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
    +    }
    +    else{
    +        stat =  handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
    +    }
    +    return stat;
    +}
    +
    +DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
    +    DrillClientQueryResult* pDrillClientQueryResult = NULL;
    +    DrillClientImpl* pDrillClientImpl = NULL;
    +    pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
    +        m_queriesExecuted++;
    +    }
    +    return pDrillClientQueryResult;
    +}
    +
    +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
    +    // Nothing to do. If this class ever keeps track of executing queries then it will need 
    +    // to implement this call to free any query specific resources the pool might have 
    +    // allocated
    +    return;
    +}
    +
    +bool PooledDrillClientImpl::Active(){
    +    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        if((*it)->Active()){
    +            return true;
    +        }
    +    }
    +    return false;
    +}
    +
    +void PooledDrillClientImpl::Close() {
    +    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        (*it)->Close();
    --- End diff --
    
    Nope. This simply shuts down the socket. The shutdown can return one of the following errors (none of which is a problem):
    EBADF: socket is not a valid file descriptor.
    ENOTSOCK: the descriptor is not a socket.
    ENOTCONN: socket is not connected.

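The point that shutdown reports failures through error codes rather than exceptions can be checked with a small sketch. It assumes a POSIX system and calls shutdown() directly; the client itself may go through a socket library instead, and shutdownQuietly is a hypothetical name used only for illustration.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: shuts down both directions of a socket and
// reports failure as an errno value instead of throwing.
// Returns 0 on success, otherwise EBADF, ENOTSOCK, ENOTCONN, etc.
int shutdownQuietly(int fd) {
    if (::shutdown(fd, SHUT_RDWR) == 0) {
        return 0;
    }
    const int err = errno;  // capture before any other call can overwrite it
    assert(err != 0);
    return err;
}
```

A caller that only wants to release resources can ignore the return value, which is consistent with the comment that none of these errors is a problem during Close().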


[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by hnfgns <gi...@git.apache.org>.
Github user hnfgns commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55286410
  
    --- Diff: contrib/native/client/example/querySubmitter.cpp ---
    @@ -317,16 +320,20 @@ int main(int argc, char* argv[]) {
             std::vector<Drill::QueryHandle_t*>::iterator queryHandleIter;
     
             Drill::DrillClient client;
    -        // To log to file
    -        //DrillClient::initLogging("/var/log/drill/", l);
    +#if defined _WIN32 || defined _WIN64
    +		const char* logpathPrefix = "C:\\Users\\Administrator\\Documents\\temp\\drillclient";
    --- End diff --
    
    minor: the use of GetTempPath in <windows.h> seems more appropriate here.
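
A sketch of what that suggestion might look like, under stated assumptions: defaultLogPrefix is a hypothetical helper, the non-Windows branch (TMPDIR with a /tmp fallback) is an addition for portability that the review does not mention, and the fallback to the current directory on failure is likewise an assumption.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>
#if defined _WIN32 || defined _WIN64
#include <windows.h>
#endif

// Hypothetical helper: builds a log path prefix under the temp directory
// instead of hard-coding a user-specific path.
std::string defaultLogPrefix(const std::string& name) {
#if defined _WIN32 || defined _WIN64
    char buf[MAX_PATH + 1];
    // GetTempPathA returns the number of characters written (without the
    // terminator), 0 on failure, or the required size if the buffer is
    // too small. The returned path ends with a backslash.
    DWORD len = GetTempPathA(MAX_PATH + 1, buf);
    std::string dir = (len > 0 && len <= MAX_PATH) ? std::string(buf, len)
                                                   : std::string(".\\");
    return dir + name;
#else
    const char* tmp = std::getenv("TMPDIR");
    std::string dir = (tmp != NULL && *tmp != '\0') ? tmp : "/tmp";
    assert(!dir.empty());
    if (dir[dir.size() - 1] != '/') dir += '/';
    return dir + name;
#endif
}
```

With this in place the example could pass defaultLogPrefix("drillclient") wherever it currently uses the hard-coded logpathPrefix.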



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by hnfgns <gi...@git.apache.org>.
Github user hnfgns commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55290473
  
    --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
    @@ -1392,6 +1390,206 @@ void DrillClientQueryResult::clearAndDestroy(){
         }
     }
     
    +
    +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
    +    connectionStatus_t stat = CONN_SUCCESS;
    +    std::string pathToDrill, protocol, hostPortStr;
    +    std::string host;
    +    std::string port;
    +    m_connectStr=connStr;
    +    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
    +    if(!strcmp(protocol.c_str(), "zk")){
    +        // Get a list of drillbits
    +        ZookeeperImpl zook;
    +        std::vector<std::string> drillbits;
    +        int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
    +        if(!err){
    +            Utils::shuffle(drillbits);
    +            // The original shuffled order is maintained if we shuffle first and then add any missing elements
    +            Utils::add(m_drillbits, drillbits);
    +            exec::DrillbitEndpoint e;
    +            size_t nextIndex=0;
    +            {
    +                boost::lock_guard<boost::mutex> cLock(m_cMutex);
    +                m_lastConnection++;
    +                nextIndex = (m_lastConnection)%(getDrillbitCount());
    +            }
    +            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
    +                    << "(" << (void*)this << ")"
    +                    << ": Current counter is: " 
    +                    << m_lastConnection << std::endl;)
    +                err=zook.getEndPoint(m_drillbits, nextIndex, e);
    +            if(!err){
    +                host=boost::lexical_cast<std::string>(e.address());
    +                port=boost::lexical_cast<std::string>(e.user_port());
    +            }
    +        }
    +        if(err){
    +            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
    +        }
    +        zook.close();
    +        m_bIsDirectConnection=false;
    +    }else if(!strcmp(protocol.c_str(), "local")){
    +        char tempStr[MAX_CONNECT_STR+1];
    +        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
    +        host=strtok(tempStr, ":");
    +        port=strtok(NULL, "");
    +        m_bIsDirectConnection=true;
    +    }else{
    +        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
    +    }
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
    +        DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
    +    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
    +    if(stat == CONN_SUCCESS){
    +        boost::lock_guard<boost::mutex> lock(m_poolMutex);
    +        m_clientConnections.push_back(pDrillClientImpl);
    +    }else{
    +        DrillClientError* pErr = pDrillClientImpl->getError();
    +        handleConnError((connectionStatus_t)pErr->status, pErr->msg);
    +        delete pDrillClientImpl;
    +    }
    +    return stat;
    +}
    +
    +connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
    +    // Assume there is one valid connection to at least one drillbit
    +    connectionStatus_t stat=CONN_FAILURE;
    +    // Keep a copy of the user properties
    +    if(props!=NULL){
    +        m_pUserProperties = new DrillUserProperties;
    +        for(size_t i=0; i<props->size(); i++){
    +            m_pUserProperties->setProperty(
    +                    props->keyAt(i),
    +                    props->valueAt(i)
    +                    );
    +        }
    +    }
    +    DrillClientImpl* pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
    +        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
    +    }
    +    else{
    +        stat =  handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
    +    }
    +    return stat;
    +}
    +
    +DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
    +    DrillClientQueryResult* pDrillClientQueryResult = NULL;
    +    DrillClientImpl* pDrillClientImpl = NULL;
    +    pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
    +        m_queriesExecuted++;
    +    }
    +    return pDrillClientQueryResult;
    +}
    +
    +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
    +    // Nothing to do. If this class ever keeps track of executing queries then it will need 
    +    // to implement this call to free any query specific resources the pool might have 
    +    // allocated
    +    return;
    +}
    +
    +bool PooledDrillClientImpl::Active(){
    +    boost::lock_guard<boost::mutex> lock(m_poolMutex);
    +    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        if((*it)->Active()){
    +            return true;
    +        }
    +    }
    +    return false;
    +}
    +
    +void PooledDrillClientImpl::Close() {
    +    boost::lock_guard<boost::mutex> lock(m_poolMutex);
    --- End diff --
    
    It is reasonable to assume that an instance will be reused following a call to #Close(). So we need to reset all variables here in #Close(): m_drillbits, m_lastConnection...
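
The reset being asked for can be illustrated with a stripped-down stand-in. PoolState is hypothetical and models only the bookkeeping; the member names follow the diff, but the initial values (an empty list, -1, and 0) are assumptions, since the constructor is not shown in this thread.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the pooled client: bookkeeping only, no sockets.
class PoolState {
public:
    PoolState() : m_lastConnection(-1), m_queriesExecuted(0) {}

    // Mimics what connect() does to the counters.
    void addConnection(const std::string& drillbit) {
        m_drillbits.push_back(drillbit);
        ++m_lastConnection;
    }

    void recordQuery() { ++m_queriesExecuted; }

    // Returns every member to its constructed value so that the
    // instance can be reused after Close().
    void close() {
        m_drillbits.clear();
        m_lastConnection = -1;
        m_queriesExecuted = 0;
        assert(isPristine());
    }

    bool isPristine() const {
        return m_drillbits.empty() && m_lastConnection == -1 &&
               m_queriesExecuted == 0;
    }

private:
    std::vector<std::string> m_drillbits;
    int m_lastConnection;
    std::size_t m_queriesExecuted;
};
```

Without the reset, a second connect() on the same instance would continue the round robin from the old counter and keep drillbits from the previous session in the list.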



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55117302
  
    --- Diff: contrib/native/client/example/pooledConnections.cpp ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include <fstream>
    +#include <iostream>
    +#include <stdio.h>
    +#include <stdlib.h>
    +#include <boost/thread.hpp>
    +#include "drill/drillc.hpp"
    +
    +int nOptions=5;
    +
    +struct Option{
    +    char name[32];
    +    char desc[128];
    +    bool required;
    +}qsOptions[]= {
    +    {"query", "Query strings, separated by semicolons", true},
    +    {"connectStr", "Connect string", true},
    +    {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
    +    {"numConnections", "Number of simultaneous connections", true},
    +    {"numIterations", "Number of iterations to run. Each query is sent to each connection this many times", true}
    +};
    +
    +std::map<std::string, std::string> qsOptionValues;
    +
    +const char* exceptionInject="alter session set `drill.exec.testing.controls` = '{ \"injections\" : [{ \"type\":\"exception\",\"siteClass\":\"org.apache.drill.exec.work.fragment.FragmentExecutor\",\"desc\":\"fragment-execution\",\"nSkip\":0,\"nFire\":1,\"exceptionClass\":\"java.lang.OutOfMemoryError\"}]}'";
    --- End diff --
    
    What do I need to add?



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the pull request:

    https://github.com/apache/drill/pull/396#issuecomment-193489132
  
    Thanks for addressing the review comments!
    +1




[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55117339
  
    --- Diff: contrib/native/client/example/pooledConnections.cpp ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include <fstream>
    +#include <iostream>
    +#include <stdio.h>
    +#include <stdlib.h>
    +#include <boost/thread.hpp>
    +#include "drill/drillc.hpp"
    +
    +int nOptions=5;
    +
    +struct Option{
    +    char name[32];
    +    char desc[128];
    +    bool required;
    +}qsOptions[]= {
    +    {"query", "Query strings, separated by semicolons", true},
    +    {"connectStr", "Connect string", true},
    +    {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
    +    {"numConnections", "Number of simultaneous connections", true},
    +    {"numIterations", "Number of iterations to run. Each query is sent to each connection this many times", true}
    +};
    +
    +std::map<std::string, std::string> qsOptionValues;
    +
    +const char* exceptionInject="alter session set `drill.exec.testing.controls` = '{ \"injections\" : [{ \"type\":\"exception\",\"siteClass\":\"org.apache.drill.exec.work.fragment.FragmentExecutor\",\"desc\":\"fragment-execution\",\"nSkip\":0,\"nFire\":1,\"exceptionClass\":\"java.lang.OutOfMemoryError\"}]}'";
    +
    +Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::DrillClientError* err){
    +    if(!err){
    +        printf("SCHEMA CHANGE DETECTED:\n");
    +        for(size_t i=0; i<fields->size(); i++){
    +            std::string name= fields->at(i)->getName();
    +            printf("%s\t", name.c_str());
    +        }
    +        printf("\n");
    +        return Drill::QRY_SUCCESS ;
    +    }else{
    +        std::cerr<< "ERROR: " << err->msg << std::endl;
    +        return Drill::QRY_FAILURE;
    +    }
    +}
    +
    +boost::mutex listenerMutex;
    +Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::DrillClientError* err){
    +    boost::lock_guard<boost::mutex> listenerLock(listenerMutex);
    +    if(!err){
    +        if(b!=NULL){
    +            b->print(std::cout, 0); // print all rows
    +            std::cout << "DATA RECEIVED ..." << std::endl;
    +            delete b; // we're done with this batch, we can delete it
    +            return Drill::QRY_FAILURE;
    +        }else{
    +            std::cout << "Query Complete." << std::endl;
    +            return Drill::QRY_SUCCESS;
    +        }
    +    }else{
    +        assert(b==NULL);
    +        switch(err->status) {
    +            case Drill::QRY_COMPLETED:
    +            case Drill::QRY_CANCELED:
    +                std::cerr<< "INFO: " << err->msg << std::endl;
    +                return Drill::QRY_SUCCESS;
    --- End diff --
    
    Yes. Bit of a bad design there. The failure return value is actually used by the listener to signal that the query completed. We added another API later to check the status explicitly. Unfortunately, the ODBC driver now relies on this behavior, so it is hard to change.
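    
    To illustrate the alternative of checking status explicitly, here is a small sketch in which the listener records the outcome in a per-query context instead of overloading its return value. Everything in it (StatusSketch, BatchSketch, QueryStateSketch, resultsListener) is hypothetical and only loosely mirrors the shape of the listener in the diff; it is not the Drill client API.

```cpp
#include <cassert>

// Hypothetical status codes and batch type; not the Drill types.
enum StatusSketch { SKETCH_SUCCESS, SKETCH_FAILURE };
struct BatchSketch { int rows; };

// Hypothetical per-query state handed to the listener through ctx.
struct QueryStateSketch {
    int rowsSeen = 0;
    bool completed = false;
    bool failed = false;
};

// The listener writes the outcome into ctx, so the caller can inspect it
// directly rather than infer completion from the return value.
StatusSketch resultsListener(void* ctx, const BatchSketch* batch, const char* errMsg) {
    QueryStateSketch* state = static_cast<QueryStateSketch*>(ctx);
    if (errMsg != nullptr) {
        // An error ends the query and is recorded as a failure.
        state->failed = true;
        state->completed = true;
        return SKETCH_FAILURE;
    }
    if (batch != nullptr) {
        // A data batch arrived; the query is still running.
        state->rowsSeen += batch->rows;
        return SKETCH_SUCCESS;
    }
    // No batch and no error: the query finished normally.
    state->completed = true;
    return SKETCH_SUCCESS;
}
```

    The caller would then read completed and failed from the state object once the listener has been invoked, which keeps the return value free to mean only success or failure.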



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by hnfgns <gi...@git.apache.org>.
Github user hnfgns commented on the pull request:

    https://github.com/apache/drill/pull/396#issuecomment-193499159
  
    +1 once comments above are addressed.



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55117740
  
    --- Diff: contrib/native/client/example/pooledConnections.cpp ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include <fstream>
    +#include <iostream>
    +#include <stdio.h>
    +#include <stdlib.h>
    +#include <boost/thread.hpp>
    +#include "drill/drillc.hpp"
    +
    +int nOptions=5;
    +
    +struct Option{
    +    char name[32];
    +    char desc[128];
    +    bool required;
    +}qsOptions[]= {
    +    {"query", "Query strings, separated by semicolons", true},
    +    {"connectStr", "Connect string", true},
    +    {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
    +    {"numConnections", "Number of simultaneous connections", true},
    +    {"numIterations", "Number of iterations to run. Each query is sent to each connection this many times", true}
    +};
    +
    +std::map<std::string, std::string> qsOptionValues;
    +
    +const char* exceptionInject="alter session set `drill.exec.testing.controls` = '{ \"injections\" : [{ \"type\":\"exception\",\"siteClass\":\"org.apache.drill.exec.work.fragment.FragmentExecutor\",\"desc\":\"fragment-execution\",\"nSkip\":0,\"nFire\":1,\"exceptionClass\":\"java.lang.OutOfMemoryError\"}]}'";
    --- End diff --
    
    Actually, I'm going to remove this example. It is not correct, as it depends on setting session options, which do not work for pooled connections.
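    
    A rough sketch of why session options and pooling do not mix, assuming (as the PR description states) that every submitted query uses a different connection from the pool. ConnectionSketch, PoolSketch and the option name are made up for illustration; the pool is assumed to hold at least one connection.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical connection that keeps its own session options.
struct ConnectionSketch {
    std::map<std::string, std::string> sessionOptions;
};

// Hypothetical pool that hands each query to the next connection in turn.
// Assumes size >= 1.
class PoolSketch {
public:
    explicit PoolSketch(std::size_t size) : m_connections(size), m_next(0) {}

    ConnectionSketch& nextConnection() {
        ConnectionSketch& c = m_connections[m_next];
        m_next = (m_next + 1) % m_connections.size();
        return c;
    }

private:
    std::vector<ConnectionSketch> m_connections;
    std::size_t m_next;
};

// Returns true if an option set by one query is visible to the following query.
bool optionVisibleToNextQuery(PoolSketch& pool) {
    // First query: plays the role of an "alter session set ..." statement.
    pool.nextConnection().sessionOptions["some.option"] = "on";
    // Second query: lands on whichever connection the pool picks next.
    ConnectionSketch& second = pool.nextConnection();
    return second.sessionOptions.count("some.option") > 0;
}
```

    With two or more connections the second query lands on a connection that never saw the option, so optionVisibleToNextQuery returns false; with a single connection it returns true.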



[GitHub] drill pull request: DRILL-4313: Improve method of picking a random...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on the pull request:

    https://github.com/apache/drill/pull/396#issuecomment-193864509
  
    Merged in df0f0af3d963c1b65eb01c3141fe84532c53f5a5

