You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2017/03/20 16:54:44 UTC
[2/3] geode-native git commit: GEODE-2657: Fixes and tests Function
Execution.
GEODE-2657: Fixes and tests Function Execution.
- Corrects message formatting.
- Fixes integration tests.
Project: http://git-wip-us.apache.org/repos/asf/geode-native/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode-native/commit/aad45453
Tree: http://git-wip-us.apache.org/repos/asf/geode-native/tree/aad45453
Diff: http://git-wip-us.apache.org/repos/asf/geode-native/diff/aad45453
Branch: refs/heads/develop
Commit: aad4545379cdaa3321ba648a3806504973403e38
Parents: 8417d69
Author: Jacob Barrett <jb...@pivotal.io>
Authored: Fri Mar 17 16:38:32 2017 -0700
Committer: Jacob Barrett <jb...@pivotal.io>
Committed: Mon Mar 20 09:53:16 2017 -0700
----------------------------------------------------------------------
src/cppcache/integration-test/CMakeLists.txt | 6 +-
.../testThinClientExecuteFunctionPrSHOP.cpp | 2 +-
.../testThinClientPoolExecuteFunction.cpp | 223 +---------
...ExecuteFunctionDisableChunkHandlerThread.cpp | 426 +++++++++++++++++++
.../testThinClientPoolExecuteFunctionPrSHOP.cpp | 23 +-
.../testThinClientPoolExecuteHAFunction.cpp | 27 +-
src/cppcache/src/TcrMessage.cpp | 5 +-
src/cppcache/test/TcrMessage_unittest.cpp | 10 +-
8 files changed, 463 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode-native/blob/aad45453/src/cppcache/integration-test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/cppcache/integration-test/CMakeLists.txt b/src/cppcache/integration-test/CMakeLists.txt
index 5e070d8..8f7824e 100644
--- a/src/cppcache/integration-test/CMakeLists.txt
+++ b/src/cppcache/integration-test/CMakeLists.txt
@@ -170,8 +170,6 @@ set_property(TEST testThinClientPartitionResolver PROPERTY LABELS FLAKY)
set_property(TEST testThinClientPdxDeltaWithNotification PROPERTY LABELS FLAKY)
set_property(TEST testThinClientPdxInstance PROPERTY LABELS FLAKY)
set_property(TEST testThinClientPdxTests PROPERTY LABELS FLAKY)
-set_property(TEST testThinClientPoolExecuteHAFunction PROPERTY LABELS FLAKY)
-set_property(TEST testThinClientPoolExecuteHAFunctionPrSHOP PROPERTY LABELS FLAKY)
set_property(TEST testThinClientPoolRedundancy PROPERTY LABELS FLAKY)
set_property(TEST testThinClientPoolServer PROPERTY LABELS FLAKY)
set_property(TEST testThinClientPutAll PROPERTY LABELS FLAKY)
@@ -186,13 +184,10 @@ set_property(TEST testTimedSemaphore PROPERTY LABELS FLAKY)
set_property(TEST testFwPerf PROPERTY LABELS OMITTED)
set_property(TEST testThinClientCqDurable PROPERTY LABELS OMITTED)
-set_property(TEST testThinClientExecuteFunctionPrSHOP PROPERTY LABELS OMITTED)
set_property(TEST testThinClientGatewayTest PROPERTY LABELS OMITTED)
set_property(TEST testThinClientHAFailoverRegex PROPERTY LABELS OMITTED)
set_property(TEST testThinClientPRSingleHop PROPERTY LABELS OMITTED)
set_property(TEST testThinClientPoolAttrTest PROPERTY LABELS OMITTED)
-set_property(TEST testThinClientPoolExecuteFunction PROPERTY LABELS OMITTED)
-set_property(TEST testThinClientPoolExecuteFunctionPrSHOP PROPERTY LABELS OMITTED)
set_property(TEST testThinClientPoolLocator PROPERTY LABELS OMITTED)
set_property(TEST testThinClientPutWithDelta PROPERTY LABELS OMITTED)
set_property(TEST testThinClientRemoteQueryTimeout PROPERTY LABELS OMITTED)
@@ -210,6 +205,7 @@ set_property(TEST testThinClientTicket303 PROPERTY LABELS OMITTED)
set_property(TEST testThinClientTicket304 PROPERTY LABELS OMITTED)
set_property(TEST testThinClientTracking PROPERTY LABELS OMITTED)
set_property(TEST testThinClientWriterException PROPERTY LABELS OMITTED)
+set_property(TEST testThinClientPoolExecuteFunctionDisableChunkHandlerThread PROPERTY LABELS OMITTED)
add_custom_target(run-stable-cppcache-integration-tests
COMMAND ctest -C $<CONFIGURATION> -L STABLE
http://git-wip-us.apache.org/repos/asf/geode-native/blob/aad45453/src/cppcache/integration-test/testThinClientExecuteFunctionPrSHOP.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/integration-test/testThinClientExecuteFunctionPrSHOP.cpp b/src/cppcache/integration-test/testThinClientExecuteFunctionPrSHOP.cpp
index 33447fc..73ced17 100644
--- a/src/cppcache/integration-test/testThinClientExecuteFunctionPrSHOP.cpp
+++ b/src/cppcache/integration-test/testThinClientExecuteFunctionPrSHOP.cpp
@@ -563,7 +563,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest2)
LOGINFO("FETimeOut begin onRegion");
ExecutionPtr RexecutionPtr = FunctionService::onRegion(regPtr0);
CacheableVectorPtr fe =
- RexecutionPtr->withArgs(CacheableInt32::create(5000))
+ RexecutionPtr->withArgs(CacheableInt32::create(5000 * 1000))
->execute(FETimeOut, 5000)
->getResult();
if (fe == NULLPTR) {
http://git-wip-us.apache.org/repos/asf/geode-native/blob/aad45453/src/cppcache/integration-test/testThinClientPoolExecuteFunction.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/integration-test/testThinClientPoolExecuteFunction.cpp b/src/cppcache/integration-test/testThinClientPoolExecuteFunction.cpp
index 27aacc4..91bb285 100644
--- a/src/cppcache/integration-test/testThinClientPoolExecuteFunction.cpp
+++ b/src/cppcache/integration-test/testThinClientPoolExecuteFunction.cpp
@@ -157,6 +157,8 @@ class MyResultCollector : public ResultCollector {
uint32_t m_addResultCount;
uint32_t m_getResultCount;
};
+_GF_PTR_DEF_(MyResultCollector, MyResultCollectorPtr)
+
DUNIT_TASK_DEFINITION(LOCATOR1, StartLocator1)
{
// starting locator
@@ -361,10 +363,10 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
}
}
// test get function with customer collector
- MyResultCollector* myRC = new MyResultCollector();
+ MyResultCollectorPtr myRC(new MyResultCollector());
executeFunctionResult = exc->withFilter(routingObj)
->withArgs(args)
- ->withCollector(ResultCollectorPtr(myRC))
+ ->withCollector(myRC)
->execute(getFuncName, getResult)
->getResult();
sprintf(buf, "add result count = %d", myRC->getAddResultCount());
@@ -616,10 +618,10 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
"test exFuncNameSendException function with customer collector with "
"bool as arguement using onRegion.");
- MyResultCollector* myRC1 = new MyResultCollector();
+ MyResultCollectorPtr myRC1(new MyResultCollector());
result = funcExec->withArgs(args)
->withFilter(filter)
- ->withCollector(ResultCollectorPtr(myRC1))
+ ->withCollector(myRC1)
->execute(exFuncNameSendException, getResult)
->getResult();
LOGINFO("add result count = %d", myRC1->getAddResultCount());
@@ -1084,9 +1086,9 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
"test exFuncNameSendException function with customer collector with "
"bool as arguement using onServers.");
- MyResultCollector* myRC2 = new MyResultCollector();
+ MyResultCollectorPtr myRC2(new MyResultCollector());
result = funcExec->withArgs(args)
- ->withCollector(ResultCollectorPtr(myRC2))
+ ->withCollector(myRC2)
->execute(exFuncNameSendException, getResult)
->getResult();
ASSERT(3 == myRC2->getAddResultCount(), "add result count is not 3");
@@ -1140,7 +1142,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client2OpTest)
LOGINFO("FETimeOut begin onRegion");
ExecutionPtr RexecutionPtr = FunctionService::onRegion(regPtr0);
CacheableVectorPtr fe =
- RexecutionPtr->withArgs(CacheableInt32::create(5000))
+ RexecutionPtr->withArgs(CacheableInt32::create(5000 * 1000))
->execute(FETimeOut, 5000)
->getResult();
if (fe == NULLPTR) {
@@ -1158,9 +1160,10 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client2OpTest)
LOGINFO("FETimeOut begin onServer");
ExecutionPtr serverExc = FunctionService::onServer(getHelper()->cachePtr);
- CacheableVectorPtr vec = serverExc->withArgs(CacheableInt32::create(5000))
- ->execute(FETimeOut, 5000)
- ->getResult();
+ CacheableVectorPtr vec =
+ serverExc->withArgs(CacheableInt32::create(5000 * 1000))
+ ->execute(FETimeOut, 5000)
+ ->getResult();
if (vec == NULLPTR) {
ASSERT(false, "functionResult is NULL");
} else {
@@ -1178,7 +1181,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client2OpTest)
ExecutionPtr serversExc =
FunctionService::onServers(getHelper()->cachePtr);
CacheableVectorPtr vecs =
- serversExc->withArgs(CacheableInt32::create(5000))
+ serversExc->withArgs(CacheableInt32::create(5000 * 1000))
->execute(FETimeOut, 5000)
->getResult();
if (vecs == NULLPTR) {
@@ -1235,65 +1238,6 @@ DUNIT_TASK_DEFINITION(LOCATOR1, CloseLocator1)
}
END_TASK_DEFINITION
-DUNIT_TASK_DEFINITION(CLIENT1, StartTestClient)
- {
- LOG("in before starting StartTestClient");
- PropertiesPtr config = Properties::create();
- config->insert("disable-chunk-handler-thread", "true");
- config->insert("read-timeout-unit-in-millis", "true");
- config->insert("ping-interval", "-1");
- config->insert("bucket-wait-timeout", "2000");
- config->insert("connect-wait-timeout", "10");
-
- initClientWithPool(true, NULL, locHostPort, serverGroup, config, 0, true,
- -1, -1, -1, true, false);
- // createPool(poolName, locHostPort,serverGroup, NULL, 0, true );
-
- RegionPtr regPtr0 =
- createRegionAndAttachPool(poolRegNames[0], USE_ACK, NULL);
- ; // getHelper()->createRegion( poolRegNames[0], USE_ACK);
-
- LOG("StartTestClient complete.");
- }
-END_TASK_DEFINITION
-
-DUNIT_TASK_DEFINITION(CLIENT2, StartTestClient2)
- {
- LOG("in before starting StartTestClient");
- PropertiesPtr config = Properties::create();
- config->insert("disable-chunk-handler-thread", "true");
- config->insert("read-timeout-unit-in-millis", "true");
- config->insert("ping-interval", "-1");
- config->insert("bucket-wait-timeout", "2000");
- config->insert("connect-wait-timeout", "10");
-
- initClientWithPool(true, NULL, locHostPort, serverGroup, config, 0, true,
- -1, -1, -1, true, false);
- // createPool(poolName, locHostPort,serverGroup, NULL, 0, true );
-
- RegionPtr regPtr0 =
- createRegionAndAttachPool(poolRegNames[0], USE_ACK, NULL);
- ; // getHelper()->createRegion( poolRegNames[0], USE_ACK);
-
- LOG("StartTestClient complete.");
- }
-END_TASK_DEFINITION
-
-DUNIT_TASK_DEFINITION(CLIENT2, clientPuts)
- {
- RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]);
- char buf[128];
- for (int i = 1; i <= 500; i++) {
- CacheablePtr value(CacheableInt32::create(i));
-
- sprintf(buf, "am-%d", i);
- CacheableKeyPtr key = CacheableKey::create(buf);
- regPtr0->put(key, value);
- }
- LOG("clientPuts complete.");
- }
-END_TASK_DEFINITION
-
class putThread : public ACE_Task_Base {
private:
RegionPtr regPtr;
@@ -1380,26 +1324,6 @@ void executeFunction() {
ASSERT(failureCount <= 10 && failureCount > 0, "failureCount should be zero");
}
-const int nThreads = 10;
-putThread* threads[nThreads];
-
-DUNIT_TASK_DEFINITION(CLIENT1, dofuncOps)
- {
- RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]);
- // check nextwork hop for single key
- executeFunction();
-
-#ifdef __linux
-
- for (int thdIdx = 0; thdIdx < nThreads; thdIdx++) {
- threads[thdIdx] = new putThread(regPtr0, 0, 500, false);
- threads[thdIdx]->start();
- }
-#endif
- LOG("dofuncOps complete.");
- }
-END_TASK_DEFINITION
-
DUNIT_TASK_DEFINITION(CLIENT1, closeServer2)
{
// stop servers
@@ -1410,82 +1334,6 @@ DUNIT_TASK_DEFINITION(CLIENT1, closeServer2)
}
END_TASK_DEFINITION
-void waitForNoTimeout() {
- LOGINFO("entering into waitForNoTimeout");
- SLEEP(10);
- int maxTry = 1000; // 10 seconds
- int thdIdx = 0;
- while (maxTry-- > 0) {
- for (thdIdx = 0; thdIdx < nThreads; thdIdx++) {
- int currentTimeout = threads[thdIdx]->getTimeoutCount();
- SLEEP(10);
- if (currentTimeout != threads[thdIdx]->getTimeoutCount()) break;
- }
- if (thdIdx == nThreads) break;
- }
-
- LOGINFO("waitForNoTimeout nThreads: %d, thdIdx: %d", nThreads, thdIdx);
- if (thdIdx < nThreads) {
- LOGINFO(
- "waitForNoTimeout failed still getting timeouts nThreads: %d, thdIdx: "
- "%d",
- nThreads, thdIdx);
- ASSERT(thdIdx < nThreads, "waitForNoTimeout failed still getting timeouts");
- }
- SLEEP(20000);
-}
-
-void verifyTimeoutFirst() {
- int totalTimeoutCount = 0;
- for (int thdIdx = 0; thdIdx < nThreads; thdIdx++) {
- totalTimeoutCount += threads[thdIdx]->getTimeoutCount();
- }
-
- LOGINFO("Total timeout %d", totalTimeoutCount);
-
- int blackListBucketTimeouts =
- TestUtils::getCacheImpl(getHelper()->cachePtr)->blackListBucketTimeouts();
-
- LOGINFO("blackListBucketTimeouts %d", blackListBucketTimeouts);
-
- ASSERT(totalTimeoutCount > 0,
- "totalTimeoutCount should be greater than zero");
-
- ASSERT(blackListBucketTimeouts > 0,
- "blackListBucketTimeouts should be greater than zero");
-}
-
-DUNIT_TASK_DEFINITION(CLIENT1, stopClientThreads)
- {
-#ifdef __linux
- for (int thdIdx = 0; thdIdx < nThreads; thdIdx++) {
- threads[thdIdx]->stop();
- }
-
- LOG("Linux is defined");
-#endif
- LOG("completed stopClientThreads");
- }
-END_TASK_DEFINITION
-
-DUNIT_TASK_DEFINITION(CLIENT1, verifyClientResults)
- {
-#ifdef __linux
- /*for(int thdIdx = 0; thdIdx < nThreads; thdIdx++)
- {
- threads[thdIdx]->stop();
- }*/
-
- verifyTimeoutFirst();
-
- waitForNoTimeout();
-
- LOG("Linux is defined");
-#endif
- LOG("completed verifyClientResults");
- }
-END_TASK_DEFINITION
-
DUNIT_TASK_DEFINITION(CLIENT1, closeServer1)
{
// stop servers
@@ -1496,8 +1344,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, closeServer1)
}
END_TASK_DEFINITION
-void runFunctionExecution(bool isEndpoint) {
- // with locator
+void runFunctionExecution() {
CALL_TASK(StartLocator1);
CALL_TASK(StartS12);
CALL_TASK(StartC1);
@@ -1506,48 +1353,10 @@ void runFunctionExecution(bool isEndpoint) {
CALL_TASK(StopC1);
CALL_TASK(CloseServers);
CALL_TASK(CloseLocator1);
-
- // with endpoints
- CALL_TASK(StartS12);
- CALL_TASK(StartC1);
- CALL_TASK(Client1OpTest);
- CALL_TASK(Client2OpTest);
- CALL_TASK(StopC1);
- CALL_TASK(CloseServers);
-}
-
-void runFunctionExecutionTestAPI() {
- // with locator
- CALL_TASK(StartLocator1);
- // start two servers
- CALL_TASK(StartS12);
- CALL_TASK(StartTestClient);
- CALL_TASK(StartTestClient2);
- // to create pr meta data
- CALL_TASK(clientPuts);
- // need to spawn thread which will do continuous FE
- CALL_TASK(dofuncOps);
- CALL_TASK(closeServer2);
- // check whether you get timeouts
- CALL_TASK(verifyClientResults);
-
- // starting server2
- CALL_TASK(startServer2); // starting server again
- CALL_TASK(verifyClientResults); // verifying timeouts again
-
- // stopping server1
- CALL_TASK(closeServer1);
- CALL_TASK(verifyClientResults);
-
- CALL_TASK(stopClientThreads); // verifying timeouts again
- CALL_TASK(StopC1);
- CALL_TASK(closeServer2);
- CALL_TASK(CloseLocator1);
}
DUNIT_MAIN
{
- runFunctionExecutionTestAPI();
- runFunctionExecution(false);
+ runFunctionExecution();
}
END_MAIN
http://git-wip-us.apache.org/repos/asf/geode-native/blob/aad45453/src/cppcache/integration-test/testThinClientPoolExecuteFunctionDisableChunkHandlerThread.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/integration-test/testThinClientPoolExecuteFunctionDisableChunkHandlerThread.cpp b/src/cppcache/integration-test/testThinClientPoolExecuteFunctionDisableChunkHandlerThread.cpp
new file mode 100644
index 0000000..cba7bad
--- /dev/null
+++ b/src/cppcache/integration-test/testThinClientPoolExecuteFunctionDisableChunkHandlerThread.cpp
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "fw_dunit.hpp"
+#include "ThinClientHelper.hpp"
+#include "testobject/VariousPdxTypes.hpp"
+#include <ace/OS.h>
+#include <ace/High_Res_Timer.h>
+
+#include <ace/ACE.h>
+
+using namespace PdxTests;
+/* This is to test
+1- funtion execution on pool
+*/
+
+#define CLIENT1 s1p1
+#define CLIENT2 s1p2
+#define LOCATOR1 s2p1
+#define SERVER s2p2
+
+bool isLocalServer = false;
+bool isLocator = false;
+bool isPoolWithEndpoint = false;
+
+const char* locHostPort =
+ CacheHelper::getLocatorHostPort(isLocator, isLocalServer, 1);
+const char* poolRegNames[] = {"partition_region", "PoolRegion2"};
+
+const char* serverGroup = "ServerGroup1";
+
+char* getFuncName2 = (char*)"MultiGetFunction2";
+
+DUNIT_TASK_DEFINITION(LOCATOR1, StartLocator1)
+ {
+ // starting locator
+ if (isLocator) {
+ CacheHelper::initLocator(1);
+ LOG("Locator1 started");
+ }
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(SERVER, StartS12)
+ {
+ const char* lhp = NULL;
+ if (!isPoolWithEndpoint) lhp = locHostPort;
+ if (isLocalServer) {
+ CacheHelper::initServer(1, "func_cacheserver1_pool.xml", lhp);
+ }
+ if (isLocalServer) {
+ CacheHelper::initServer(2, "func_cacheserver2_pool.xml", lhp);
+ }
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(SERVER, startServer2)
+ {
+ const char* lhp = NULL;
+ if (!isPoolWithEndpoint) lhp = locHostPort;
+ if (isLocalServer) {
+ CacheHelper::initServer(2, "func_cacheserver2_pool.xml", lhp);
+ }
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(CLIENT1, StartC1)
+ {
+ // initClient(true);
+ initClientWithPool(true, NULL, locHostPort, serverGroup, NULLPTR, 0, true);
+ // createPool(poolName, locHostPort,serverGroup, NULL, 0, true );
+ // createRegionAndAttachPool(poolRegNames[0],USE_ACK, poolName);
+
+ RegionPtr regPtr0 =
+ createRegionAndAttachPool(poolRegNames[0], USE_ACK, NULL);
+ ; // getHelper()->createRegion( poolRegNames[0], USE_ACK);
+ regPtr0->registerAllKeys();
+
+ LOG("Clnt1Init complete.");
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(CLIENT1, StopC1)
+ {
+ cleanProc();
+ LOG("Clnt1Down complete: Keepalive = True");
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(SERVER, CloseServers)
+ {
+ // stop servers
+ if (isLocalServer) {
+ CacheHelper::closeServer(1);
+ LOG("SERVER1 stopped");
+ }
+ if (isLocalServer) {
+ CacheHelper::closeServer(2);
+ LOG("SERVER2 stopped");
+ }
+ if (isLocalServer) {
+ CacheHelper::closeServer(3);
+ LOG("SERVER3 stopped");
+ }
+ isPoolWithEndpoint = true;
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(LOCATOR1, CloseLocator1)
+ {
+ // stop locator
+ if (isLocator) {
+ CacheHelper::closeLocator(1);
+ LOG("Locator1 stopped");
+ }
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(CLIENT1, StartTestClient)
+ {
+ LOG("in before starting StartTestClient");
+ PropertiesPtr config = Properties::create();
+ config->insert("disable-chunk-handler-thread", "true");
+ config->insert("read-timeout-unit-in-millis", "true");
+ config->insert("ping-interval", "-1");
+ config->insert("bucket-wait-timeout", "2000");
+ config->insert("connect-wait-timeout", "10");
+
+ initClientWithPool(true, NULL, locHostPort, serverGroup, config, 0, true,
+ -1, -1, -1, true, false);
+ // createPool(poolName, locHostPort,serverGroup, NULL, 0, true );
+
+ RegionPtr regPtr0 =
+ createRegionAndAttachPool(poolRegNames[0], USE_ACK, NULL);
+ ; // getHelper()->createRegion( poolRegNames[0], USE_ACK);
+
+ LOG("StartTestClient complete.");
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(CLIENT2, StartTestClient2)
+ {
+ LOG("in before starting StartTestClient");
+ PropertiesPtr config = Properties::create();
+ config->insert("disable-chunk-handler-thread", "true");
+ config->insert("read-timeout-unit-in-millis", "true");
+ config->insert("ping-interval", "-1");
+ config->insert("bucket-wait-timeout", "2000");
+ config->insert("connect-wait-timeout", "10");
+
+ initClientWithPool(true, NULL, locHostPort, serverGroup, config, 0, true,
+ -1, -1, -1, true, false);
+ // createPool(poolName, locHostPort,serverGroup, NULL, 0, true );
+
+ RegionPtr regPtr0 =
+ createRegionAndAttachPool(poolRegNames[0], USE_ACK, NULL);
+ ; // getHelper()->createRegion( poolRegNames[0], USE_ACK);
+
+ LOG("StartTestClient complete.");
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(CLIENT2, clientPuts)
+ {
+ RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]);
+ char buf[128];
+ for (int i = 1; i <= 500; i++) {
+ CacheablePtr value(CacheableInt32::create(i));
+
+ sprintf(buf, "am-%d", i);
+ CacheableKeyPtr key = CacheableKey::create(buf);
+ regPtr0->put(key, value);
+ }
+ LOG("clientPuts complete.");
+ }
+END_TASK_DEFINITION
+
+class putThread : public ACE_Task_Base {
+ private:
+ RegionPtr regPtr;
+ int m_min;
+ int m_max;
+ int m_failureCount;
+ int m_timeoutCount;
+ volatile bool m_stop;
+
+ public:
+ putThread(RegionPtr rp, int min, int max, bool isWarmUpTask)
+ : regPtr(rp),
+ m_min(min),
+ m_max(max),
+ m_failureCount(0),
+ m_timeoutCount(0),
+ m_stop(false) {}
+
+ int getFailureCount() { return m_failureCount; }
+
+ int getTimeoutCount() { return m_timeoutCount; }
+
+ int svc(void) {
+ bool networkhop ATTR_UNUSED = false;
+ CacheableKeyPtr keyPtr;
+ CacheablePtr args = NULLPTR;
+ ResultCollectorPtr rPtr = NULLPTR;
+ RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]);
+ while (!m_stop) {
+ for (int i = m_min; i < m_max; i++) {
+ try {
+ char buf[128];
+ sprintf(buf, "am-%d", i);
+ CacheableKeyPtr key = CacheableKey::create(buf);
+ CacheableVectorPtr routingObj = CacheableVector::create();
+ routingObj->push_back(key);
+ ExecutionPtr exc = FunctionService::onRegion(regPtr0);
+ exc->execute(routingObj, args, rPtr, getFuncName2, 300 /*in millis*/)
+ ->getResult();
+ } catch (const TimeoutException& te) {
+ LOGINFO("Timeout exception occurred %s", te.getMessage());
+ m_timeoutCount++;
+ } catch (const Exception&) {
+ LOG("Exception occurred");
+ } catch (...) {
+ LOG("Random Exception occurred");
+ }
+ }
+ }
+ LOG("PutThread done");
+ return 0;
+ }
+ void start() { activate(); }
+ void stop() {
+ m_stop = true;
+ wait();
+ }
+};
+
+void executeFunction() {
+ RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]);
+ TestUtils::getCacheImpl(getHelper()->cachePtr)->getAndResetNetworkHopFlag();
+ CacheablePtr args = NULLPTR;
+ ResultCollectorPtr rPtr = NULLPTR;
+ int failureCount = 0;
+ LOGINFO("executeFunction started");
+ for (int i = 0; i < 300; i++) {
+ LOGINFO("executeFunction %d ", i);
+ bool networkhop = TestUtils::getCacheImpl(getHelper()->cachePtr)
+ ->getAndResetNetworkHopFlag();
+ if (networkhop) {
+ failureCount++;
+ }
+ char buf[128];
+ sprintf(buf, "am-%d", i);
+ CacheableKeyPtr key = CacheableKey::create(buf);
+ CacheableVectorPtr routingObj = CacheableVector::create();
+ routingObj->push_back(key);
+ ExecutionPtr exc = FunctionService::onRegion(regPtr0);
+ exc->execute(routingObj, args, rPtr, getFuncName2, 300 /*in millis*/)
+ ->getResult();
+ }
+ LOGINFO("executeFunction failureCount %d", failureCount);
+ ASSERT(failureCount <= 10 && failureCount > 0, "failureCount should be zero");
+}
+
+const int nThreads = 10;
+putThread* threads[nThreads];
+
+DUNIT_TASK_DEFINITION(CLIENT1, dofuncOps)
+ {
+ RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]);
+ // check nextwork hop for single key
+ executeFunction();
+
+#ifdef __linux
+
+ for (int thdIdx = 0; thdIdx < nThreads; thdIdx++) {
+ threads[thdIdx] = new putThread(regPtr0, 0, 500, false);
+ threads[thdIdx]->start();
+ }
+#endif
+ LOG("dofuncOps complete.");
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(CLIENT1, closeServer2)
+ {
+ // stop servers
+ if (isLocalServer) {
+ CacheHelper::closeServer(2);
+ LOG("SERVER2 stopped");
+ }
+ }
+END_TASK_DEFINITION
+
+void waitForNoTimeout() {
+ LOGINFO("entering into waitForNoTimeout");
+ SLEEP(10);
+ int maxTry = 1000; // 10 seconds
+ int thdIdx = 0;
+ while (maxTry-- > 0) {
+ for (thdIdx = 0; thdIdx < nThreads; thdIdx++) {
+ int currentTimeout = threads[thdIdx]->getTimeoutCount();
+ SLEEP(10);
+ if (currentTimeout != threads[thdIdx]->getTimeoutCount()) break;
+ }
+ if (thdIdx == nThreads) break;
+ }
+
+ LOGINFO("waitForNoTimeout nThreads: %d, thdIdx: %d", nThreads, thdIdx);
+ if (thdIdx < nThreads) {
+ LOGINFO(
+ "waitForNoTimeout failed still getting timeouts nThreads: %d, thdIdx: "
+ "%d",
+ nThreads, thdIdx);
+ ASSERT(thdIdx < nThreads, "waitForNoTimeout failed still getting timeouts");
+ }
+ SLEEP(20000);
+}
+
+void verifyTimeoutFirst() {
+ int totalTimeoutCount = 0;
+ for (int thdIdx = 0; thdIdx < nThreads; thdIdx++) {
+ totalTimeoutCount += threads[thdIdx]->getTimeoutCount();
+ }
+
+ LOGINFO("Total timeout %d", totalTimeoutCount);
+
+ int blackListBucketTimeouts =
+ TestUtils::getCacheImpl(getHelper()->cachePtr)->blackListBucketTimeouts();
+
+ LOGINFO("blackListBucketTimeouts %d", blackListBucketTimeouts);
+
+ ASSERT(totalTimeoutCount > 0,
+ "totalTimeoutCount should be greater than zero");
+
+ ASSERT(blackListBucketTimeouts > 0,
+ "blackListBucketTimeouts should be greater than zero");
+}
+
+DUNIT_TASK_DEFINITION(CLIENT1, stopClientThreads)
+ {
+#ifdef __linux
+ for (int thdIdx = 0; thdIdx < nThreads; thdIdx++) {
+ threads[thdIdx]->stop();
+ }
+
+ LOG("Linux is defined");
+#endif
+ LOG("completed stopClientThreads");
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(CLIENT1, verifyClientResults)
+ {
+#ifdef __linux
+ /*for(int thdIdx = 0; thdIdx < nThreads; thdIdx++)
+ {
+ threads[thdIdx]->stop();
+ }*/
+
+ verifyTimeoutFirst();
+
+ waitForNoTimeout();
+
+ LOG("Linux is defined");
+#endif
+ LOG("completed verifyClientResults");
+ }
+END_TASK_DEFINITION
+
+DUNIT_TASK_DEFINITION(CLIENT1, closeServer1)
+ {
+ // stop servers
+ if (isLocalServer) {
+ CacheHelper::closeServer(1);
+ LOG("SERVER1 stopped");
+ }
+ }
+END_TASK_DEFINITION
+
+void runFunctionExecutionDisableChunkHandlerThread() {
+ // with locator
+ CALL_TASK(StartLocator1);
+ // start two servers
+ CALL_TASK(StartS12);
+ CALL_TASK(StartTestClient);
+ CALL_TASK(StartTestClient2);
+ // to create pr meta data
+ CALL_TASK(clientPuts);
+ // need to spawn thread which will do continuous FE
+ CALL_TASK(dofuncOps);
+ CALL_TASK(closeServer2);
+ // check whether you get timeouts
+ CALL_TASK(verifyClientResults);
+
+ // starting server2
+ CALL_TASK(startServer2); // starting server again
+ CALL_TASK(verifyClientResults); // verifying timeouts again
+
+ // stopping server1
+ CALL_TASK(closeServer1);
+ CALL_TASK(verifyClientResults);
+
+ CALL_TASK(stopClientThreads); // verifying timeouts again
+ CALL_TASK(StopC1);
+ CALL_TASK(closeServer2);
+ CALL_TASK(CloseLocator1);
+}
+
+DUNIT_MAIN
+ {
+ runFunctionExecutionDisableChunkHandlerThread();
+ }
+END_MAIN
http://git-wip-us.apache.org/repos/asf/geode-native/blob/aad45453/src/cppcache/integration-test/testThinClientPoolExecuteFunctionPrSHOP.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/integration-test/testThinClientPoolExecuteFunctionPrSHOP.cpp b/src/cppcache/integration-test/testThinClientPoolExecuteFunctionPrSHOP.cpp
index 9e9c48b..8eb0b07 100644
--- a/src/cppcache/integration-test/testThinClientPoolExecuteFunctionPrSHOP.cpp
+++ b/src/cppcache/integration-test/testThinClientPoolExecuteFunctionPrSHOP.cpp
@@ -1170,7 +1170,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
LOGINFO("FETimeOut begin onRegion");
ExecutionPtr RexecutionPtr = FunctionService::onRegion(regPtr0);
CacheableVectorPtr fe =
- RexecutionPtr->withArgs(CacheableInt32::create(5000))
+ RexecutionPtr->withArgs(CacheableInt32::create(5000 * 1000))
->execute(FETimeOut, 5000)
->getResult();
if (fe == NULLPTR) {
@@ -1190,9 +1190,10 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
LOGINFO("FETimeOut begin onServer");
ExecutionPtr serverExc = FunctionService::onServer(getHelper()->cachePtr);
- CacheableVectorPtr vec = serverExc->withArgs(CacheableInt32::create(5000))
- ->execute(FETimeOut, 5000)
- ->getResult();
+ CacheableVectorPtr vec =
+ serverExc->withArgs(CacheableInt32::create(5000 * 1000))
+ ->execute(FETimeOut, 5000)
+ ->getResult();
if (vec == NULLPTR) {
ASSERT(false, "functionResult is NULL");
} else {
@@ -1211,7 +1212,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
ExecutionPtr serversExc =
FunctionService::onServers(getHelper()->cachePtr);
CacheableVectorPtr vecs =
- serversExc->withArgs(CacheableInt32::create(5000))
+ serversExc->withArgs(CacheableInt32::create(5000 * 1000))
->execute(FETimeOut, 5000)
->getResult();
if (vecs == NULLPTR) {
@@ -1277,8 +1278,7 @@ DUNIT_TASK_DEFINITION(LOCATOR1, CloseLocator1)
}
END_TASK_DEFINITION
-void runFunctionExecution(bool isEndpoint) {
- // with locator
+void runFunctionExecution() {
CALL_TASK(StartLocator1);
CALL_TASK(StartS12);
CALL_TASK(StartC1);
@@ -1286,15 +1286,8 @@ void runFunctionExecution(bool isEndpoint) {
CALL_TASK(StopC1);
CALL_TASK(CloseServers);
CALL_TASK(CloseLocator1);
-
- // with endpoints
- CALL_TASK(StartS12);
- CALL_TASK(StartC1);
- CALL_TASK(Client1OpTest);
- CALL_TASK(StopC1);
- CALL_TASK(CloseServers);
}
DUNIT_MAIN
- { runFunctionExecution(false); }
+ { runFunctionExecution(); }
END_MAIN
http://git-wip-us.apache.org/repos/asf/geode-native/blob/aad45453/src/cppcache/integration-test/testThinClientPoolExecuteHAFunction.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/integration-test/testThinClientPoolExecuteHAFunction.cpp b/src/cppcache/integration-test/testThinClientPoolExecuteHAFunction.cpp
index 82fad1d..0fbef1b 100644
--- a/src/cppcache/integration-test/testThinClientPoolExecuteHAFunction.cpp
+++ b/src/cppcache/integration-test/testThinClientPoolExecuteHAFunction.cpp
@@ -474,8 +474,7 @@ DUNIT_TASK_DEFINITION(LOCATOR1, CloseLocator1)
}
END_TASK_DEFINITION
-void runFunctionExecution(bool isEndpoint) {
- // with locator
+void runFunctionExecution() {
CALL_TASK(StartLocator1);
CALL_TASK(StartS12);
CALL_TASK(StartC1);
@@ -483,30 +482,8 @@ void runFunctionExecution(bool isEndpoint) {
CALL_TASK(StopC1);
CALL_TASK(CloseServers12);
CALL_TASK(CloseLocator1);
-
- // with endpoints
- CALL_TASK(StartS12);
- CALL_TASK(StartC1);
- CALL_TASK(Client1OpTest); // This tests isHA with onRegion
- CALL_TASK(StopC1);
- CALL_TASK(CloseServers12);
-
- CALL_TASK(StartLocator1);
- CALL_TASK(StartS13);
- CALL_TASK(StartC11);
- CALL_TASK(Client1OnServerHATest); // This tests isHA with onServer
- CALL_TASK(StopC1);
- CALL_TASK(CloseServers13);
- CALL_TASK(CloseLocator1);
-
- // with endpoints
- CALL_TASK(StartS13);
- CALL_TASK(StartC11);
- CALL_TASK(Client1OnServerHATest); // This tests isHA with onServer
- CALL_TASK(StopC1);
- CALL_TASK(CloseServers13);
}
DUNIT_MAIN
- { runFunctionExecution(false); }
+ { runFunctionExecution(); }
END_MAIN
http://git-wip-us.apache.org/repos/asf/geode-native/blob/aad45453/src/cppcache/src/TcrMessage.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrMessage.cpp b/src/cppcache/src/TcrMessage.cpp
index ea99461..f211334 100644
--- a/src/cppcache/src/TcrMessage.cpp
+++ b/src/cppcache/src/TcrMessage.cpp
@@ -891,8 +891,8 @@ void TcrMessage::processChunk(const uint8_t* bytes, int32_t len,
}
// fall-through for other cases
}
- case EXECUTE_REGION_FUNCTION_RESULT:
- case EXECUTE_FUNCTION_RESULT:
+ case TcrMessage::EXECUTE_REGION_FUNCTION_RESULT:
+ case TcrMessage::EXECUTE_FUNCTION_RESULT:
case TcrMessage::CQDATAERROR_MSG_TYPE: // one part
case TcrMessage::CQ_EXCEPTION_TYPE: // one part
case TcrMessage::RESPONSE_FROM_PRIMARY: {
@@ -2502,6 +2502,7 @@ TcrMessageExecuteRegionFunctionSingleHop::
m_tcdm = connectionDM;
m_regionName = region == NULL ? "INVALID_REGION_NAME" : region->getFullPath();
m_region = region;
+ m_hasResult = getResult;
uint32_t numOfParts = 6 + (routingObj == NULLPTR ? 0 : routingObj->size());
numOfParts +=
http://git-wip-us.apache.org/repos/asf/geode-native/blob/aad45453/src/cppcache/test/TcrMessage_unittest.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/test/TcrMessage_unittest.cpp b/src/cppcache/test/TcrMessage_unittest.cpp
index b71bf72..e6752ea 100644
--- a/src/cppcache/test/TcrMessage_unittest.cpp
+++ b/src/cppcache/test/TcrMessage_unittest.cpp
@@ -506,7 +506,7 @@ TEST_F(TcrMessageTest, testConstructorEXECUTE_REGION_FUNCTION_SINGLE_HOP) {
CacheablePtr myPtr(CacheableString::createDeserializable());
TcrMessageExecuteRegionFunctionSingleHop message(
- "myFuncName", region, myPtr, myHashCachePtr, 0, myHashCachePtr,
+ "myFuncName", region, myPtr, myHashCachePtr, 2, myHashCachePtr,
false, // allBuckets
1, static_cast<ThinClientBaseDM *>(NULL));
@@ -514,11 +514,13 @@ TEST_F(TcrMessageTest, testConstructorEXECUTE_REGION_FUNCTION_SINGLE_HOP) {
message.getMessageType());
EXPECT_MESSAGE_EQ(
- "0000004F0000005E00000009FFFFFFFF00000000050000000003E80000001300494E5641"
+ "0000004F0000005E00000009FFFFFFFF00000000050002000003E80000001300494E5641"
"4C49445F524547494F4E5F4E414D450000000A006D7946756E634E616D65000000030157"
"000000000001012900000001000000000004000000000000000004000000000000000002"
"014200",
message);
+
+ EXPECT_TRUE(message.hasResult());
}
TEST_F(TcrMessageTest, testConstructorEXECUTE_REGION_FUNCTION) {
@@ -529,7 +531,7 @@ TEST_F(TcrMessageTest, testConstructorEXECUTE_REGION_FUNCTION) {
CacheableVectorPtr myVectPtr = CacheableVector::create();
TcrMessageExecuteRegionFunction testMessage(
- "ExecuteRegion", region, myCacheablePtr, myVectPtr, 1, myHashCachePtr, 10,
+ "ExecuteRegion", region, myCacheablePtr, myVectPtr, 2, myHashCachePtr, 10,
static_cast<ThinClientBaseDM *>(NULL), 10);
EXPECT_EQ(TcrMessage::EXECUTE_REGION_FUNCTION, testMessage.getMessageType());
@@ -537,7 +539,7 @@ TEST_F(TcrMessageTest, testConstructorEXECUTE_REGION_FUNCTION) {
// changes
EXPECT_MESSAGE_EQ(
- "0000003B0000006100000009FFFFFFFF00000000050001000027100000001300494E5641"
+ "0000003B0000006100000009FFFFFFFF00000000050002000027100000001300494E5641"
"4C49445F524547494F4E5F4E414D450000000D0045786563757465526567696F6E000000"
"030157000000000001012900000001000A00000004000000000000000004000000000000"
"000002014200",