You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2017/08/10 15:20:14 UTC
[06/27] geode-native git commit: GEODE-2729: Remove global variables
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolStickyHADM.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientPoolStickyHADM.cpp b/src/cppcache/src/ThinClientPoolStickyHADM.cpp
deleted file mode 100644
index 9989328..0000000
--- a/src/cppcache/src/ThinClientPoolStickyHADM.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ThinClientPoolStickyHADM.hpp"
-#include "TssConnectionWrapper.hpp"
-#include <algorithm>
-using namespace apache::geode::client;
-/*TcrConnection* ThinClientPoolStickyHADM::getConnectionFromQueueW( GfErrType*
-error,
- std::set< ServerLocation >& excludeServers, bool isBGThread, TcrMessage &
-request, int8_t& version, bool & dummy, const BucketServerLocationPtr&
-serverLocation )
-{
- TcrConnection* conn = nullptr;
- if( isBGThread ){
- conn = ThinClientPoolDM::getConnectionFromQueueW( error, excludeServers,
-isBGThread, request, version, dummy, serverLocation);
- return conn;
- }
-
- m_manager->getStickyConnection(conn , error, excludeServers,
-request.forTransaction());
- return conn;
-}
-void ThinClientPoolStickyHADM::putInQueue(TcrConnection* conn, bool isBGThread,
-bool isTransaction )
-{
- if( !isBGThread )
- m_manager->setStickyConnection( conn, isTransaction );
- else
- ThinClientPoolDM::putInQueue( conn, isBGThread, isTransaction);
-}
-void ThinClientPoolStickyHADM::setStickyNull( bool isBGThread )
-{
- if( !isBGThread ) m_manager->setStickyConnection( nullptr, false );
-}
-
-void ThinClientPoolStickyHADM::cleanStickyConnections(volatile bool& isRunning)
-{
- if (!isRunning) {
- return;
- }
- m_manager->cleanStaleStickyConnection();
-}
-
-bool ThinClientPoolStickyHADM::canItBeDeleted(TcrConnection* conn)
-{
- return m_manager->canThisConnBeDeleted( conn );
-}
-void ThinClientPoolStickyHADM::releaseThreadLocalConnection()
-{
- m_manager->releaseThreadLocalConnection();
-}
-void ThinClientPoolStickyHADM::setThreadLocalConnection(TcrConnection* conn)
-{
- m_manager->addStickyConnection(conn);
-}
-bool ThinClientPoolStickyHADM::canItBeDeletedNoImpl(TcrConnection* conn )
-{
- return ThinClientPoolDM::canItBeDeleted( conn );
-}*/
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolStickyHADM.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientPoolStickyHADM.hpp b/src/cppcache/src/ThinClientPoolStickyHADM.hpp
index 40c1694..07fa4ed 100644
--- a/src/cppcache/src/ThinClientPoolStickyHADM.hpp
+++ b/src/cppcache/src/ThinClientPoolStickyHADM.hpp
@@ -29,29 +29,9 @@ class ThinClientPoolStickyHADM : public ThinClientPoolHADM {
ThinClientPoolStickyHADM(const char* name, PoolAttributesPtr poolAttrs,
TcrConnectionManager& connManager)
: ThinClientPoolHADM(name, poolAttrs, connManager) {
- // m_manager = new ThinClientStickyManager( this );
m_sticky = true;
}
- virtual ~ThinClientPoolStickyHADM() {
- /*m_manager->closeAllStickyConnections();
- delete m_manager; m_manager = nullptr;*/
- }
- /*bool canItBeDeletedNoImpl(TcrConnection* conn );
-protected:
- virtual void cleanStickyConnections(volatile bool& isRunning);
- virtual TcrConnection* getConnectionFromQueueW( GfErrType* error,
- std::set< ServerLocation >&, bool isBGThread, TcrMessage & request, int8_t&
-version, bool & dummy, const BucketServerLocationPtr& serverLocation = nullptr
-);
- virtual void putInQueue(TcrConnection* conn, bool isBGThread, bool
-isTransaction = false );
- virtual void setStickyNull( bool isBGThread );
- virtual bool canItBeDeleted(TcrConnection* conn);
- virtual void releaseThreadLocalConnection();
- virtual void setThreadLocalConnection(TcrConnection* conn);
-*/
- // virtual void cleanStickyConnections(volatile bool& isRunning);
- // ThinClientStickyManager* m_manager;
+ virtual ~ThinClientPoolStickyHADM() {}
};
typedef std::shared_ptr<ThinClientPoolStickyHADM> ThinClientPoolStickyHADMPtr;
} // namespace client
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientRedundancyManager.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientRedundancyManager.cpp b/src/cppcache/src/ThinClientRedundancyManager.cpp
index d356782..6ab3248 100644
--- a/src/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/src/cppcache/src/ThinClientRedundancyManager.cpp
@@ -642,11 +642,13 @@ void ThinClientRedundancyManager::initialize(int redundancyLevel) {
m_redundancyLevel = redundancyLevel;
m_HAenabled = (redundancyLevel > 0 || m_theTcrConnManager->isDurable() ||
ThinClientBaseDM::isDeltaEnabledOnServer());
- SystemProperties* sysProp = DistributedSystem::getSystemProperties();
+ auto& sysProp = m_theTcrConnManager->getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
if (m_poolHADM) {
m_eventidmap.init(m_poolHADM->getSubscriptionMessageTrackingTimeout());
} else {
- m_eventidmap.init(sysProp->notifyDupCheckLife());
+ m_eventidmap.init(sysProp.notifyDupCheckLife());
}
int millis = 100;
if (m_HAenabled) {
@@ -655,7 +657,7 @@ void ThinClientRedundancyManager::initialize(int redundancyLevel) {
millis = m_poolHADM->getSubscriptionAckInterval();
} else {
- millis = sysProp->notifyAckInterval();
+ millis = sysProp.notifyAckInterval();
}
if (millis < 100) millis = 100;
{
@@ -719,7 +721,8 @@ void ThinClientRedundancyManager::close() {
if (m_periodicAckTask) {
if (m_processEventIdMapTaskId >= 0) {
- CacheImpl::expiryTaskManager->cancelTask(m_processEventIdMapTaskId);
+ m_theTcrConnManager->getCacheImpl()->getExpiryTaskManager().cancelTask(
+ m_processEventIdMapTaskId);
}
m_periodicAckTask->stopNoblock();
m_periodicAckSema.release();
@@ -750,7 +753,8 @@ bool ThinClientRedundancyManager::readyForEvents(
return true;
}
- TcrMessageClientReady request;
+ TcrMessageClientReady request(
+ m_theTcrConnManager->getCacheImpl()->getCache()->createDataOutput());
TcrMessageReply reply(true, nullptr);
GfErrType err = GF_NOTCON;
@@ -791,6 +795,7 @@ bool ThinClientRedundancyManager::sendMakePrimaryMesg(
}
TcrMessageReply reply(false, nullptr);
const TcrMessageMakePrimary makePrimaryRequest(
+ m_theTcrConnManager->getCacheImpl()->getCache()->createDataOutput(),
ThinClientRedundancyManager::m_sentReadyForEvents);
LOGFINE("Making primary subscription endpoint %s", ep->name().c_str());
@@ -1109,7 +1114,8 @@ bool ThinClientRedundancyManager::isDurable() {
}
void ThinClientRedundancyManager::readyForEvents() {
- TcrMessageClientReady request;
+ TcrMessageClientReady request(
+ m_theTcrConnManager->getCacheImpl()->getCache()->createDataOutput());
TcrMessageReply reply(true, nullptr);
GfErrType result = GF_NOTCON;
unsigned int epCount = 0;
@@ -1194,7 +1200,9 @@ void ThinClientRedundancyManager::doPeriodicAck() {
m_redundantEndpoints.begin();
if (endpoint != m_redundantEndpoints.end()) {
- TcrMessagePeriodicAck request(entries);
+ TcrMessagePeriodicAck request(
+ m_theTcrConnManager->getCacheImpl()->getCache()->createDataOutput(),
+ entries);
TcrMessageReply reply(true, nullptr);
GfErrType result = GF_NOERR;
@@ -1243,24 +1251,26 @@ void ThinClientRedundancyManager::startPeriodicAck() {
m_periodicAckTask = new Task<ThinClientRedundancyManager>(
this, &ThinClientRedundancyManager::periodicAck, NC_PerodicACK);
m_periodicAckTask->start();
- SystemProperties* props = DistributedSystem::getSystemProperties();
+ const auto& props = m_theTcrConnManager->getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
// start the periodic ACK task handler
- ACE_Event_Handler* periodicAckTask =
- new ExpiryHandler_T<ThinClientRedundancyManager>(
- this, &ThinClientRedundancyManager::processEventIdMap);
- // m_processEventIdMapTaskId = CacheImpl::expiryTaskManager->
- // scheduleExpiryTask(periodicAckTask, 1, 1, false);
- m_processEventIdMapTaskId = CacheImpl::expiryTaskManager->scheduleExpiryTask(
- periodicAckTask, m_nextAckInc, m_nextAckInc, false);
+ auto periodicAckTask = new ExpiryHandler_T<ThinClientRedundancyManager>(
+ this, &ThinClientRedundancyManager::processEventIdMap);
+ m_processEventIdMapTaskId =
+ m_theTcrConnManager->getCacheImpl()
+ ->getExpiryTaskManager()
+ .scheduleExpiryTask(periodicAckTask, m_nextAckInc, m_nextAckInc,
+ false);
LOGFINE(
"Registered subscription event "
"periodic ack task with id = %ld, notify-ack-interval = %ld, "
"notify-dupcheck-life = %ld, periodic ack is %sabled",
m_processEventIdMapTaskId,
m_poolHADM ? m_poolHADM->getSubscriptionAckInterval()
- : props->notifyAckInterval(),
+ : props.notifyAckInterval(),
m_poolHADM ? m_poolHADM->getSubscriptionMessageTrackingTimeout()
- : props->notifyDupCheckLife(),
+ : props.notifyDupCheckLife(),
m_HAenabled ? "en" : "dis");
}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientRegion.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientRegion.cpp b/src/cppcache/src/ThinClientRegion.cpp
index b7a2dcb..8c7ddf3 100644
--- a/src/cppcache/src/ThinClientRegion.cpp
+++ b/src/cppcache/src/ThinClientRegion.cpp
@@ -15,24 +15,26 @@
* limitations under the License.
*/
+#include <geode/SelectResultsIterator.hpp>
+#include <geode/SystemProperties.hpp>
+#include <geode/PoolManager.hpp>
+#include <geode/UserFunctionExecutionException.hpp>
+
#include "Utils.hpp"
+#include "CacheRegionHelper.hpp"
#include "ThinClientRegion.hpp"
#include "TcrDistributionManager.hpp"
#include "ThinClientPoolDM.hpp"
#include "ThinClientBaseDM.hpp"
#include "TcrEndpoint.hpp"
-#include <geode/SystemProperties.hpp>
#include "CacheImpl.hpp"
#include "RegionGlobalLocks.hpp"
#include "ReadWriteLock.hpp"
#include "RemoteQuery.hpp"
-#include <geode/SelectResultsIterator.hpp>
#include <geode/Struct.hpp>
#include "GeodeTypeIdsImpl.hpp"
#include "AutoDelete.hpp"
-#include <geode/PoolManager.hpp>
#include "UserAttributes.hpp"
-#include <geode/UserFunctionExecutionException.hpp>
#include "PutAllPartialResultServerException.hpp"
#include "VersionedCacheableObjectPartList.hpp"
//#include "PutAllPartialResult.hpp"
@@ -88,9 +90,9 @@ class PutAllWork : public PooledWork<GfErrType>,
m_isPapeReceived(false)
// UNUSED , m_aCallbackArgument(aCallbackArgument)
{
- m_request = new TcrMessagePutAll(m_region.get(), *m_map.get(),
- static_cast<int>(m_timeout * 1000),
- m_poolDM, aCallbackArgument);
+ m_request = new TcrMessagePutAll(
+ m_region->getCache()->createDataOutput(), m_region.get(), *m_map.get(),
+ static_cast<int>(m_timeout * 1000), m_poolDM, aCallbackArgument);
m_reply = new TcrMessageReply(true, m_poolDM);
// create new instanceof VCOPL
@@ -228,8 +230,9 @@ class RemoveAllWork : public PooledWork<GfErrType>,
m_keys(keys),
m_papException(nullptr),
m_isPapeReceived(false) {
- m_request = new TcrMessageRemoveAll(m_region.get(), *keys,
- m_aCallbackArgument, m_poolDM);
+ m_request = new TcrMessageRemoveAll(
+ m_region->getCache()->createDataOutput(), m_region.get(), *keys,
+ m_aCallbackArgument, m_poolDM);
m_reply = new TcrMessageReply(true, m_poolDM);
// create new instanceof VCOPL
ACE_Recursive_Thread_Mutex responseLock;
@@ -327,30 +330,35 @@ class RemoveAllWork : public PooledWork<GfErrType>,
}
};
-ThinClientRegion::ThinClientRegion(const std::string& name, CacheImpl* cache,
+ThinClientRegion::ThinClientRegion(const std::string& name,
+ CacheImpl* cacheImpl,
const RegionInternalPtr& rPtr,
const RegionAttributesPtr& attributes,
const CacheStatisticsPtr& stats, bool shared)
- : LocalRegion(name, cache, rPtr, attributes, stats, shared),
+ : LocalRegion(name, cacheImpl, rPtr, attributes, stats, shared),
m_tcrdm((ThinClientBaseDM*)0),
m_notifyRelease(false),
m_notificationSema(1),
m_isMetaDataRefreshed(false) {
m_transactionEnabled = true;
- m_isDurableClnt =
- strlen(DistributedSystem::getSystemProperties()->durableClientId()) > 0;
+ m_isDurableClnt = strlen(cacheImpl->getDistributedSystem()
+ .getSystemProperties()
+ .durableClientId()) > 0;
}
void ThinClientRegion::initTCR() {
bool subscription = false;
- PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
+ PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find(
+ getAttributes()->getPoolName());
if (pool != nullptr) {
subscription = pool->getSubscriptionEnabled();
}
bool notificationEnabled =
getAttributes()->getClientNotificationEnabled() || subscription;
if (notificationEnabled) {
- if (DistributedSystem::getSystemProperties()->isGridClient()) {
+ if (m_cacheImpl->getDistributedSystem()
+ .getSystemProperties()
+ .isGridClient()) {
LOGWARN(
"Region %s: client subscription channel enabled for a grid "
"client; starting required internal subscription, cleanup and "
@@ -374,7 +382,8 @@ void ThinClientRegion::initTCR() {
void ThinClientRegion::registerKeys(const VectorOfCacheableKey& keys,
bool isDurable, bool getInitialValues,
bool receiveValues) {
- PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
+ PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find(
+ getAttributes()->getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
@@ -417,7 +426,8 @@ void ThinClientRegion::registerKeys(const VectorOfCacheableKey& keys,
}
void ThinClientRegion::unregisterKeys(const VectorOfCacheableKey& keys) {
- PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
+ PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find(
+ getAttributes()->getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
@@ -452,7 +462,8 @@ void ThinClientRegion::registerAllKeys(bool isDurable,
VectorOfCacheableKeyPtr resultKeys,
bool getInitialValues,
bool receiveValues) {
- PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
+ PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find(
+ getAttributes()->getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
@@ -510,7 +521,8 @@ void ThinClientRegion::registerRegex(const char* regex, bool isDurable,
VectorOfCacheableKeyPtr resultKeys,
bool getInitialValues,
bool receiveValues) {
- PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
+ PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find(
+ getAttributes()->getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
@@ -571,7 +583,8 @@ void ThinClientRegion::registerRegex(const char* regex, bool isDurable,
}
void ThinClientRegion::unregisterRegex(const char* regex) {
- PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
+ PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find(
+ getAttributes()->getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
@@ -596,7 +609,8 @@ void ThinClientRegion::unregisterRegex(const char* regex) {
}
void ThinClientRegion::unregisterAllKeys() {
- PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
+ PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find(
+ getAttributes()->getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGERROR(
@@ -688,7 +702,8 @@ bool ThinClientRegion::existsValue(const char* predicate, uint32_t timeout) {
}
GfErrType ThinClientRegion::unregisterKeysBeforeDestroyRegion() {
- PoolPtr pool = PoolManager::find(getAttributes()->getPoolName());
+ PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find(
+ getAttributes()->getPoolName());
if (pool != nullptr) {
if (!pool->getSubscriptionEnabled()) {
LOGDEBUG(
@@ -756,7 +771,8 @@ void ThinClientRegion::serverKeys(VectorOfCacheableKey& v) {
CHECK_DESTROY_PENDING(TryReadGuard, Region::serverKeys);
TcrMessageReply reply(true, m_tcrdm);
- TcrMessageKeySet request(m_fullPath, m_tcrdm);
+ TcrMessageKeySet request(m_cacheImpl->getCache()->createDataOutput(),
+ m_fullPath, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::KEY_SET);
// need to check
ChunkedKeySetResponse* resultCollector(
@@ -812,8 +828,9 @@ bool ThinClientRegion::containsKeyOnServer(
/** @brief Create message and send to bridge server */
- TcrMessageContainsKey request(this, keyPtr, static_cast<UserDataPtr>(nullptr),
- true, m_tcrdm);
+ TcrMessageContainsKey request(m_cache->createDataOutput(), this, keyPtr,
+ static_cast<UserDataPtr>(nullptr), true,
+ m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY);
err = m_tcrdm->sendSyncRequest(request, reply);
@@ -866,8 +883,9 @@ bool ThinClientRegion::containsValueForKey_remote(
/** @brief Create message and send to bridge server */
- TcrMessageContainsKey request(this, keyPtr, static_cast<UserDataPtr>(nullptr),
- false, m_tcrdm);
+ TcrMessageContainsKey request(m_cache->createDataOutput(), this, keyPtr,
+ static_cast<UserDataPtr>(nullptr), false,
+ m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY);
err = m_tcrdm->sendSyncRequest(request, reply);
@@ -912,7 +930,8 @@ void ThinClientRegion::clear(const UserDataPtr& aCallbackArgument) {
/** @brief Create message and send to bridge server */
- TcrMessageClearRegion request(this, aCallbackArgument, -1, m_tcrdm);
+ TcrMessageClearRegion request(m_cache->createDataOutput(), this,
+ aCallbackArgument, -1, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) GfErrTypeToException("Region::clear", err);
@@ -952,7 +971,8 @@ GfErrType ThinClientRegion::getNoThrow_remote(
/** @brief Create message and send to bridge server */
- TcrMessageRequest request(this, keyPtr, aCallbackArgument, m_tcrdm);
+ TcrMessageRequest request(m_cache->createDataOutput(), this, keyPtr,
+ aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
@@ -991,7 +1011,8 @@ GfErrType ThinClientRegion::invalidateNoThrow_remote(
/** @brief Create message and send to bridge server */
- TcrMessageInvalidate request(this, keyPtr, aCallbackArgument, m_tcrdm);
+ TcrMessageInvalidate request(m_cache->createDataOutput(), this, keyPtr,
+ aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
@@ -1030,25 +1051,27 @@ GfErrType ThinClientRegion::putNoThrow_remote(
// do TCR put
// bool delta = valuePtr->hasDelta();
bool delta = false;
- const char* conFlationValue =
- DistributedSystem::getSystemProperties()->conflateEvents();
+ const char* conFlationValue = getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .conflateEvents();
if (checkDelta && valuePtr != nullptr && conFlationValue != nullptr &&
strcmp(conFlationValue, "true") != 0 &&
ThinClientBaseDM::isDeltaEnabledOnServer()) {
Delta* temp = dynamic_cast<Delta*>(valuePtr.get());
delta = (temp && temp->hasDelta());
}
- TcrMessagePut request(this, keyPtr, valuePtr, aCallbackArgument, delta,
- m_tcrdm);
+ TcrMessagePut request(m_cache->createDataOutput(), this, keyPtr, valuePtr,
+ aCallbackArgument, delta, m_tcrdm);
TcrMessageReply* reply = new TcrMessageReply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, *reply);
if (delta) {
- m_cacheImpl->m_cacheStats
- ->incDeltaPut(); // Does not chcek whether success of failure..
+ m_cacheImpl->getCachePerfStats()
+ .incDeltaPut(); // Does not check whether success or failure..
if (reply->getMessageType() ==
TcrMessage::PUT_DELTA_ERROR) { // Try without delta
- TcrMessagePut request(this, keyPtr, valuePtr, aCallbackArgument, false,
- m_tcrdm, false, true);
+ TcrMessagePut request(m_cache->createDataOutput(), this, keyPtr, valuePtr,
+ aCallbackArgument, false, m_tcrdm, false, true);
delete reply;
reply = new TcrMessageReply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, *reply);
@@ -1096,7 +1119,8 @@ GfErrType ThinClientRegion::destroyNoThrow_remote(
GfErrType err = GF_NOERR;
// do TCR destroy
- TcrMessageDestroy request(this, keyPtr, nullptr, aCallbackArgument, m_tcrdm);
+ TcrMessageDestroy request(m_cache->createDataOutput(), this, keyPtr, nullptr,
+ aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
@@ -1137,7 +1161,8 @@ GfErrType ThinClientRegion::removeNoThrow_remote(
GfErrType err = GF_NOERR;
// do TCR remove
- TcrMessageDestroy request(this, keyPtr, cvalue, aCallbackArgument, m_tcrdm);
+ TcrMessageDestroy request(m_cache->createDataOutput(), this, keyPtr, cvalue,
+ aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
@@ -1178,7 +1203,8 @@ GfErrType ThinClientRegion::removeNoThrowEX_remote(
GfErrType err = GF_NOERR;
// do TCR remove
- TcrMessageDestroy request(this, keyPtr, nullptr, aCallbackArgument, m_tcrdm);
+ TcrMessageDestroy request(m_cache->createDataOutput(), this, keyPtr, nullptr,
+ aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
@@ -1239,7 +1265,7 @@ GfErrType ThinClientRegion::getAllNoThrow_remote(
}
// create the GET_ALL request
TcrMessageGetAll request(
- this, keys, m_tcrdm,
+ m_cache->createDataOutput(), this, keys, m_tcrdm,
aCallbackArgument); // now we need to initialize later
TcrMessageReply reply(true, m_tcrdm);
@@ -1347,7 +1373,8 @@ GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote(
* e. insert the worker into the vector.
*/
std::vector<PutAllWork*> putAllWorkers;
- ThreadPool* threadPool = TPSingleton::instance();
+ auto* threadPool =
+ CacheRegionHelper::getCacheImpl(getCache().get())->getThreadPool();
int locationMapIndex = 0;
for (const auto& locationIter : *locationMap) {
const auto& serverLocation = locationIter.first;
@@ -1616,7 +1643,8 @@ GfErrType ThinClientRegion::multiHopPutAllNoThrow_remote(
GfErrType err = GF_NOERR;
// Construct request/reply for putAll
- TcrMessagePutAll request(this, map, static_cast<int>(timeout * 1000), m_tcrdm,
+ TcrMessagePutAll request(m_cache->createDataOutput(), this, map,
+ static_cast<int>(timeout * 1000), m_tcrdm,
aCallbackArgument);
TcrMessageReply reply(true, m_tcrdm);
request.setTimeout(timeout);
@@ -1624,7 +1652,7 @@ GfErrType ThinClientRegion::multiHopPutAllNoThrow_remote(
ACE_Recursive_Thread_Mutex responseLock;
versionedObjPartList =
- std::make_shared<VersionedCacheableObjectPartList>(responseLock);
+ std::make_shared<VersionedCacheableObjectPartList>(this, responseLock);
// need to check
ChunkedPutAllResponse* resultCollector(new ChunkedPutAllResponse(
shared_from_this(), reply, responseLock, versionedObjPartList));
@@ -1731,7 +1759,8 @@ GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote(
* e. insert the worker into the vector.
*/
std::vector<RemoveAllWork*> removeAllWorkers;
- ThreadPool* threadPool = TPSingleton::instance();
+ auto* threadPool =
+ CacheRegionHelper::getCacheImpl(getCache().get())->getThreadPool();
int locationMapIndex = 0;
for (const auto& locationIter : *locationMap) {
const auto& serverLocation = locationIter.first;
@@ -1947,12 +1976,13 @@ GfErrType ThinClientRegion::multiHopRemoveAllNoThrow_remote(
GfErrType err = GF_NOERR;
// Construct request/reply for putAll
- TcrMessageRemoveAll request(this, keys, aCallbackArgument, m_tcrdm);
+ TcrMessageRemoveAll request(m_cache->createDataOutput(), this, keys,
+ aCallbackArgument, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
ACE_Recursive_Thread_Mutex responseLock;
versionedObjPartList =
- std::make_shared<VersionedCacheableObjectPartList>(responseLock);
+ std::make_shared<VersionedCacheableObjectPartList>(this, responseLock);
// need to check
ChunkedRemoveAllResponse* resultCollector(new ChunkedRemoveAllResponse(
shared_from_this(), reply, responseLock, versionedObjPartList));
@@ -2023,7 +2053,7 @@ uint32_t ThinClientRegion::size_remote() {
GfErrType err = GF_NOERR;
// do TCR size
- TcrMessageSize request(m_fullPath.c_str());
+ TcrMessageSize request(m_cache->createDataOutput(), m_fullPath.c_str());
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
@@ -2034,8 +2064,7 @@ uint32_t ThinClientRegion::size_remote() {
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
CacheableInt32Ptr size =
- std::static_pointer_cast<CacheableInt32>(
- reply.getValue());
+ std::static_pointer_cast<CacheableInt32>(reply.getValue());
return size->value();
// LOGINFO("Map is written into remote server at region %s",
// m_fullPath.c_str());
@@ -2229,7 +2258,8 @@ GfErrType ThinClientRegion::destroyRegionNoThrow_remote(
GfErrType err = GF_NOERR;
// do TCR destroyRegion
- TcrMessageDestroyRegion request(this, aCallbackArgument, -1, m_tcrdm);
+ TcrMessageDestroyRegion request(m_cache->createDataOutput(), this,
+ aCallbackArgument, -1, m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) return err;
@@ -2284,8 +2314,9 @@ GfErrType ThinClientRegion::registerKeysNoThrow(
interestPolicy.ordinal);
TcrMessageRegisterInterestList request(
- this, keys, isDurable, getAttributes()->getCachingEnabled(),
- receiveValues, interestPolicy, m_tcrdm);
+ m_cache->createDataOutput(), this, keys, isDurable,
+ getAttributes()->getCachingEnabled(), receiveValues, interestPolicy,
+ m_tcrdm);
ACE_Recursive_Thread_Mutex responseLock;
TcrChunkedResult* resultCollector = nullptr;
if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) {
@@ -2348,7 +2379,8 @@ GfErrType ThinClientRegion::unregisterKeysNoThrow(
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
- TcrMessageUnregisterInterestList request(this, keys, false, false, true,
+ TcrMessageUnregisterInterestList request(m_cache->createDataOutput(), this,
+ keys, false, false, true,
InterestResultPolicy::NONE, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
@@ -2382,7 +2414,8 @@ GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy(
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
- TcrMessageUnregisterInterestList request(this, keys, false, false, true,
+ TcrMessageUnregisterInterestList request(m_cache->createDataOutput(), this,
+ keys, false, false, true,
InterestResultPolicy::NONE, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR) {
@@ -2456,8 +2489,8 @@ GfErrType ThinClientRegion::registerRegexNoThrow(
// TODO:
TcrMessageRegisterInterest request(
- m_fullPath, regex.c_str(), interestPolicy, isDurable,
- getAttributes()->getCachingEnabled(), receiveValues, m_tcrdm);
+ m_cache->createDataOutput(), m_fullPath, regex.c_str(), interestPolicy,
+ isDurable, getAttributes()->getCachingEnabled(), receiveValues, m_tcrdm);
ACE_Recursive_Thread_Mutex responseLock;
if (reply == nullptr) {
reply = &replyLocal;
@@ -2527,9 +2560,9 @@ GfErrType ThinClientRegion::unregisterRegexNoThrow(const std::string& regex,
if (err == GF_NOERR) {
TcrMessageReply reply(false, m_tcrdm);
- TcrMessageUnregisterInterest request(m_fullPath, regex,
- InterestResultPolicy::NONE, false,
- false, true, m_tcrdm);
+ TcrMessageUnregisterInterest request(
+ m_cache->createDataOutput(), m_fullPath, regex,
+ InterestResultPolicy::NONE, false, false, true, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) {
if (attemptFailover) {
@@ -2573,9 +2606,9 @@ GfErrType ThinClientRegion::unregisterRegexNoThrowLocalDestroy(
if (err == GF_NOERR) {
TcrMessageReply reply(false, m_tcrdm);
- TcrMessageUnregisterInterest request(m_fullPath, regex,
- InterestResultPolicy::NONE, false,
- false, true, m_tcrdm);
+ TcrMessageUnregisterInterest request(
+ m_cache->createDataOutput(), m_fullPath, regex,
+ InterestResultPolicy::NONE, false, false, true, m_tcrdm);
err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply);
if (err == GF_NOERR) {
if (attemptFailover) {
@@ -3054,12 +3087,12 @@ void ThinClientRegion::executeFunction(const char* func,
TcrMessage* msg;
if (reExecuteForServ) {
msg = new TcrMessageExecuteRegionFunction(
- funcName, this, args, routingObj, getResult, failedNodes, timeout,
- m_tcrdm, static_cast<int8_t>(1));
+ m_cache->createDataOutput(), funcName, this, args, routingObj,
+ getResult, failedNodes, timeout, m_tcrdm, static_cast<int8_t>(1));
} else {
msg = new TcrMessageExecuteRegionFunction(
- funcName, this, args, routingObj, getResult, failedNodes, timeout,
- m_tcrdm, static_cast<int8_t>(0));
+ m_cache->createDataOutput(), funcName, this, args, routingObj,
+ getResult, failedNodes, timeout, m_tcrdm, static_cast<int8_t>(0));
}
TcrMessageReply reply(true, m_tcrdm);
// need to check
@@ -3151,9 +3184,10 @@ CacheableVectorPtr ThinClientRegion::reExecuteFunction(
do {
reExecute = false;
std::string funcName(func);
- TcrMessageExecuteRegionFunction msg(
- funcName, this, args, routingObj, getResult, failedNodes, timeout,
- m_tcrdm, /*reExecute*/ static_cast<int8_t>(1));
+ TcrMessageExecuteRegionFunction msg(m_cache->createDataOutput(), funcName,
+ this, args, routingObj, getResult,
+ failedNodes, timeout, m_tcrdm,
+ /*reExecute*/ static_cast<int8_t>(1));
TcrMessageReply reply(true, m_tcrdm);
// need to check
ChunkedFunctionExecutionResponse* resultCollector(
@@ -3222,7 +3256,8 @@ bool ThinClientRegion::executeFunctionSH(
const auto& userAttr =
TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes();
std::vector<OnRegionFunctionExecution*> feWorkers;
- auto threadPool = TPSingleton::instance();
+ auto* threadPool =
+ CacheRegionHelper::getCacheImpl(getCache().get())->getThreadPool();
for (const auto& locationIter : *locationMap) {
const auto& serverLocation = locationIter.first;
@@ -3317,7 +3352,8 @@ GfErrType ThinClientRegion::getFuncAttributes(const char* func,
// do TCR GET_FUNCTION_ATTRIBUTES
LOGDEBUG("Tcrmessage request GET_FUNCTION_ATTRIBUTES ");
std::string funcName(func);
- TcrMessageGetFunctionAttributes request(funcName, m_tcrdm);
+ TcrMessageGetFunctionAttributes request(m_cache->createDataOutput(), funcName,
+ m_tcrdm);
TcrMessageReply reply(true, m_tcrdm);
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
@@ -3345,7 +3381,8 @@ GfErrType ThinClientRegion::getFuncAttributes(const char* func,
GfErrType ThinClientRegion::getNoThrow_FullObject(EventIdPtr eventId,
CacheablePtr& fullObject,
VersionTagPtr& versionTag) {
- TcrMessageRequestEventValue fullObjectMsg(eventId);
+ TcrMessageRequestEventValue fullObjectMsg(m_cache->createDataOutput(),
+ eventId);
TcrMessageReply reply(true, nullptr);
GfErrType err = GF_NOTCON;
@@ -3378,12 +3415,12 @@ void ThinClientRegion::txPut(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
- int64_t sampleStartNanos = Utils::startStatOpTime();
+ int64_t sampleStartNanos = startStatOpTime();
GfErrType err = putNoThrowTX(key, value, aCallbackArgument, oldValue, -1,
CacheEventFlags::NORMAL, versionTag);
- Utils::updateStatOpTime(m_regionStats->getStat(),
- RegionStatType::getInstance()->getPutTimeId(),
- sampleStartNanos);
+
+ updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(),
+ sampleStartNanos);
GfErrTypeToException("Region::putTX", err);
}
@@ -3395,18 +3432,19 @@ void ChunkedInterestResponse::reset() {
void ChunkedInterestResponse::handleChunk(const uint8_t* chunk,
int32_t chunkLen,
- uint8_t isLastChunkWithSecurity) {
- DataInput input(chunk, chunkLen);
+ uint8_t isLastChunkWithSecurity,
+ const Cache* cache) {
+ auto input = cache->createDataInput(chunk, chunkLen);
- input.setPoolName(m_replyMsg.getPoolName());
+ input->setPoolName(m_replyMsg.getPoolName());
uint32_t partLen;
if (TcrMessageHelper::readChunkPartHeader(
- m_msg, input, 0, GeodeTypeIds::CacheableArrayList,
+ m_msg, *input, 0, GeodeTypeIds::CacheableArrayList,
"ChunkedInterestResponse", partLen,
isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) {
// encountered an exception part, so return without reading more
- m_replyMsg.readSecureObjectPart(input, false, true,
+ m_replyMsg.readSecureObjectPart(*input, false, true,
isLastChunkWithSecurity);
return;
}
@@ -3414,8 +3452,8 @@ void ChunkedInterestResponse::handleChunk(const uint8_t* chunk,
if (m_resultKeys == nullptr) {
m_resultKeys = std::make_shared<VectorOfCacheableKey>();
}
- serializer::readObject(input, *m_resultKeys);
- m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ serializer::readObject(*input, *m_resultKeys);
+ m_replyMsg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
}
void ChunkedKeySetResponse::reset() {
@@ -3425,24 +3463,25 @@ void ChunkedKeySetResponse::reset() {
}
void ChunkedKeySetResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity) {
- DataInput input(chunk, chunkLen);
+ uint8_t isLastChunkWithSecurity,
+ const Cache* cache) {
+ auto input = cache->createDataInput(chunk, chunkLen);
- input.setPoolName(m_replyMsg.getPoolName());
+ input->setPoolName(m_replyMsg.getPoolName());
uint32_t partLen;
if (TcrMessageHelper::readChunkPartHeader(
- m_msg, input, 0, GeodeTypeIds::CacheableArrayList,
+ m_msg, *input, 0, GeodeTypeIds::CacheableArrayList,
"ChunkedKeySetResponse", partLen,
isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) {
// encountered an exception part, so return without reading more
- m_replyMsg.readSecureObjectPart(input, false, true,
+ m_replyMsg.readSecureObjectPart(*input, false, true,
isLastChunkWithSecurity);
return;
}
- serializer::readObject(input, m_resultKeys);
- m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ serializer::readObject(*input, m_resultKeys);
+ m_replyMsg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
}
void ChunkedQueryResponse::reset() {
@@ -3510,31 +3549,32 @@ void ChunkedQueryResponse::readObjectPartList(DataInput& input,
}
void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity) {
+ uint8_t isLastChunkWithSecurity,
+ const Cache* cache) {
LOGDEBUG("ChunkedQueryResponse::handleChunk..");
- DataInput input(chunk, chunkLen);
- input.setPoolName(m_msg.getPoolName());
+ auto input = cache->createDataInput(chunk, chunkLen);
+ input->setPoolName(m_msg.getPoolName());
uint32_t partLen;
int8_t isObj;
TcrMessageHelper::ChunkObjectType objType;
if ((objType = TcrMessageHelper::readChunkPartHeader(
- m_msg, input, GeodeTypeIdsImpl::FixedIDByte,
+ m_msg, *input, GeodeTypeIdsImpl::FixedIDByte,
static_cast<uint8_t>(GeodeTypeIdsImpl::CollectionTypeImpl),
"ChunkedQueryResponse", partLen, isLastChunkWithSecurity)) ==
TcrMessageHelper::EXCEPTION) {
// encountered an exception part, so return without reading more
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
return;
} else if (objType == TcrMessageHelper::NULL_OBJECT) {
// special case for scalar result
- input.readInt(&partLen);
- input.read(&isObj);
+ input->readInt(&partLen);
+ input->read(&isObj);
CacheableInt32Ptr intVal;
- input.readObject(intVal, true);
+ input->readObject(intVal, true);
m_queryResults->push_back(intVal);
// TODO:
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
return;
}
@@ -3550,30 +3590,30 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
// If the results on server are in a bag, or the user need to manipulate
// the elements, then we have to revisit this issue.
// For now, we'll live with duplicate records, hoping they do not cost much.
- skipClass(input);
+ skipClass(*input);
// skipping CollectionTypeImpl
- // skipClass(input); // no longer, since GFE 5.7
+ // skipClass(*input); // no longer, since GFE 5.7
int8_t structType;
- input.read(&structType); // this is Fixed ID byte (1)
- input.read(&structType); // this is DataSerializable (45)
- input.read(&classByte);
+ input->read(&structType); // this is Fixed ID byte (1)
+ input->read(&structType); // this is DataSerializable (45)
+ input->read(&classByte);
uint8_t stringType;
- input.read(&stringType); // ignore string header - assume 64k string
- input.readUTF(&isStructTypeImpl, &stiLen);
+ input->read(&stringType); // ignore string header - assume 64k string
+ input->readUTF(&isStructTypeImpl, &stiLen);
DeleteArray<char> delSTI(isStructTypeImpl);
if (strcmp(isStructTypeImpl, "org.apache.geode.cache.query.Struct") == 0) {
int32_t numOfFldNames;
- input.readArrayLen(&numOfFldNames);
+ input->readArrayLen(&numOfFldNames);
bool skip = false;
if (m_structFieldNames.size() != 0) {
skip = true;
}
for (int i = 0; i < numOfFldNames; i++) {
CacheableStringPtr sptr;
- // input.readObject(sptr);
- input.readNativeString(sptr);
+ // input->readObject(sptr);
+ input->readNativeString(sptr);
if (!skip) {
m_structFieldNames.push_back(sptr);
}
@@ -3581,12 +3621,12 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
}
// skip the remaining part
- input.reset();
+ input->reset();
// skip the whole part including partLen and isObj (4+1)
- input.advanceCursor(partLen + 5);
+ input->advanceCursor(partLen + 5);
- input.readInt(&partLen);
- input.read(&isObj);
+ input->readInt(&partLen);
+ input->read(&isObj);
if (!isObj) {
LOGERROR(
"Query response part is not an object; possible serialization "
@@ -3599,30 +3639,30 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
bool isResultSet = (m_structFieldNames.size() == 0);
int8_t arrayType;
- input.read(&arrayType);
+ input->read(&arrayType);
if (arrayType == GeodeTypeIds::CacheableObjectArray) {
int32_t arraySize;
- input.readArrayLen(&arraySize);
- skipClass(input);
+ input->readArrayLen(&arraySize);
+ skipClass(*input);
for (int32_t arrayItem = 0; arrayItem < arraySize; ++arrayItem) {
SerializablePtr value;
if (isResultSet) {
- input.readObject(value);
+ input->readObject(value);
m_queryResults->push_back(value);
} else {
- input.read(&isObj);
+ input->read(&isObj);
int32_t arraySize2;
- input.readArrayLen(&arraySize2);
- skipClass(input);
+ input->readArrayLen(&arraySize2);
+ skipClass(*input);
for (int32_t index = 0; index < arraySize2; ++index) {
- input.readObject(value);
+ input->readObject(value);
m_queryResults->push_back(value);
}
}
}
} else if (arrayType == GeodeTypeIdsImpl::FixedIDByte) {
- input.read(&arrayType);
+ input->read(&arrayType);
if (arrayType != GeodeTypeIdsImpl::CacheableObjectPartList) {
LOGERROR(
"Query response got unhandled message format %d while expecting "
@@ -3632,7 +3672,7 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
"Query response got unhandled message format while expecting object "
"part list; possible serialization mismatch");
}
- readObjectPartList(input, isResultSet);
+ readObjectPartList(*input, isResultSet);
} else {
LOGERROR(
"Query response got unhandled message format %d; possible "
@@ -3643,7 +3683,7 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
"mismatch");
}
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
}
void ChunkedQueryResponse::skipClass(DataInput& input) {
@@ -3668,19 +3708,20 @@ void ChunkedFunctionExecutionResponse::reset() {
}
void ChunkedFunctionExecutionResponse::handleChunk(
- const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity) {
+ const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity,
+ const Cache* cache) {
LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk");
- DataInput input(chunk, chunkLen);
- input.setPoolName(m_msg.getPoolName());
+ auto input = cache->createDataInput(chunk, chunkLen);
+ input->setPoolName(m_msg.getPoolName());
uint32_t partLen;
int8_t arrayType;
if ((arrayType = static_cast<TcrMessageHelper::ChunkObjectType>(
TcrMessageHelper::readChunkPartHeader(
- m_msg, input, "ChunkedFunctionExecutionResponse", partLen,
+ m_msg, *input, "ChunkedFunctionExecutionResponse", partLen,
isLastChunkWithSecurity))) == TcrMessageHelper::EXCEPTION) {
// encountered an exception part, so return without reading more
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
return;
}
@@ -3692,20 +3733,20 @@ void ChunkedFunctionExecutionResponse::handleChunk(
TcrMessageHelper::NULL_OBJECT) {
LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk nullptr object");
// m_functionExecutionResults->push_back(nullptr);
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
return;
}
int32_t len;
int startLen =
- input.getBytesRead() -
+ input->getBytesRead() -
1; // from here need to look value part + memberid AND -1 for array type
- input.readArrayLen(&len);
+ input->readArrayLen(&len);
// read a byte to determine whether to read exception part for sendException
// or read objects.
uint8_t partType;
- input.read(&partType);
+ input->read(&partType);
bool isExceptionPart = false;
// See If partType is JavaSerializable
const int CHUNK_HDR_LEN = 5;
@@ -3718,7 +3759,7 @@ void ChunkedFunctionExecutionResponse::handleChunk(
if (partType == GeodeTypeIdsImpl::JavaSerializable) {
isExceptionPart = true;
// reset the input.
- input.reset();
+ input->reset();
if (((isLastChunkWithSecurity & 0x02) &&
(chunkLen - static_cast<int32_t>(partLen) <=
@@ -3726,54 +3767,54 @@ void ChunkedFunctionExecutionResponse::handleChunk(
(((isLastChunkWithSecurity & 0x02) == 0) &&
(chunkLen - static_cast<int32_t>(partLen) <= CHUNK_HDR_LEN))) {
readPart = false;
- input.readInt(&partLen);
- input.advanceCursor(1); // skip isObject byte
- input.advanceCursor(partLen);
+ input->readInt(&partLen);
+ input->advanceCursor(1); // skip isObject byte
+ input->advanceCursor(partLen);
} else {
// skip first part i.e JavaSerializable.
- TcrMessageHelper::skipParts(m_msg, input, 1);
+ TcrMessageHelper::skipParts(m_msg, *input, 1);
// read the second part which is string in usual manner, first its length.
- input.readInt(&partLen);
+ input->readInt(&partLen);
int8_t isObject;
// then isObject byte
- input.read(&isObject);
+ input->read(&isObject);
- startLen = input.getBytesRead(); // reset from here need to look value
+ startLen = input->getBytesRead(); // reset from here need to look value
// part + memberid AND -1 for array type
// Since it is contained as a part of other results, read arrayType which
// is arrayList = 65.
- input.read(&arrayType);
+ input->read(&arrayType);
// then its len which is 2
- input.readArrayLen(&len);
+ input->readArrayLen(&len);
}
} else {
// rewind cursor by 1 to what we had read a byte to determine whether to
// read exception part or read objects.
- input.rewindCursor(1);
+ input->rewindCursor(1);
}
// Read either object or exception string from sendException.
SerializablePtr value;
// CacheablePtr memberId;
if (readPart) {
- input.readObject(value);
+ input->readObject(value);
// TODO: track this memberId for PrFxHa
- // input.readObject(memberId);
- int objectlen = input.getBytesRead() - startLen;
+ // input->readObject(memberId);
+ int objectlen = input->getBytesRead() - startLen;
int memberIdLen = partLen - objectlen;
- input.advanceCursor(memberIdLen);
+ input->advanceCursor(memberIdLen);
LOGDEBUG("function partlen = %d , objectlen = %d, memberidlen = %d ",
partLen, objectlen, memberIdLen);
- LOGDEBUG("function input.getBytesRemaining() = %d ",
- input.getBytesRemaining());
+ LOGDEBUG("function input->getBytesRemaining() = %d ",
+ input->getBytesRemaining());
// is there any way to assert it, as after that we need to read security
// header
- /*if(input.getBytesRemaining() != 0) {
+ /*if(input->getBytesRemaining() != 0) {
LOGERROR("Function response not read all bytes");
throw IllegalStateException("Function Execution didn't read all bytes");
}*/
@@ -3796,7 +3837,7 @@ void ChunkedFunctionExecutionResponse::handleChunk(
}
}
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
// m_functionExecutionResults->push_back(value);
}
@@ -3809,16 +3850,17 @@ void ChunkedGetAllResponse::reset() {
// process a GET_ALL response chunk
void ChunkedGetAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity) {
- DataInput input(chunk, chunkLen);
- input.setPoolName(m_msg.getPoolName());
+ uint8_t isLastChunkWithSecurity,
+ const Cache* cache) {
+ auto input = cache->createDataInput(chunk, chunkLen);
+ input->setPoolName(m_msg.getPoolName());
uint32_t partLen;
if (TcrMessageHelper::readChunkPartHeader(
- m_msg, input, GeodeTypeIdsImpl::FixedIDByte,
+ m_msg, *input, GeodeTypeIdsImpl::FixedIDByte,
GeodeTypeIdsImpl::VersionedObjectPartList, "ChunkedGetAllResponse",
partLen, isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) {
// encountered an exception part, so return without reading more
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
return;
}
@@ -3827,9 +3869,9 @@ void ChunkedGetAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
&m_trackerMap, m_destroyTracker, m_addToLocalCache, m_dsmemId,
m_responseLock);
- objectList.fromData(input);
+ objectList.fromData(*input);
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
}
void ChunkedGetAllResponse::add(const ChunkedGetAllResponse* other) {
@@ -3862,20 +3904,21 @@ void ChunkedPutAllResponse::reset() {
// process a PUT_ALL response chunk
void ChunkedPutAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity) {
- DataInput input(chunk, chunkLen);
- input.setPoolName(m_msg.getPoolName());
+ uint8_t isLastChunkWithSecurity,
+ const Cache* cache) {
+ auto input = cache->createDataInput(chunk, chunkLen);
+ input->setPoolName(m_msg.getPoolName());
uint32_t partLen;
int8_t chunkType;
if ((chunkType = (TcrMessageHelper::ChunkObjectType)
TcrMessageHelper::readChunkPartHeader(
- m_msg, input, GeodeTypeIdsImpl::FixedIDByte,
+ m_msg, *input, GeodeTypeIdsImpl::FixedIDByte,
GeodeTypeIdsImpl::VersionedObjectPartList,
"ChunkedPutAllResponse", partLen, isLastChunkWithSecurity)) ==
TcrMessageHelper::NULL_OBJECT) {
LOGDEBUG("ChunkedPutAllResponse::handleChunk nullptr object");
// No issues it will be empty in case of disabled caching.
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
return;
}
@@ -3884,21 +3927,22 @@ void ChunkedPutAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
LOGDEBUG("ChunkedPutAllResponse::handleChunk object");
ACE_Recursive_Thread_Mutex responseLock;
auto vcObjPart = std::make_shared<VersionedCacheableObjectPartList>(
+ dynamic_cast<ThinClientRegion*>(m_region.get()),
m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock);
- vcObjPart->fromData(input);
+ vcObjPart->fromData(*input);
m_list->addAll(vcObjPart);
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
} else {
LOGDEBUG("ChunkedPutAllResponse::handleChunk BYTES PART");
int8_t byte0;
- input.read(&byte0);
+ input->read(&byte0);
LOGDEBUG("ChunkedPutAllResponse::handleChunk single-hop bytes byte0 = %d ",
byte0);
int8_t byte1;
- input.read(&byte1);
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ input->read(&byte1);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
- PoolPtr pool = PoolManager::find(m_msg.getPoolName());
+ PoolPtr pool = cache->getPoolManager().find(m_msg.getPoolName());
if (pool != nullptr && !pool->isDestroyed() &&
pool->getPRSingleHopEnabled()) {
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(pool.get());
@@ -3924,20 +3968,21 @@ void ChunkedRemoveAllResponse::reset() {
// process a REMOVE_ALL response chunk
void ChunkedRemoveAllResponse::handleChunk(const uint8_t* chunk,
int32_t chunkLen,
- uint8_t isLastChunkWithSecurity) {
- DataInput input(chunk, chunkLen);
- input.setPoolName(m_msg.getPoolName());
+ uint8_t isLastChunkWithSecurity,
+ const Cache* cache) {
+ auto input = cache->createDataInput(chunk, chunkLen);
+ input->setPoolName(m_msg.getPoolName());
uint32_t partLen;
int8_t chunkType;
if ((chunkType = (TcrMessageHelper::ChunkObjectType)
TcrMessageHelper::readChunkPartHeader(
- m_msg, input, GeodeTypeIdsImpl::FixedIDByte,
+ m_msg, *input, GeodeTypeIdsImpl::FixedIDByte,
GeodeTypeIdsImpl::VersionedObjectPartList,
"ChunkedRemoveAllResponse", partLen, isLastChunkWithSecurity)) ==
TcrMessageHelper::NULL_OBJECT) {
LOGDEBUG("ChunkedRemoveAllResponse::handleChunk nullptr object");
// No issues it will be empty in case of disabled caching.
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
return;
}
@@ -3946,22 +3991,23 @@ void ChunkedRemoveAllResponse::handleChunk(const uint8_t* chunk,
LOGDEBUG("ChunkedRemoveAllResponse::handleChunk object");
ACE_Recursive_Thread_Mutex responseLock;
auto vcObjPart = std::make_shared<VersionedCacheableObjectPartList>(
+ dynamic_cast<ThinClientRegion*>(m_region.get()),
m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock);
- vcObjPart->fromData(input);
+ vcObjPart->fromData(*input);
m_list->addAll(vcObjPart);
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
} else {
LOGDEBUG("ChunkedRemoveAllResponse::handleChunk BYTES PART");
int8_t byte0;
- input.read(&byte0);
+ input->read(&byte0);
LOGDEBUG(
"ChunkedRemoveAllResponse::handleChunk single-hop bytes byte0 = %d ",
byte0);
int8_t byte1;
- input.read(&byte1);
- m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity);
+ input->read(&byte1);
+ m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity);
- PoolPtr pool = PoolManager::find(m_msg.getPoolName());
+ PoolPtr pool = cache->getPoolManager().find(m_msg.getPoolName());
if (pool != nullptr && !pool->isDestroyed() &&
pool->getPRSingleHopEnabled()) {
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(pool.get());
@@ -3985,17 +4031,19 @@ void ChunkedDurableCQListResponse::reset() {
}
// handles the chunk response for GETDURABLECQS_MSG_TYPE
-void ChunkedDurableCQListResponse::handleChunk(
- const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity) {
- DataInput input(chunk, chunkLen);
- input.setPoolName(m_msg.getPoolName());
+void ChunkedDurableCQListResponse::handleChunk(const uint8_t* chunk,
+ int32_t chunkLen,
+ uint8_t isLastChunkWithSecurity,
+ const Cache* cache) {
+ auto input = cache->createDataInput(chunk, chunkLen);
+ input->setPoolName(m_msg.getPoolName());
// read part length
uint32_t partLen;
- input.readInt(&partLen);
+ input->readInt(&partLen);
bool isObj;
- input.readBoolean(&isObj);
+ input->readBoolean(&isObj);
if (!isObj) {
// we're currently always expecting an object
@@ -4006,16 +4054,16 @@ void ChunkedDurableCQListResponse::handleChunk(
throw MessageException(exMsg);
}
- input.advanceCursor(1); // skip the CacheableArrayList type ID byte
+ input->advanceCursor(1); // skip the CacheableArrayList type ID byte
int8_t stringParts;
- input.read(&stringParts); // read the number of strings in the message this
+ input->read(&stringParts); // read the number of strings in the message this
// is one byte
CacheableStringPtr strTemp;
for (int i = 0; i < stringParts; i++) {
- input.readObject(strTemp);
+ input->readObject(strTemp);
m_resultList->push_back(strTemp);
}
}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientRegion.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientRegion.hpp b/src/cppcache/src/ThinClientRegion.hpp
index e52845e..0ed8d9d 100644
--- a/src/cppcache/src/ThinClientRegion.hpp
+++ b/src/cppcache/src/ThinClientRegion.hpp
@@ -366,7 +366,7 @@ class ChunkedInterestResponse : public TcrChunkedResult {
}
virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity);
+ uint8_t isLastChunkWithSecurity, const Cache* cache);
virtual void reset();
};
@@ -404,7 +404,7 @@ class ChunkedQueryResponse : public TcrChunkedResult {
}
virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity);
+ uint8_t isLastChunkWithSecurity, const Cache* cache);
virtual void reset();
void readObjectPartList(DataInput& input, bool isResultSet);
@@ -455,7 +455,7 @@ class ChunkedFunctionExecutionResponse : public TcrChunkedResult {
inline bool getResult() const { return m_getResult; }
virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity);
+ uint8_t isLastChunkWithSecurity, const Cache* cache);
virtual void reset();
};
typedef std::shared_ptr<ChunkedFunctionExecutionResponse>
@@ -506,7 +506,7 @@ class ChunkedGetAllResponse : public TcrChunkedResult {
m_responseLock(responseLock) {}
virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity);
+ uint8_t isLastChunkWithSecurity, const Cache* cache);
virtual void reset();
void add(const ChunkedGetAllResponse* other);
@@ -544,7 +544,7 @@ class ChunkedPutAllResponse : public TcrChunkedResult {
m_list(list) {}
virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity);
+ uint8_t isLastChunkWithSecurity, const Cache* cache);
virtual void reset();
VersionedCacheableObjectPartListPtr getList() { return m_list; }
ACE_Recursive_Thread_Mutex& getResponseLock() { return m_responseLock; }
@@ -576,7 +576,7 @@ class ChunkedRemoveAllResponse : public TcrChunkedResult {
m_list(list) {}
virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity);
+ uint8_t isLastChunkWithSecurity, const Cache* cache);
virtual void reset();
VersionedCacheableObjectPartListPtr getList() { return m_list; }
ACE_Recursive_Thread_Mutex& getResponseLock() { return m_responseLock; }
@@ -609,7 +609,7 @@ class ChunkedKeySetResponse : public TcrChunkedResult {
m_resultKeys(resultKeys) {}
virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity);
+ uint8_t isLastChunkWithSecurity, const Cache* cache);
virtual void reset();
};
@@ -632,7 +632,7 @@ class ChunkedDurableCQListResponse : public TcrChunkedResult {
inline CacheableArrayListPtr getResults() { return m_resultList; }
virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen,
- uint8_t isLastChunkWithSecurity);
+ uint8_t isLastChunkWithSecurity, const Cache* cache);
virtual void reset();
};
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThreadPool.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThreadPool.cpp b/src/cppcache/src/ThreadPool.cpp
index e66a25a..1efd8f9 100644
--- a/src/cppcache/src/ThreadPool.cpp
+++ b/src/cppcache/src/ThreadPool.cpp
@@ -25,6 +25,7 @@
#include <geode/DistributedSystem.hpp>
#include <geode/SystemProperties.hpp>
#include "DistributedSystemImpl.hpp"
+#include "CacheImpl.hpp"
using namespace apache::geode::client;
ThreadPoolWorker::ThreadPoolWorker(IThreadPool* manager)
@@ -67,10 +68,11 @@ int ThreadPoolWorker::shutDown(void) {
ACE_thread_t ThreadPoolWorker::threadId(void) { return threadId_; }
-ThreadPool::ThreadPool()
- : shutdown_(0), workersLock_(), workersCond_(workersLock_) {
- SystemProperties* sysProp = DistributedSystem::getSystemProperties();
- poolSize_ = sysProp->threadPoolSize();
+ThreadPool::ThreadPool(uint32_t threadPoolSize)
+ : shutdown_(0),
+ workersLock_(),
+ workersCond_(workersLock_),
+ poolSize_(threadPoolSize) {
activate();
}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThreadPool.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThreadPool.hpp b/src/cppcache/src/ThreadPool.hpp
index ce8257f..91812b2 100644
--- a/src/cppcache/src/ThreadPool.hpp
+++ b/src/cppcache/src/ThreadPool.hpp
@@ -32,9 +32,9 @@
#include <ace/Activation_Queue.h>
#include <ace/Condition_T.h>
#include <ace/Singleton.h>
-#include <ace/Recursive_Thread_Mutex.h>
#include <ace/Guard_T.h>
-
+#include <mutex>
+#include <condition_variable>
namespace apache {
namespace geode {
namespace client {
@@ -42,43 +42,35 @@ namespace client {
template <class T>
class PooledWork : public ACE_Method_Request {
private:
- // ACE_Future<T> result_;
T m_retVal;
- ACE_Recursive_Thread_Mutex m_mutex;
- ACE_Condition<ACE_Recursive_Thread_Mutex> m_cond;
+ std::recursive_mutex m_mutex;
+ std::condition_variable_any m_cond;
bool m_done;
public:
- PooledWork() : m_mutex(), m_cond(m_mutex), m_done(false) {}
+ PooledWork() : m_mutex(), m_cond(), m_done(false) {}
virtual ~PooledWork() {}
virtual int call(void) {
T res = execute();
- ACE_Guard<ACE_Recursive_Thread_Mutex> sync(m_mutex);
+ std::lock_guard<decltype(m_mutex)> lock(m_mutex);
m_retVal = res;
m_done = true;
- m_cond.broadcast();
- // result_.set(res);
- return 0;
- }
+ m_cond.notify_all();
- /*
- void attach(ACE_Future_Observer<T> *cb) {
- result_.attach(cb);
+ return 0;
}
- */
T getResult(void) {
- ACE_Guard<ACE_Recursive_Thread_Mutex> sync(m_mutex);
+ std::unique_lock<decltype(m_mutex)> lock(m_mutex);
while (!m_done) {
- m_cond.wait();
+ m_cond.wait(lock, [this] { return m_done; });
}
- // T res;
- // result_.get(res);
+
return m_retVal;
}
@@ -131,15 +123,14 @@ class ThreadPool : public ACE_Task_Base, IThreadPool {
friend class ACE_Singleton<ThreadPool, ACE_Recursive_Thread_Mutex>;
public:
+ ThreadPool(uint32_t threadPoolSize);
+ virtual ~ThreadPool();
int perform(ACE_Method_Request* req);
int svc(void);
int shutDown(void);
virtual int returnToWork(ThreadPoolWorker* worker);
private:
- ThreadPool();
- virtual ~ThreadPool();
-
ThreadPoolWorker* chooseWorker(void);
int createWorkerPool(void);
int done(void);
@@ -154,8 +145,6 @@ class ThreadPool : public ACE_Task_Base, IThreadPool {
ACE_Activation_Queue queue_;
static const char* NC_Pool_Thread;
};
-
-typedef ACE_Singleton<ThreadPool, ACE_Recursive_Thread_Mutex> TPSingleton;
} // namespace client
} // namespace geode
} // namespace apache
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TombstoneExpiryHandler.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TombstoneExpiryHandler.cpp b/src/cppcache/src/TombstoneExpiryHandler.cpp
index c5f5512..399131e 100644
--- a/src/cppcache/src/TombstoneExpiryHandler.cpp
+++ b/src/cppcache/src/TombstoneExpiryHandler.cpp
@@ -31,10 +31,12 @@ using namespace apache::geode::client;
TombstoneExpiryHandler::TombstoneExpiryHandler(TombstoneEntryPtr entryPtr,
TombstoneList* tombstoneList,
- uint32_t duration)
+ uint32_t duration,
+ CacheImpl* cacheImpl)
: m_entryPtr(entryPtr),
m_duration(duration),
- m_tombstoneList(tombstoneList) {}
+ m_tombstoneList(tombstoneList),
+ m_cacheImpl(cacheImpl) {}
int TombstoneExpiryHandler::handle_timeout(const ACE_Time_Value& current_time,
const void* arg) {
@@ -59,7 +61,7 @@ int TombstoneExpiryHandler::handle_timeout(const ACE_Time_Value& current_time,
"Resetting expiry task %d secs later for key "
"[%s]",
-sec / 1000 + 1, Utils::getCacheableKeyString(key)->asChar());
- CacheImpl::expiryTaskManager->resetTask(
+ m_cacheImpl->getExpiryTaskManager().resetTask(
static_cast<long>(m_entryPtr->getExpiryTaskId()),
uint32_t(-sec / 1000 + 1));
return 0;
@@ -71,7 +73,8 @@ int TombstoneExpiryHandler::handle_timeout(const ACE_Time_Value& current_time,
Utils::getCacheableKeyString(key)->asChar());
// we now delete the handler in GF_Timer_Heap_ImmediateReset_T
// and always return success.
- CacheImpl::expiryTaskManager->resetTask(static_cast<long>(expiryTaskId), 0);
+ m_cacheImpl->getExpiryTaskManager().resetTask(static_cast<long>(expiryTaskId),
+ 0);
return 0;
}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TombstoneExpiryHandler.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TombstoneExpiryHandler.hpp b/src/cppcache/src/TombstoneExpiryHandler.hpp
index 233b4b5..e4ef730 100644
--- a/src/cppcache/src/TombstoneExpiryHandler.hpp
+++ b/src/cppcache/src/TombstoneExpiryHandler.hpp
@@ -43,10 +43,11 @@ namespace client {
class CPPCACHE_EXPORT TombstoneExpiryHandler : public ACE_Event_Handler {
public:
/**
-* Constructor
-*/
+ * Constructor
+ */
TombstoneExpiryHandler(TombstoneEntryPtr entryPtr,
- TombstoneList* tombstoneList, uint32_t duration);
+ TombstoneList* tombstoneList, uint32_t duration,
+ CacheImpl* cacheImpl);
/** This task object will be registered with the Timer Queue.
* When the timer expires the handle_timeout is invoked.
@@ -65,6 +66,7 @@ class CPPCACHE_EXPORT TombstoneExpiryHandler : public ACE_Event_Handler {
// Duration after which the task should be reset in case of
// modification.
uint32_t m_duration;
+ CacheImpl* m_cacheImpl;
// perform the actual expiration action
void DoTheExpirationAction(const CacheableKeyPtr& key);
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TombstoneList.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TombstoneList.cpp b/src/cppcache/src/TombstoneList.cpp
index 31ab22a..d32243a 100644
--- a/src/cppcache/src/TombstoneList.cpp
+++ b/src/cppcache/src/TombstoneList.cpp
@@ -39,46 +39,39 @@ long TombstoneList::getExpiryTask(TombstoneExpiryHandler** handler) {
// This function is not guarded as all functions of this class are called from
// MapSegment
// read TombstoneTImeout from systemProperties.
- uint32_t duration =
- DistributedSystem::getSystemProperties()->tombstoneTimeoutInMSec() / 1000;
+ uint32_t duration = m_cacheImpl->getDistributedSystem()
+ .getSystemProperties()
+ .tombstoneTimeoutInMSec() /
+ 1000;
ACE_Time_Value currTime(ACE_OS::gettimeofday());
auto tombstoneEntryPtr = std::make_shared<TombstoneEntry>(
nullptr, static_cast<int64_t>(currTime.get_msec()));
- *handler = new TombstoneExpiryHandler(tombstoneEntryPtr, this, duration);
+ *handler = new TombstoneExpiryHandler(tombstoneEntryPtr, this, duration,
+ m_cacheImpl);
tombstoneEntryPtr->setHandler(*handler);
- long id =
- CacheImpl::expiryTaskManager->scheduleExpiryTask(*handler, duration, 0);
+ long id = m_cacheImpl->getExpiryTaskManager().scheduleExpiryTask(*handler,
+ duration, 0);
return id;
}
-void TombstoneList::add(RegionInternal* rptr, const MapEntryImplPtr& entry,
+void TombstoneList::add(const MapEntryImplPtr& entry,
TombstoneExpiryHandler* handler, long taskid) {
// This function is not guarded as all functions of this class are called from
// MapSegment
// read TombstoneTImeout from systemProperties.
- // uint32_t duration =
- // DistributedSystem::getSystemProperties()->tombstoneTimeoutInMSec()/1000;
ACE_Time_Value currTime(ACE_OS::gettimeofday());
auto tombstoneEntryPtr = std::make_shared<TombstoneEntry>(
entry, static_cast<int64_t>(currTime.get_msec()));
- // TombstoneExpiryHandler* handler = new
- // TombstoneExpiryHandler(tombstoneEntryPtr, this, duration);
handler->setTombstoneEntry(tombstoneEntryPtr);
tombstoneEntryPtr->setHandler(handler);
- // long id = CacheImpl::expiryTaskManager->scheduleExpiryTask(
- // handler, duration, 0);
CacheableKeyPtr key;
entry->getKeyI(key);
- /*if (Log::finestEnabled()) {
- LOGFINEST("tombstone expiry for key [%s], task id = %d, "
- "duration = %d",
- Utils::getCacheableKeyString(key)->asChar(), id, duration);
- }*/
+
tombstoneEntryPtr->setExpiryTaskId(taskid);
m_tombstoneMap[key] = tombstoneEntryPtr;
- rptr->getCacheImpl()->m_cacheStats->incTombstoneCount();
+ m_cacheImpl->getCachePerfStats().incTombstoneCount();
int32_t tombstonesize = key->objectSize() + SIZEOF_TOMBSTONEOVERHEAD;
- rptr->getCacheImpl()->m_cacheStats->incTombstoneSize(tombstonesize);
+ m_cacheImpl->getCachePerfStats().incTombstoneSize(tombstonesize);
}
// Reaps the tombstones which have been gc'ed on server.
@@ -137,36 +130,34 @@ bool TombstoneList::exists(const CacheableKeyPtr& key) const {
}
void TombstoneList::eraseEntryFromTombstoneList(const CacheableKeyPtr& key,
- RegionInternal* region,
bool cancelTask) {
// This function is not guarded as all functions of this class are called from
// MapSegment
if (exists(key)) {
if (cancelTask) {
- CacheImpl::expiryTaskManager->cancelTask(
+ m_cacheImpl->getExpiryTaskManager().cancelTask(
static_cast<long>(m_tombstoneMap[key]->getExpiryTaskId()));
delete m_tombstoneMap[key]->getHandler();
}
- region->getCacheImpl()->m_cacheStats->decTombstoneCount();
+ m_cacheImpl->getCachePerfStats().decTombstoneCount();
int32_t tombstonesize = key->objectSize() + SIZEOF_TOMBSTONEOVERHEAD;
- region->getCacheImpl()->m_cacheStats->decTombstoneSize(tombstonesize);
+ m_cacheImpl->getCachePerfStats().decTombstoneSize(tombstonesize);
m_tombstoneMap.erase(key);
}
}
long TombstoneList::eraseEntryFromTombstoneListWithoutCancelTask(
- const CacheableKeyPtr& key, RegionInternal* region,
- TombstoneExpiryHandler*& handler) {
+ const CacheableKeyPtr& key, TombstoneExpiryHandler*& handler) {
// This function is not guarded as all functions of this class are called from
// MapSegment
long taskid = -1;
if (exists(key)) {
taskid = static_cast<long>(m_tombstoneMap[key]->getExpiryTaskId());
handler = m_tombstoneMap[key]->getHandler();
- region->getCacheImpl()->m_cacheStats->decTombstoneCount();
+ m_cacheImpl->getCachePerfStats().decTombstoneCount();
int32_t tombstonesize = key->objectSize() + SIZEOF_TOMBSTONEOVERHEAD;
- region->getCacheImpl()->m_cacheStats->decTombstoneSize(tombstonesize);
+ m_cacheImpl->getCachePerfStats().decTombstoneSize(tombstonesize);
m_tombstoneMap.erase(key);
}
return taskid;
@@ -175,8 +166,9 @@ long TombstoneList::eraseEntryFromTombstoneListWithoutCancelTask(
void TombstoneList::cleanUp() {
// This function is not guarded as all functions of this class are called from
// MapSegment
+ auto& expiryTaskManager = m_cacheImpl->getExpiryTaskManager();
for (const auto& queIter : m_tombstoneMap) {
- CacheImpl::expiryTaskManager->cancelTask(queIter.second->getExpiryTaskId());
+ expiryTaskManager.cancelTask(queIter.second->getExpiryTaskId());
delete queIter.second->getHandler();
}
}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TombstoneList.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TombstoneList.hpp b/src/cppcache/src/TombstoneList.hpp
index 66260c1..3178b29 100644
--- a/src/cppcache/src/TombstoneList.hpp
+++ b/src/cppcache/src/TombstoneList.hpp
@@ -72,10 +72,11 @@ typedef std::shared_ptr<TombstoneEntry> TombstoneEntryPtr;
class TombstoneList {
public:
- TombstoneList(MapSegment* mapSegment) { m_mapSegment = mapSegment; }
+ TombstoneList(MapSegment* mapSegment, CacheImpl* cacheImpl)
+ : m_mapSegment(mapSegment), m_cacheImpl(cacheImpl) {}
virtual ~TombstoneList() { cleanUp(); }
- void add(RegionInternal* rptr, const MapEntryImplPtr& entry,
- TombstoneExpiryHandler* handler, long taskID);
+ void add(const MapEntryImplPtr& entry, TombstoneExpiryHandler* handler,
+ long taskID);
// Reaps the tombstones which have been gc'ed on server.
// A map that has identifier for ClientProxyMembershipID as key
@@ -84,11 +85,9 @@ class TombstoneList {
void reapTombstones(std::map<uint16_t, int64_t>& gcVersions);
void reapTombstones(CacheableHashSetPtr removedKeys);
void eraseEntryFromTombstoneList(const CacheableKeyPtr& key,
- RegionInternal* region,
bool cancelTask = true);
long eraseEntryFromTombstoneListWithoutCancelTask(
- const CacheableKeyPtr& key, RegionInternal* region,
- TombstoneExpiryHandler*& handler);
+ const CacheableKeyPtr& key, TombstoneExpiryHandler*& handler);
void cleanUp();
long getExpiryTask(TombstoneExpiryHandler** handler);
bool exists(const CacheableKeyPtr& key) const;
@@ -103,6 +102,7 @@ class TombstoneList {
TombstoneMap m_tombstoneMap;
ACE_Recursive_Thread_Mutex m_queueLock;
MapSegment* m_mapSegment;
+ CacheImpl* m_cacheImpl;
friend class TombstoneExpiryHandler;
};
typedef std::shared_ptr<TombstoneList> TombstoneListPtr;
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TypeRegistry.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TypeRegistry.cpp b/src/cppcache/src/TypeRegistry.cpp
new file mode 100644
index 0000000..f9ee8cd
--- /dev/null
+++ b/src/cppcache/src/TypeRegistry.cpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "geode/TypeRegistry.hpp"
+#include "CacheRegionHelper.hpp"
+#include "CacheImpl.hpp"
+
+/**
+ TypeRegistry is the public facing wrapper for the serialization registry.
+**/
+
+TypeRegistry::TypeRegistry(Cache &cache) : m_cache(cache) {}
+
+void TypeRegistry::registerType(TypeFactoryMethod creationFunction) {
+ CacheRegionHelper::getCacheImpl(&m_cache)
+ ->getSerializationRegistry()
+ ->addType(creationFunction);
+}
+
+void TypeRegistry::registerPdxType(TypeFactoryMethodPdx creationFunction) {
+ CacheRegionHelper::getCacheImpl(&m_cache)
+ ->getSerializationRegistry()
+ ->addPdxType(creationFunction);
+}
+
+void TypeRegistry::registerPdxSerializer(PdxSerializerPtr pdxSerializer) {
+ CacheRegionHelper::getCacheImpl(&m_cache)
+ ->getSerializationRegistry()
+ ->setPdxSerializer(pdxSerializer);
+}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/Utils.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/Utils.cpp b/src/cppcache/src/Utils.cpp
index bc0605d..b64e3db 100644
--- a/src/cppcache/src/Utils.cpp
+++ b/src/cppcache/src/Utils.cpp
@@ -214,6 +214,17 @@ int32_t Utils::logWideString(char* buf, size_t maxLen, const wchar_t* wStr) {
}
}
+int64_t Utils::startStatOpTime() {
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+}
+
+void Utils::updateStatOpTime(statistics::Statistics* m_regionStats, int32_t statId,
+ int64_t start) {
+ m_regionStats->incLong(statId, startStatOpTime() - start);
+}
+
} // namespace client
} // namespace geode
-} // namespace apache
+} // namespace apache
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/Utils.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/Utils.hpp b/src/cppcache/src/Utils.hpp
index 8dbe76d..3e62fb0 100644
--- a/src/cppcache/src/Utils.hpp
+++ b/src/cppcache/src/Utils.hpp
@@ -24,6 +24,11 @@
* @file
*/
+#include <typeinfo>
+#include <string>
+#include <unordered_set>
+#include <memory>
+
#include <geode/geode_globals.hpp>
#include <geode/geode_base.hpp>
#include <geode/ExceptionTypes.hpp>
@@ -141,18 +146,7 @@ class CPPCACHE_EXPORT Utils {
return CacheableString::create("(null)");
}
- inline static int64_t startStatOpTime() {
- if (DistributedSystem::getSystemProperties() != nullptr) {
- return (DistributedSystem::getSystemProperties()
- ->getEnableTimeStatistics())
- ? std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count()
- : 0;
- } else {
- return 0;
- }
- }
+ static int64_t startStatOpTime();
// Check objectSize() implementation return value and log a warning at most
// once.
@@ -176,14 +170,8 @@ class CPPCACHE_EXPORT Utils {
return objectSize;
}
- inline static void updateStatOpTime(statistics::Statistics* m_regionStats,
- int32_t statId, int64_t start) {
- if (DistributedSystem::getSystemProperties() != nullptr) {
- if (DistributedSystem::getSystemProperties()->getEnableTimeStatistics()) {
- m_regionStats->incLong(statId, startStatOpTime() - start);
- }
- }
- }
+ static void updateStatOpTime(statistics::Statistics* m_regionStats,
+ int32_t statId, int64_t start);
static void parseEndpointNamesString(
const char* endpoints, std::unordered_set<std::string>& endpointNames);
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/VersionStamp.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/VersionStamp.cpp b/src/cppcache/src/VersionStamp.cpp
index 6c7becc..8a95356 100644
--- a/src/cppcache/src/VersionStamp.cpp
+++ b/src/cppcache/src/VersionStamp.cpp
@@ -168,7 +168,7 @@ GfErrType VersionStamp::checkForConflict(const RegionInternal* region,
}
if (!apply) {
- region->getCacheImpl()->m_cacheStats->incConflatedEvents();
+ region->getCacheImpl()->getCachePerfStats().incConflatedEvents();
return GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION;
}
return GF_NOERR;
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/VersionTag.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/VersionTag.cpp b/src/cppcache/src/VersionTag.cpp
index cfb133e..8d57e81 100644
--- a/src/cppcache/src/VersionTag.cpp
+++ b/src/cppcache/src/VersionTag.cpp
@@ -22,27 +22,21 @@
using namespace apache::geode::client;
-VersionTag::VersionTag() {
- m_bits = 0;
- m_entryVersion = 0;
- m_regionVersionHighBytes = 0;
- m_regionVersionLowBytes = 0;
- m_timeStamp = 0;
- m_internalMemId = 0;
- m_previousMemId = 0;
-}
+VersionTag::VersionTag(MemberListForVersionStamp& memberListForVersionStamp)
+ : VersionTag(0, 0, 0, 0, 0, memberListForVersionStamp) {}
VersionTag::VersionTag(int32_t entryVersion, int16_t regionVersionHighBytes,
int32_t regionVersionLowBytes, uint16_t internalMemId,
- uint16_t previousMemId) {
- m_bits = 0;
- m_entryVersion = entryVersion;
- m_regionVersionHighBytes = regionVersionHighBytes;
- m_regionVersionLowBytes = regionVersionLowBytes;
- m_timeStamp = 0;
- m_internalMemId = internalMemId;
- m_previousMemId = previousMemId;
-}
+ uint16_t previousMemId,
+ MemberListForVersionStamp& memberListForVersionStamp)
+ : m_bits(0),
+ m_entryVersion(entryVersion),
+ m_regionVersionHighBytes(regionVersionHighBytes),
+ m_regionVersionLowBytes(regionVersionLowBytes),
+ m_timeStamp(0),
+ m_internalMemId(internalMemId),
+ m_previousMemId(previousMemId),
+ m_memberListForVersionStamp(memberListForVersionStamp) {}
VersionTag::~VersionTag() {}
@@ -80,7 +74,11 @@ Serializable* VersionTag::fromData(DataInput& input) {
return this;
}
-Serializable* VersionTag::createDeserializable() { return new VersionTag(); }
+Serializable* VersionTag::createDeserializable(
+ MemberListForVersionStamp& memberListForVersionStamp) {
+ return new VersionTag(memberListForVersionStamp);
+}
+
void VersionTag::replaceNullMemberId(uint16_t memId) {
if (m_previousMemId == 0) {
m_previousMemId = memId;
@@ -90,24 +88,19 @@ void VersionTag::replaceNullMemberId(uint16_t memId) {
}
}
void VersionTag::readMembers(uint16_t flags, DataInput& input) {
- ClientProxyMembershipIDPtr previousMemId, internalMemId;
- MemberListForVersionStampPtr memberList =
- CacheImpl::getMemberListForVersionStamp();
if ((flags & HAS_MEMBER_ID) != 0) {
- internalMemId = ClientProxyMembershipIDPtr(new ClientProxyMembershipID());
-
+ auto internalMemId = std::make_shared<ClientProxyMembershipID>();
internalMemId->readEssentialData(input);
- m_internalMemId =
- memberList->add((DSMemberForVersionStampPtr)internalMemId);
+ m_internalMemId = m_memberListForVersionStamp.add(
+ (DSMemberForVersionStampPtr)internalMemId);
}
if ((flags & HAS_PREVIOUS_MEMBER_ID) != 0) {
if ((flags & DUPLICATE_MEMBER_IDS) != 0) {
m_previousMemId = m_internalMemId;
} else {
- previousMemId = ClientProxyMembershipIDPtr(new ClientProxyMembershipID());
+ auto previousMemId = std::make_shared<ClientProxyMembershipID>();
previousMemId->readEssentialData(input);
- m_previousMemId =
- memberList->add((DSMemberForVersionStampPtr)previousMemId);
+ m_previousMemId = m_memberListForVersionStamp.add(previousMemId);
}
}
}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/VersionTag.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/VersionTag.hpp b/src/cppcache/src/VersionTag.hpp
index d645784..0489009 100644
--- a/src/cppcache/src/VersionTag.hpp
+++ b/src/cppcache/src/VersionTag.hpp
@@ -40,6 +40,7 @@ class VersionTag : public Cacheable {
uint16_t m_internalMemId;
uint16_t m_previousMemId;
int64_t m_timeStamp;
+ MemberListForVersionStamp& m_memberListForVersionStamp;
static const uint8_t HAS_MEMBER_ID = 0x01;
static const uint8_t HAS_PREVIOUS_MEMBER_ID = 0x02;
@@ -53,7 +54,7 @@ class VersionTag : public Cacheable {
virtual void readMembers(uint16_t flags, DataInput& input);
public:
- VersionTag();
+ VersionTag(MemberListForVersionStamp& memberListForVersionStamp);
virtual ~VersionTag();
@@ -65,7 +66,8 @@ class VersionTag : public Cacheable {
virtual Serializable* fromData(DataInput& input);
- static Serializable* createDeserializable();
+ static Serializable* createDeserializable(
+ MemberListForVersionStamp& memberListForVersionStamp);
int32_t getEntryVersion() const { return m_entryVersion; }
int16_t getRegionVersionHighBytes() const { return m_regionVersionHighBytes; }
@@ -83,7 +85,8 @@ class VersionTag : public Cacheable {
*/
VersionTag(int32_t entryVersion, int16_t regionVersionHighBytes,
int32_t regionVersionLowBytes, uint16_t internalMemId,
- uint16_t previousMemId);
+ uint16_t previousMemId,
+ MemberListForVersionStamp& memberListForVersionStamp);
};
} // namespace client
} // namespace geode