Posted to dev@geode.apache.org by mhansonp <gi...@git.apache.org> on 2017/03/30 23:08:35 UTC

[GitHub] geode-native pull request #82: GEODE-2736: Fixed orphaned worker threads

GitHub user mhansonp opened a pull request:

    https://github.com/apache/geode-native/pull/82

    GEODE-2736: Fixed orphaned worker threads

    - Force timeouts to be handled like no connection
    - Added test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mhansonp/geode-native feature/GEODE-2736

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/geode-native/pull/82.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #82
    
----
commit 75c6dcb206a72f68356805e34290f0e9836533b1
Author: Mark Hanson <mh...@pivotal.io>
Date:   2017-03-30T17:19:40Z

    GEODE-2736: Fixed orphaned worker threads
    
    - Force timeouts to be handled like no connection
    - Added test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] geode-native pull request #82: GEODE-2736: Fixed orphaned worker threads

Posted by PivotalSarge <gi...@git.apache.org>.
Github user PivotalSarge commented on a diff in the pull request:

    https://github.com/apache/geode-native/pull/82#discussion_r109194938
  
    --- Diff: src/cppcache/integration-test/testThinClientPoolExecuteFunctionThrowsException.cpp ---
    @@ -0,0 +1,362 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#define ROOT_NAME "testThinClientPoolExecuteFunctionThrowsException"
    +
    +#include "fw_dunit.hpp"
    +#include "ThinClientHelper.hpp"
    +#include "testobject/VariousPdxTypes.hpp"
    +
    +#include <thread>
    +#include <chrono>
    +
    +using namespace PdxTests;
    +/* This is to test
    +1- funtion execution on pool
    + */
    +
    +#define CLIENT1 s1p1
    +#define LOCATOR1 s2p1
    +#define SERVER s2p2
    +
    +bool isLocalServer = false;
    +bool isLocator = false;
    +bool isPoolWithEndpoint = false;
    +
    +const char* locHostPort =
    +    CacheHelper::getLocatorHostPort(isLocator, isLocalServer, 1);
    +const char* poolRegNames[] = {"partition_region", "PoolRegion2"};
    +
    +const char* serverGroup = "ServerGroup1";
    +
    +char* getFuncIName = (char*)"MultiGetFunctionI";
    +char* putFuncIName = (char*)"MultiPutFunctionI";
    +char* getFuncName = (char*)"MultiGetFunction";
    +char* putFuncName = (char*)"MultiPutFunction";
    +char* rjFuncName = (char*)"RegionOperationsFunction";
    +char* exFuncName = (char*)"ExceptionHandlingFunction";
    +char* exFuncNameSendException = (char*)"executeFunction_SendException";
    +char* exFuncNamePdxType = (char*)"PdxFunctionTest";
    +char* FEOnRegionPrSHOP = (char*)"FEOnRegionPrSHOP";
    +char* FEOnRegionPrSHOP_OptimizeForWrite =
    +    (char*)"FEOnRegionPrSHOP_OptimizeForWrite";
    +char* FETimeOut = (char*)"FunctionExecutionTimeOut";
    +
    +#define verifyGetResults()                                                    \
    +    bool found = false;                                                         \
    +    for (int j = 0; j < 34; j++) {                                              \
    +      if (j % 2 == 0) continue;                                                 \
    +      sprintf(buf, "VALUE--%d", j);                                             \
    +      if (strcmp(buf, dynCast<CacheableStringPtr>(resultList->operator[](i))    \
    +                 ->asChar()) == 0) {                                   \
    +        LOGINFO(                                                                \
    +                                                                                "buf = %s "                                                         \
    +                                                                                "dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar() " \
    +                                                                                "= %s ",                                                            \
    +                                                                                buf,                                                                \
    +                                                                                dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar());  \
    +                                                                                found = true;                                                           \
    +                                                                                break;                                                                  \
    +      }                                                                         \
    +    }                                                                           \
    +    ASSERT(found, "this returned value is invalid");
    +
    +#define verifyGetKeyResults()                                                 \
    +    bool found = false;                                                         \
    +    for (int j = 0; j < 34; j++) {                                              \
    +      if (j % 2 == 0) continue;                                                 \
    +      sprintf(buf, "KEY--%d", j);                                               \
    +      if (strcmp(buf, dynCast<CacheableStringPtr>(resultList->operator[](i))    \
    +                 ->asChar()) == 0) {                                   \
    +        LOGINFO(                                                                \
    +                                                                                "buf = %s "                                                         \
    +                                                                                "dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar() " \
    +                                                                                "= %s ",                                                            \
    +                                                                                buf,                                                                \
    +                                                                                dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar());  \
    +                                                                                found = true;                                                           \
    +                                                                                break;                                                                  \
    +      }                                                                         \
    +    }                                                                           \
    +    ASSERT(found, "this returned KEY is invalid");
    +
    +#define verifyPutResults()                   \
    +    bool found = false;                        \
    +    for (int j = 0; j < 34; j++) {             \
    +      if (j % 2 == 0) continue;                \
    +      sprintf(buf, "KEY--%d", j);              \
    +      if (strcmp(buf, value->asChar()) == 0) { \
    +        found = true;                          \
    +        break;                                 \
    +      }                                        \
    +    }                                          \
    +    ASSERT(found, "this returned value is invalid");
    +class MyResultCollector : public ResultCollector {
    + public:
    +  MyResultCollector()
    + : m_resultList(CacheableVector::create()),
    +   m_isResultReady(false),
    +   m_endResultCount(0),
    +   m_addResultCount(0),
    +   m_getResultCount(0) {}
    +  ~MyResultCollector() {}
    +  CacheableVectorPtr getResult(uint32_t timeout) {
    +    m_getResultCount++;
    +    if (m_isResultReady == true) {
    +      return m_resultList;
    +    } else {
    +      for (uint32_t i = 0; i < timeout; i++) {
    +        SLEEP(1);
    +        if (m_isResultReady == true) return m_resultList;
    +      }
    +      throw FunctionExecutionException(
    +          "Result is not ready, endResults callback is called before invoking "
    +          "getResult() method");
    +    }
    +  }
    +
    +  void addResult(CacheablePtr& resultItem) {
    +    m_addResultCount++;
    +    if (resultItem == NULLPTR) {
    +      return;
    +    }
    +    try {
    +      CacheableArrayListPtr result = dynCast<CacheableArrayListPtr>(resultItem);
    +      for (int32_t i = 0; i < result->size(); i++) {
    +        m_resultList->push_back(result->operator[](i));
    +      }
    +    } catch (ClassCastException) {
    +      UserFunctionExecutionExceptionPtr result =
    +          dynCast<UserFunctionExecutionExceptionPtr>(resultItem);
    +      m_resultList->push_back(result);
    +    }
    +  }
    +  void endResults() {
    +    m_isResultReady = true;
    +    m_endResultCount++;
    +  }
    +  uint32_t getEndResultCount() { return m_endResultCount; }
    +  uint32_t getAddResultCount() { return m_addResultCount; }
    +  uint32_t getGetResultCount() { return m_getResultCount; }
    +
    + private:
    +  CacheableVectorPtr m_resultList;
    +  volatile bool m_isResultReady;
    +  uint32_t m_endResultCount;
    +  uint32_t m_addResultCount;
    +  uint32_t m_getResultCount;
    +};
    +typedef SharedPtr<MyResultCollector> MyResultCollectorPtr;
    +
    +DUNIT_TASK_DEFINITION(LOCATOR1, StartLocator1)
    +{
    +  // starting locator
    +  if (isLocator) {
    +    CacheHelper::initLocator(1);
    +    LOG("Locator1 started");
    +  }
    +}
    +END_TASK_DEFINITION
    +
    +DUNIT_TASK_DEFINITION(SERVER, StartS12)
    +{
    +  const char* lhp = NULL;
    +  if (!isPoolWithEndpoint) lhp = locHostPort;
    +  if (isLocalServer) {
    +    CacheHelper::initServer(1, "func_cacheserver1_pool.xml", lhp);
    +  }
    +  if (isLocalServer) {
    +    CacheHelper::initServer(2, "func_cacheserver2_pool.xml", lhp);
    +  }
    +}
    +END_TASK_DEFINITION
    +
    +DUNIT_TASK_DEFINITION(CLIENT1, StartC1)
    +{
    +  // initClient(true);
    +  initClientWithPool(true, NULL, locHostPort, serverGroup, NULLPTR, 0, true,
    +                     -1, -1, 60000, /*singlehop*/ true,
    +                     /*threadLocal*/ true);
    +  // createPool(poolName, locHostPort,serverGroup, NULL, 0, true );
    +  // createRegionAndAttachPool(poolRegNames[0],USE_ACK, poolName);
    +
    +  RegionPtr regPtr0 =
    +      createRegionAndAttachPool(poolRegNames[0], USE_ACK, NULL);
    +  ;  // getHelper()->createRegion( poolRegNames[0], USE_ACK);
    +  regPtr0->registerAllKeys();
    +
    +  LOG("Clnt1Init complete.");
    +}
    +END_TASK_DEFINITION
    +
    +DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
    +{
    +  RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]);
    +  char buf[128];
    +
    +  for (int i = 0; i < 34; i++) {
    +    sprintf(buf, "VALUE--%d", i);
    +    CacheablePtr value(CacheableString::create(buf));
    +
    +    sprintf(buf, "KEY--%d", i);
    +    CacheableKeyPtr key = CacheableKey::create(buf);
    +    regPtr0->put(key, value);
    +  }
    +  std::this_thread::sleep_for(std::chrono::seconds(10)); // let the put finish
    +
    +    //-----------------------Test with sendException
    +    // onRegion-------------------------------//
    +
    +    for (int i = 1; i <= 200; i++) {
    +      CacheablePtr value(CacheableInt32::create(i));
    +
    +      sprintf(buf, "execKey-%d", i);
    +      CacheableKeyPtr key = CacheableKey::create(buf);
    +      regPtr0->put(key, value);
    +    }
    +    LOG("Put for execKey's on region complete.");
    +
    +    LOG("Adding filter");
    +    CacheableArrayListPtr arrList = CacheableArrayList::create();
    +    for (int i = 100; i < 120; i++) {
    +      sprintf(buf, "execKey-%d", i);
    +      CacheableKeyPtr key = CacheableKey::create(buf);
    +      arrList->push_back(key);
    +    }
    +
    +    CacheableVectorPtr filter = CacheableVector::create();
    +    for (int i = 100; i < 120; i++) {
    +      sprintf(buf, "execKey-%d", i);
    +      CacheableKeyPtr key = CacheableKey::create(buf);
    +      filter->push_back(key);
    +    }
    +    LOG("Adding filter done.");
    +
    +    CacheablePtr args = CacheableBoolean::create(1);
    +
    +    ExecutionPtr funcExec = FunctionService::onRegion(regPtr0);
    +    ASSERT(funcExec != NULLPTR, "onRegion Returned NULL");
    +
    +    ResultCollectorPtr collector =
    +        funcExec->withArgs(args)->withFilter(filter)->execute(
    +            exFuncNameSendException, 15);
    +    ASSERT(collector != NULLPTR, "onRegion collector NULL");
    +
    +    CacheableVectorPtr result = collector->getResult();
    +
    +    if (result == NULLPTR) {
    +      ASSERT(false, "echo String : result is NULL");
    +    } else {
    +      try {
    +        for (int i = 0; i < result->size(); i++) {
    +          UserFunctionExecutionExceptionPtr uFEPtr =
    +              dynCast<UserFunctionExecutionExceptionPtr>(
    +                  result->operator[](i));
    +          ASSERT(uFEPtr != NULLPTR, "uFEPtr exception is NULL");
    +          LOGINFO("Done casting to uFEPtr");
    +          LOGINFO("Read expected uFEPtr exception %s ",
    +                  uFEPtr->getMessage()->asChar());
    +        }
    +      } catch (ClassCastException& ex) {
    +        std::string logmsg = "";
    +        logmsg += ex.getName();
    +        logmsg += ": ";
    +        logmsg += ex.getMessage();
    +        LOG(logmsg.c_str());
    +        ex.printStackTrace();
    +        FAIL(
    +            "exFuncNameSendException casting to string for bool argument "
    +            "exception.");
    +      } catch (...) {
    +        FAIL(
    +            "exFuncNameSendException casting to string for bool argument "
    +            "Unknown exception.");
    +      }
    +    }
    +
    +    LOG("exFuncNameSendException done for bool argument.");
    +
    +    collector = funcExec->withArgs(arrList)->withFilter(filter)->execute(
    +        exFuncNameSendException, 15);
    +    ASSERT(collector != NULLPTR, "onRegion collector for arrList NULL");
    +    std::this_thread::sleep_for(std::chrono::seconds(2));
    +    
    +    try {
    +        CacheableVectorPtr fil = CacheableVector::create();
    +        fil->push_back(CacheableInt32::create(1));
    +        ExecutionPtr exe = FunctionService::onRegion(regPtr0);
    +        
    +        LOGINFO("Executing the exception test it is expected to throw.");
    +        CacheableVectorPtr executeFunctionResult3 =
    +        funcExec->withArgs(arrList)->withFilter(filter)->execute("ThinClientRegionExceptionTest", 15)->getResult();
    --- End diff --
    
    Is the expectation that this method invocation will throw an exception? If so, it probably should be followed in the try block by a FAIL() to catch the cases where it doesn't.
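
A minimal sketch of the pattern suggested above, written in plain C++ rather than against the dunit framework. `TestFailure`, `ExpectedError`, and `expectThrows` are hypothetical names used only for illustration; they stand in for the `FAIL()` macro and for whatever exception the function execution raises, and are not part of geode-native.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>

// Hypothetical stand-in for the dunit FAIL() macro: it aborts the test by
// throwing a distinct type so it cannot be mistaken for the expected error.
struct TestFailure : std::logic_error {
  using std::logic_error::logic_error;
};

// Hypothetical stand-in for the exception the call is expected to raise.
struct ExpectedError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Runs `call`, which must throw ExpectedError.
// Returns normally only if the expected exception was thrown.
void expectThrows(const std::function<void()>& call) {
  try {
    call();
    // Reached only if call() returned normally, which is the case the
    // test would otherwise silently accept.
    throw TestFailure("Failed to throw expected exception.");
  } catch (const ExpectedError&) {
    // Expected path: the exception arrived, so the check passes.
  }
}
```

The failure marker is deliberately a different type from the expected exception; if the handler were a catch-all, it would swallow the failure as well.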



[GitHub] geode-native pull request #82: GEODE-2736: Fixed orphaned worker threads

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/geode-native/pull/82



[GitHub] geode-native pull request #82: GEODE-2736: Fixed orphaned worker threads

Posted by mhansonp <gi...@git.apache.org>.
Github user mhansonp commented on a diff in the pull request:

    https://github.com/apache/geode-native/pull/82#discussion_r109271745
  
    --- Diff: src/cppcache/integration-test/testThinClientPoolExecuteFunctionThrowsException.cpp ---
    +    try {
    +        CacheableVectorPtr fil = CacheableVector::create();
    +        fil->push_back(CacheableInt32::create(1));
    +        ExecutionPtr exe = FunctionService::onRegion(regPtr0);
    +        
    +        LOGINFO("Executing the exception test it is expected to throw.");
    +        CacheableVectorPtr executeFunctionResult3 =
    +        funcExec->withArgs(arrList)->withFilter(filter)->execute("ThinClientRegionExceptionTest", 15)->getResult();
    --- End diff --
    
    The call always throws, and the fact that it throws is the success condition. A throw here means that executeFunctionSH went through its worker-thread cleanup path. I have no way to verify directly that every worker was cleaned up, but the exception is thrown at the end of that cleanup. So if the exception reaches the test, we know that a fatal or unknown exception from the server was handled by cleaning up first and then throwing.
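
For illustration only, the reasoning in this comment can be sketched as follows. This is not the geode-native implementation: `runWorkersThenRethrow` and `failureSurfacesAfterJoin` are hypothetical names, and the sketch assumes that the failure is rethrown only after every worker has been joined. Under that assumption, a caller that sees the exception knows the joins already happened.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical stand-in for a fan-out call such as executeFunctionSH.
// Runs each task on its own worker thread, joins every worker, and only
// then rethrows the first failure that any worker recorded.
void runWorkersThenRethrow(const std::vector<std::function<void()>>& tasks,
                           std::size_t& joined) {
  std::vector<std::thread> workers;
  std::exception_ptr firstError;
  std::mutex errorMutex;

  for (const auto& task : tasks) {
    workers.emplace_back([&task, &firstError, &errorMutex] {
      try {
        task();
      } catch (...) {
        // Record the failure instead of letting it escape the thread.
        std::lock_guard<std::mutex> lock(errorMutex);
        if (!firstError) firstError = std::current_exception();
      }
    });
  }

  // Cleanup path: no worker is left running, whatever the tasks did.
  joined = 0;
  for (auto& worker : workers) {
    worker.join();
    ++joined;
  }
  assert(joined == tasks.size());

  // The throw comes last, so observing it implies the joins completed.
  if (firstError) std::rethrow_exception(firstError);
}

// Mirrors the test's logic: the exception arriving is the pass condition.
bool failureSurfacesAfterJoin() {
  std::size_t joined = 0;
  try {
    runWorkersThenRethrow(
        {[] {}, [] { throw std::runtime_error("server failure"); }, [] {}},
        joined);
  } catch (const std::runtime_error&) {
    return joined == 3;
  }
  return false;  // no exception reached the caller
}
```

The check is indirect in the same way the comment describes: the test cannot count live threads, so it relies on the ordering of cleanup and throw.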



[GitHub] geode-native pull request #82: GEODE-2736: Fixed orphaned worker threads

Posted by echobravopapa <gi...@git.apache.org>.
Github user echobravopapa commented on a diff in the pull request:

    https://github.com/apache/geode-native/pull/82#discussion_r109942702
  
    --- Diff: src/cppcache/integration-test/testThinClientPoolExecuteFunctionThrowsException.cpp ---
    @@ -303,6 +303,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
             LOGINFO("Executing the exception test it is expected to throw.");
             CacheableVectorPtr executeFunctionResult3 =
             funcExec->withArgs(arrList)->withFilter(filter)->execute("ThinClientRegionExceptionTest", 15)->getResult();
    +        FAIL("Failed to throw expected exception.");
    --- End diff --
    
    Perfect. @mhansonp, may I ask you to squash your commits as well? TIA
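
As a rough sketch of the pattern that the added FAIL() line completes: a call that must throw is followed by an explicit failure, so a silent return cannot pass the test. The names here (`TestFailure`, `expectThrow`, `missingThrowIsReported`) are hypothetical and are not part of fw_dunit; the sketch only assumes that the failure signal has a different type from the expected exception, so the handler cannot swallow it.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the framework's FAIL(): a distinct exception
// type, so it is never mistaken for the exception the test expects.
struct TestFailure : std::logic_error {
  using std::logic_error::logic_error;
};

// Runs `call`, which is expected to throw ExpectedException.
// Returns normally if it does; throws TestFailure if `call` returns.
// Any other exception type propagates unchanged to the caller.
template <typename ExpectedException, typename Callable>
void expectThrow(Callable call) {
  try {
    call();
  } catch (const ExpectedException&) {
    return;  // the expected exception arrived: success
  }
  // Reached only when call() returned without throwing.
  throw TestFailure("Failed to throw expected exception.");
}

// A call that does throw passes the check.
bool expectedThrowPasses() {
  try {
    expectThrow<std::runtime_error>([] { throw std::runtime_error("boom"); });
  } catch (...) {
    return false;
  }
  return true;
}

// A call that returns normally is reported as a failure.
bool missingThrowIsReported() {
  try {
    expectThrow<std::runtime_error>([] {});
  } catch (const TestFailure& failure) {
    assert(std::string(failure.what()) ==
           "Failed to throw expected exception.");
    return true;
  }
  return false;
}
```

Keeping the failure type separate matters when the surrounding handler is broad: a catch-all around both the call and the failure signal would hide the very case the check is meant to expose.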



[GitHub] geode-native pull request #82: GEODE-2736: Fixed orphaned worker threads

Posted by echobravopapa <gi...@git.apache.org>.
Github user echobravopapa commented on a diff in the pull request:

    https://github.com/apache/geode-native/pull/82#discussion_r109462974
  
    --- Diff: src/cppcache/integration-test/testThinClientPoolExecuteFunctionThrowsException.cpp ---
    @@ -0,0 +1,362 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#define ROOT_NAME "testThinClientPoolExecuteFunctionThrowsException"
    +
    +#include "fw_dunit.hpp"
    +#include "ThinClientHelper.hpp"
    +#include "testobject/VariousPdxTypes.hpp"
    +
    +#include <thread>
    +#include <chrono>
    +
    +using namespace PdxTests;
    +/* This is to test
    +1- function execution on pool
    + */
    +
    +#define CLIENT1 s1p1
    +#define LOCATOR1 s2p1
    +#define SERVER s2p2
    +
    +bool isLocalServer = false;
    +bool isLocator = false;
    +bool isPoolWithEndpoint = false;
    +
    +const char* locHostPort =
    +    CacheHelper::getLocatorHostPort(isLocator, isLocalServer, 1);
    +const char* poolRegNames[] = {"partition_region", "PoolRegion2"};
    +
    +const char* serverGroup = "ServerGroup1";
    +
    +char* getFuncIName = (char*)"MultiGetFunctionI";
    +char* putFuncIName = (char*)"MultiPutFunctionI";
    +char* getFuncName = (char*)"MultiGetFunction";
    +char* putFuncName = (char*)"MultiPutFunction";
    +char* rjFuncName = (char*)"RegionOperationsFunction";
    +char* exFuncName = (char*)"ExceptionHandlingFunction";
    +char* exFuncNameSendException = (char*)"executeFunction_SendException";
    +char* exFuncNamePdxType = (char*)"PdxFunctionTest";
    +char* FEOnRegionPrSHOP = (char*)"FEOnRegionPrSHOP";
    +char* FEOnRegionPrSHOP_OptimizeForWrite =
    +    (char*)"FEOnRegionPrSHOP_OptimizeForWrite";
    +char* FETimeOut = (char*)"FunctionExecutionTimeOut";
    +
    +#define verifyGetResults()                                                    \
    +    bool found = false;                                                         \
    +    for (int j = 0; j < 34; j++) {                                              \
    +      if (j % 2 == 0) continue;                                                 \
    +      sprintf(buf, "VALUE--%d", j);                                             \
    +      if (strcmp(buf, dynCast<CacheableStringPtr>(resultList->operator[](i))    \
    +                 ->asChar()) == 0) {                                   \
    +        LOGINFO(                                                                \
    +                                                                                "buf = %s "                                                         \
    +                                                                                "dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar() " \
    +                                                                                "= %s ",                                                            \
    +                                                                                buf,                                                                \
    +                                                                                dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar());  \
    +                                                                                found = true;                                                           \
    +                                                                                break;                                                                  \
    +      }                                                                         \
    +    }                                                                           \
    +    ASSERT(found, "this returned value is invalid");
    +
    +#define verifyGetKeyResults()                                                 \
    +    bool found = false;                                                         \
    +    for (int j = 0; j < 34; j++) {                                              \
    +      if (j % 2 == 0) continue;                                                 \
    +      sprintf(buf, "KEY--%d", j);                                               \
    +      if (strcmp(buf, dynCast<CacheableStringPtr>(resultList->operator[](i))    \
    +                 ->asChar()) == 0) {                                   \
    +        LOGINFO(                                                                \
    +                                                                                "buf = %s "                                                         \
    +                                                                                "dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar() " \
    +                                                                                "= %s ",                                                            \
    +                                                                                buf,                                                                \
    +                                                                                dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar());  \
    +                                                                                found = true;                                                           \
    +                                                                                break;                                                                  \
    +      }                                                                         \
    +    }                                                                           \
    +    ASSERT(found, "this returned KEY is invalid");
    +
    +#define verifyPutResults()                   \
    +    bool found = false;                        \
    +    for (int j = 0; j < 34; j++) {             \
    +      if (j % 2 == 0) continue;                \
    +      sprintf(buf, "KEY--%d", j);              \
    +      if (strcmp(buf, value->asChar()) == 0) { \
    +        found = true;                          \
    +        break;                                 \
    +      }                                        \
    +    }                                          \
    +    ASSERT(found, "this returned value is invalid");
    +class MyResultCollector : public ResultCollector {
    + public:
    +  MyResultCollector()
    + : m_resultList(CacheableVector::create()),
    +   m_isResultReady(false),
    +   m_endResultCount(0),
    +   m_addResultCount(0),
    +   m_getResultCount(0) {}
    +  ~MyResultCollector() {}
    +  CacheableVectorPtr getResult(uint32_t timeout) {
    +    m_getResultCount++;
    +    if (m_isResultReady == true) {
    +      return m_resultList;
    +    } else {
    +      for (uint32_t i = 0; i < timeout; i++) {
    +        SLEEP(1);
    +        if (m_isResultReady == true) return m_resultList;
    +      }
    +      throw FunctionExecutionException(
    +          "Result is not ready, endResults callback is called before invoking "
    +          "getResult() method");
    +    }
    +  }
    +
    +  void addResult(CacheablePtr& resultItem) {
    +    m_addResultCount++;
    +    if (resultItem == NULLPTR) {
    +      return;
    +    }
    +    try {
    +      CacheableArrayListPtr result = dynCast<CacheableArrayListPtr>(resultItem);
    +      for (int32_t i = 0; i < result->size(); i++) {
    +        m_resultList->push_back(result->operator[](i));
    +      }
    +    } catch (ClassCastException) {
    +      UserFunctionExecutionExceptionPtr result =
    +          dynCast<UserFunctionExecutionExceptionPtr>(resultItem);
    +      m_resultList->push_back(result);
    +    }
    +  }
    +  void endResults() {
    +    m_isResultReady = true;
    +    m_endResultCount++;
    +  }
    +  uint32_t getEndResultCount() { return m_endResultCount; }
    +  uint32_t getAddResultCount() { return m_addResultCount; }
    +  uint32_t getGetResultCount() { return m_getResultCount; }
    +
    + private:
    +  CacheableVectorPtr m_resultList;
    +  volatile bool m_isResultReady;
    +  uint32_t m_endResultCount;
    +  uint32_t m_addResultCount;
    +  uint32_t m_getResultCount;
    +};
    +typedef SharedPtr<MyResultCollector> MyResultCollectorPtr;
    +
    +DUNIT_TASK_DEFINITION(LOCATOR1, StartLocator1)
    +{
    +  // starting locator
    +  if (isLocator) {
    +    CacheHelper::initLocator(1);
    +    LOG("Locator1 started");
    +  }
    +}
    +END_TASK_DEFINITION
    +
    +DUNIT_TASK_DEFINITION(SERVER, StartS12)
    +{
    +  const char* lhp = NULL;
    +  if (!isPoolWithEndpoint) lhp = locHostPort;
    +  if (isLocalServer) {
    +    CacheHelper::initServer(1, "func_cacheserver1_pool.xml", lhp);
    +  }
    +  if (isLocalServer) {
    +    CacheHelper::initServer(2, "func_cacheserver2_pool.xml", lhp);
    +  }
    +}
    +END_TASK_DEFINITION
    +
    +DUNIT_TASK_DEFINITION(CLIENT1, StartC1)
    +{
    +  // initClient(true);
    +  initClientWithPool(true, NULL, locHostPort, serverGroup, NULLPTR, 0, true,
    +                     -1, -1, 60000, /*singlehop*/ true,
    +                     /*threadLocal*/ true);
    +  // createPool(poolName, locHostPort,serverGroup, NULL, 0, true );
    +  // createRegionAndAttachPool(poolRegNames[0],USE_ACK, poolName);
    +
    +  RegionPtr regPtr0 =
    +      createRegionAndAttachPool(poolRegNames[0], USE_ACK, NULL);
    +  ;  // getHelper()->createRegion( poolRegNames[0], USE_ACK);
    +  regPtr0->registerAllKeys();
    +
    +  LOG("Clnt1Init complete.");
    +}
    +END_TASK_DEFINITION
    +
    +DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
    +{
    +  RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]);
    +  char buf[128];
    +
    +  for (int i = 0; i < 34; i++) {
    +    sprintf(buf, "VALUE--%d", i);
    +    CacheablePtr value(CacheableString::create(buf));
    +
    +    sprintf(buf, "KEY--%d", i);
    +    CacheableKeyPtr key = CacheableKey::create(buf);
    +    regPtr0->put(key, value);
    +  }
    +  std::this_thread::sleep_for(std::chrono::seconds(10)); // let the put finish
    +
    +    //-----------------------Test with sendException
    +    // onRegion-------------------------------//
    +
    +    for (int i = 1; i <= 200; i++) {
    +      CacheablePtr value(CacheableInt32::create(i));
    +
    +      sprintf(buf, "execKey-%d", i);
    +      CacheableKeyPtr key = CacheableKey::create(buf);
    +      regPtr0->put(key, value);
    +    }
    +    LOG("Put for execKey's on region complete.");
    +
    +    LOG("Adding filter");
    +    CacheableArrayListPtr arrList = CacheableArrayList::create();
    +    for (int i = 100; i < 120; i++) {
    +      sprintf(buf, "execKey-%d", i);
    +      CacheableKeyPtr key = CacheableKey::create(buf);
    +      arrList->push_back(key);
    +    }
    +
    +    CacheableVectorPtr filter = CacheableVector::create();
    +    for (int i = 100; i < 120; i++) {
    +      sprintf(buf, "execKey-%d", i);
    +      CacheableKeyPtr key = CacheableKey::create(buf);
    +      filter->push_back(key);
    +    }
    +    LOG("Adding filter done.");
    +
    +    CacheablePtr args = CacheableBoolean::create(1);
    +
    +    ExecutionPtr funcExec = FunctionService::onRegion(regPtr0);
    +    ASSERT(funcExec != NULLPTR, "onRegion Returned NULL");
    +
    +    ResultCollectorPtr collector =
    +        funcExec->withArgs(args)->withFilter(filter)->execute(
    +            exFuncNameSendException, 15);
    +    ASSERT(collector != NULLPTR, "onRegion collector NULL");
    +
    +    CacheableVectorPtr result = collector->getResult();
    +
    +    if (result == NULLPTR) {
    +      ASSERT(false, "echo String : result is NULL");
    +    } else {
    +      try {
    +        for (int i = 0; i < result->size(); i++) {
    +          UserFunctionExecutionExceptionPtr uFEPtr =
    +              dynCast<UserFunctionExecutionExceptionPtr>(
    +                  result->operator[](i));
    +          ASSERT(uFEPtr != NULLPTR, "uFEPtr exception is NULL");
    +          LOGINFO("Done casting to uFEPtr");
    +          LOGINFO("Read expected uFEPtr exception %s ",
    +                  uFEPtr->getMessage()->asChar());
    +        }
    +      } catch (ClassCastException& ex) {
    +        std::string logmsg = "";
    +        logmsg += ex.getName();
    +        logmsg += ": ";
    +        logmsg += ex.getMessage();
    +        LOG(logmsg.c_str());
    +        ex.printStackTrace();
    +        FAIL(
    +            "exFuncNameSendException casting to string for bool argument "
    +            "exception.");
    +      } catch (...) {
    +        FAIL(
    +            "exFuncNameSendException casting to string for bool argument "
    +            "Unknown exception.");
    +      }
    +    }
    +
    +    LOG("exFuncNameSendException done for bool argument.");
    +
    +    collector = funcExec->withArgs(arrList)->withFilter(filter)->execute(
    +        exFuncNameSendException, 15);
    +    ASSERT(collector != NULLPTR, "onRegion collector for arrList NULL");
    +    std::this_thread::sleep_for(std::chrono::seconds(2));
    +    
    +    try {
    +        CacheableVectorPtr fil = CacheableVector::create();
    +        fil->push_back(CacheableInt32::create(1));
    +        ExecutionPtr exe = FunctionService::onRegion(regPtr0);
    +        
    +        LOGINFO("Executing the exception test it is expected to throw.");
    +        CacheableVectorPtr executeFunctionResult3 =
    +        funcExec->withArgs(arrList)->withFilter(filter)->execute("ThinClientRegionExceptionTest", 15)->getResult();
    --- End diff --
    
    @mhansonp, have a look at testThinClientSSLAuthFail for an example of what @PivotalSarge is asking for regarding the FAIL().

