You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/04/29 14:25:04 UTC
[geode-native] branch develop updated: GEODE-9210: Removes global
keep alive flag. (#791)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 646695d GEODE-9210: Removes global keep alive flag. (#791)
646695d is described below
commit 646695d166087ba5c722f2f7c139cb98c19b8a08
Author: Jacob Barrett <ja...@vmware.com>
AuthorDate: Thu Apr 29 07:24:56 2021 -0700
GEODE-9210: Removes global keep alive flag. (#791)
* GEODE-9210: Removes global keep alive flag.
Puts keep alive at cache and pool level.
---
clicache/src/TypeRegistry.cpp | 4 +-
clicache/src/impl/PdxInstanceImpl.cpp | 1 +
clicache/src/impl/SafeConvert.hpp | 4 +-
cppcache/integration-test/CacheHelper.hpp | 1 +
cppcache/integration-test/QueryHelper.hpp | 1 +
cppcache/src/AdminRegion.cpp | 1 +
cppcache/src/Cache.cpp | 2 +-
cppcache/src/CacheImpl.cpp | 16 ++--
cppcache/src/CacheImpl.hpp | 3 +
cppcache/src/CqService.cpp | 7 +-
cppcache/src/DistributedSystemImpl.cpp | 2 +
cppcache/src/EntryExpiryTask.cpp | 1 +
cppcache/src/LocalRegion.cpp | 4 +
cppcache/src/PdxInstanceImpl.cpp | 4 +
cppcache/src/PdxTypeRegistry.cpp | 1 +
cppcache/src/RemoteQueryService.cpp | 1 +
cppcache/src/RemoteQueryService.hpp | 4 +-
cppcache/src/SerializableHelper.hpp | 6 ++
cppcache/src/TcrConnection.cpp | 37 ++------
cppcache/src/TcrConnection.hpp | 12 +--
cppcache/src/TcrMessage.cpp | 140 +++++++++---------------------
cppcache/src/TcrMessage.hpp | 62 ++++---------
cppcache/src/ThinClientPoolDM.cpp | 4 +-
cppcache/src/ThinClientPoolDM.hpp | 3 +
cppcache/src/ThinClientPoolHADM.cpp | 5 --
cppcache/src/ThinClientPoolHADM.hpp | 5 +-
cppcache/src/TombstoneExpiryTask.cpp | 3 +-
cppcache/src/TombstoneExpiryTask.hpp | 3 -
cppcache/src/TombstoneList.cpp | 2 +
cppcache/src/TypeRegistry.cpp | 2 +-
cppcache/test/TcrMessageTest.cpp | 25 +++++-
31 files changed, 150 insertions(+), 216 deletions(-)
diff --git a/clicache/src/TypeRegistry.cpp b/clicache/src/TypeRegistry.cpp
index 0845eb5..ea257b2 100644
--- a/clicache/src/TypeRegistry.cpp
+++ b/clicache/src/TypeRegistry.cpp
@@ -310,7 +310,7 @@ namespace Apache
}
generic<class TValue>
- TValue wrap(std::shared_ptr<native::DataSerializablePrimitive> dataSerializablePrimitive)
+ TValue wrap(std::shared_ptr<native::internal::DataSerializablePrimitive> dataSerializablePrimitive)
{
switch (dataSerializablePrimitive->getDsCode())
{
@@ -463,7 +463,7 @@ namespace Apache
return TValue();
}
- if (auto dataSerializablePrimitive = std::dynamic_pointer_cast<native::DataSerializablePrimitive>(val))
+ if (auto dataSerializablePrimitive = std::dynamic_pointer_cast<native::internal::DataSerializablePrimitive>(val))
{
switch (dataSerializablePrimitive->getDsCode())
{
diff --git a/clicache/src/impl/PdxInstanceImpl.cpp b/clicache/src/impl/PdxInstanceImpl.cpp
index 736e780..14cae16 100755
--- a/clicache/src/impl/PdxInstanceImpl.cpp
+++ b/clicache/src/impl/PdxInstanceImpl.cpp
@@ -20,6 +20,7 @@
#include <CacheRegionHelper.hpp>
#include <CacheImpl.hpp>
#include <CachePerfStats.hpp>
+#include <Utils.hpp>
#include "../end_native.hpp"
#include "PdxInstanceImpl.hpp"
diff --git a/clicache/src/impl/SafeConvert.hpp b/clicache/src/impl/SafeConvert.hpp
index a52d221..45e2090 100644
--- a/clicache/src/impl/SafeConvert.hpp
+++ b/clicache/src/impl/SafeConvert.hpp
@@ -74,7 +74,7 @@ namespace Apache
{
return managedPrimitive->ptr();
}
- else if (auto primitive = std::dynamic_pointer_cast<native::DataSerializablePrimitive>(serializableObject))
+ else if (auto primitive = std::dynamic_pointer_cast<native::internal::DataSerializablePrimitive>(serializableObject))
{
if (auto wrapperMethod = TypeRegistry::GetDataSerializablePrimitiveWrapperDelegateForDsCode(static_cast<int8_t>(primitive->getDsCode())))
{
@@ -174,7 +174,7 @@ namespace Apache
return (Client::ICacheableKey^)mg_obj->ptr( );
}
- if (auto primitive = std::dynamic_pointer_cast<native::DataSerializablePrimitive>(obj)) {
+ if (auto primitive = std::dynamic_pointer_cast<native::internal::DataSerializablePrimitive>(obj)) {
auto wrapperMethod = TypeRegistry::GetDataSerializablePrimitiveWrapperDelegateForDsCode(static_cast<int8_t>(primitive->getDsCode()));
if (wrapperMethod != nullptr)
{
diff --git a/cppcache/integration-test/CacheHelper.hpp b/cppcache/integration-test/CacheHelper.hpp
index 1c4646b..963c9a7 100644
--- a/cppcache/integration-test/CacheHelper.hpp
+++ b/cppcache/integration-test/CacheHelper.hpp
@@ -32,6 +32,7 @@
#include <geode/PoolManager.hpp>
#include "TimeBomb.hpp"
+#include "SerializationRegistry.hpp"
#include "DistributedSystemImpl.hpp"
#include "Utils.hpp"
#include "config.h"
diff --git a/cppcache/integration-test/QueryHelper.hpp b/cppcache/integration-test/QueryHelper.hpp
index 27a8b83..11ce267 100644
--- a/cppcache/integration-test/QueryHelper.hpp
+++ b/cppcache/integration-test/QueryHelper.hpp
@@ -32,6 +32,7 @@
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
#include "DistributedSystemImpl.hpp"
+#include "SerializationRegistry.hpp"
#include "testobject/Portfolio.hpp"
#include "testobject/Position.hpp"
#include "testobject/PdxType.hpp"
diff --git a/cppcache/src/AdminRegion.cpp b/cppcache/src/AdminRegion.cpp
index 4f217a5..cae6df3 100644
--- a/cppcache/src/AdminRegion.cpp
+++ b/cppcache/src/AdminRegion.cpp
@@ -21,6 +21,7 @@
#include "CacheImpl.hpp"
#include "TcrConnectionManager.hpp"
+#include "ThinClientCacheDistributionManager.hpp"
#include "ThinClientPoolDM.hpp"
#include "ThinClientRegion.hpp"
#include "util/exception.hpp"
diff --git a/cppcache/src/Cache.cpp b/cppcache/src/Cache.cpp
index 1112c8d..6942ca9 100644
--- a/cppcache/src/Cache.cpp
+++ b/cppcache/src/Cache.cpp
@@ -21,10 +21,10 @@
#include <geode/FunctionService.hpp>
#include <geode/PoolManager.hpp>
#include <geode/RegionFactory.hpp>
+#include <geode/SystemProperties.hpp>
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
-#include "DistributedSystemImpl.hpp"
#include "ProxyRegion.hpp"
#include "UserAttributes.hpp"
diff --git a/cppcache/src/CacheImpl.cpp b/cppcache/src/CacheImpl.cpp
index fdaf206..988e79f 100644
--- a/cppcache/src/CacheImpl.cpp
+++ b/cppcache/src/CacheImpl.cpp
@@ -74,7 +74,8 @@ CacheImpl::CacheImpl(Cache* c, const std::shared_ptr<Properties>& dsProps,
m_serializationRegistry(std::make_shared<SerializationRegistry>()),
m_pdxTypeRegistry(nullptr),
m_threadPool(m_distributedSystem.getSystemProperties().threadPoolSize()),
- m_authInitialize(authInitialize) {
+ m_authInitialize(authInitialize),
+ m_keepAlive(false) {
using apache::geode::statistics::StatisticsManager;
m_cacheTXManager = std::shared_ptr<CacheTransactionManager>(
@@ -234,11 +235,12 @@ void CacheImpl::sendNotificationCloseMsgs() {
}
}
-void CacheImpl::close(bool keepalive) {
+void CacheImpl::close(bool keepAlive) {
this->throwIfClosed();
- TcrMessage::setKeepAlive(keepalive);
- // bug #247 fix for durable clients missing events when recycled
+ m_keepAlive = keepAlive;
+
+ // fix for durable clients missing events when recycled
sendNotificationCloseMsgs();
{
@@ -299,12 +301,12 @@ void CacheImpl::close(bool keepalive) {
m_cacheStats->close();
}
- m_poolManager->close(keepalive);
+ m_poolManager->close(keepAlive);
m_poolManager.reset();
LOGFINE("Closed pool manager with keepalive %s",
- keepalive ? "true" : "false");
+ keepAlive ? "true" : "false");
// Close CachePef Stats
if (m_cacheStats) {
@@ -901,6 +903,8 @@ int CacheImpl::getNumberOfTimeEndpointDisconnected(
throw IllegalStateException("Endpoint not found");
}
+bool CacheImpl::isKeepAlive() { return m_keepAlive; }
+
} // namespace client
} // namespace geode
} // namespace apache
diff --git a/cppcache/src/CacheImpl.hpp b/cppcache/src/CacheImpl.hpp
index bcc714a..a96517c 100644
--- a/cppcache/src/CacheImpl.hpp
+++ b/cppcache/src/CacheImpl.hpp
@@ -306,6 +306,8 @@ class APACHE_GEODE_EXPORT CacheImpl {
bool doIfDestroyNotPending(std::function<void()>);
+ bool isKeepAlive();
+
private:
std::atomic<bool> m_networkhop;
std::atomic<int8_t> m_serverGroupFlag;
@@ -370,6 +372,7 @@ class APACHE_GEODE_EXPORT CacheImpl {
ThreadPool m_threadPool;
const std::shared_ptr<AuthInitialize> m_authInitialize;
std::unique_ptr<TypeRegistry> m_typeRegistry;
+ bool m_keepAlive;
inline void throwIfClosed() const {
if (m_closed) {
diff --git a/cppcache/src/CqService.cpp b/cppcache/src/CqService.cpp
index ccbd6c6..3ced543 100644
--- a/cppcache/src/CqService.cpp
+++ b/cppcache/src/CqService.cpp
@@ -301,8 +301,9 @@ void CqService::stopCqs(query_container_type& cqs) {
}
void CqService::closeCqs(query_container_type& cqs) {
- LOGDEBUG("closeCqs() TcrMessage::isKeepAlive() = %d ",
- TcrMessage::isKeepAlive());
+ const auto keepAlive =
+ m_tccdm->getConnectionManager().getCacheImpl()->isKeepAlive();
+ LOGDEBUG("closeCqs() keepAlive = %d ", keepAlive);
if (!cqs.empty()) {
std::string cqName;
for (auto& cq : cqs) {
@@ -311,7 +312,7 @@ void CqService::closeCqs(query_container_type& cqs) {
cqName = cqi->getName();
LOGDEBUG("closeCqs() cqname = %s isDurable = %d ", cqName.c_str(),
cqi->isDurable());
- if (!(cqi->isDurable() && TcrMessage::isKeepAlive())) {
+ if (!(cqi->isDurable() && keepAlive)) {
cqi->close(true);
} else {
cqi->close(false);
diff --git a/cppcache/src/DistributedSystemImpl.cpp b/cppcache/src/DistributedSystemImpl.cpp
index 778c61e..6606246 100644
--- a/cppcache/src/DistributedSystemImpl.cpp
+++ b/cppcache/src/DistributedSystemImpl.cpp
@@ -17,6 +17,8 @@
#include "DistributedSystemImpl.hpp"
+#include "Utils.hpp"
+
#if defined(HAVE_PTHREAD_H)
#include <pthread.h>
#elif defined(_WIN32)
diff --git a/cppcache/src/EntryExpiryTask.cpp b/cppcache/src/EntryExpiryTask.cpp
index 5e39067..f824cbd 100644
--- a/cppcache/src/EntryExpiryTask.cpp
+++ b/cppcache/src/EntryExpiryTask.cpp
@@ -19,6 +19,7 @@
#include "CacheImpl.hpp"
#include "RegionInternal.hpp"
+#include "Utils.hpp"
namespace apache {
namespace geode {
diff --git a/cppcache/src/LocalRegion.cpp b/cppcache/src/LocalRegion.cpp
index 05e3342..0b3503c 100644
--- a/cppcache/src/LocalRegion.cpp
+++ b/cppcache/src/LocalRegion.cpp
@@ -21,6 +21,7 @@
#include <geode/PoolManager.hpp>
#include <geode/SystemProperties.hpp>
+#include <geode/internal/DataSerializablePrimitive.hpp>
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
@@ -36,6 +37,7 @@
#include "TcrConnectionManager.hpp"
#include "Utils.hpp"
#include "VersionTag.hpp"
+#include "VersionedCacheableObjectPartList.hpp"
#include "util/Log.hpp"
#include "util/bounds.hpp"
#include "util/exception.hpp"
@@ -44,6 +46,8 @@ namespace apache {
namespace geode {
namespace client {
+using internal::DataSerializablePrimitive;
+
LocalRegion::LocalRegion(const std::string& name, CacheImpl* cacheImpl,
const std::shared_ptr<RegionInternal>& rPtr,
RegionAttributes attributes,
diff --git a/cppcache/src/PdxInstanceImpl.cpp b/cppcache/src/PdxInstanceImpl.cpp
index ceadaf8..387312a 100644
--- a/cppcache/src/PdxInstanceImpl.cpp
+++ b/cppcache/src/PdxInstanceImpl.cpp
@@ -22,15 +22,19 @@
#include <geode/Cache.hpp>
#include <geode/PdxFieldTypes.hpp>
#include <geode/PdxReader.hpp>
+#include <geode/internal/DataSerializablePrimitive.hpp>
#include "CacheRegionHelper.hpp"
#include "PdxHelper.hpp"
+#include "Utils.hpp"
#include "util/string.hpp"
namespace apache {
namespace geode {
namespace client {
+using internal::DataSerializablePrimitive;
+
int8_t PdxInstanceImpl::m_BooleanDefaultBytes[] = {0};
int8_t PdxInstanceImpl::m_ByteDefaultBytes[] = {0};
int8_t PdxInstanceImpl::m_ShortDefaultBytes[] = {0, 0};
diff --git a/cppcache/src/PdxTypeRegistry.cpp b/cppcache/src/PdxTypeRegistry.cpp
index 602a9ef..68d2de0 100644
--- a/cppcache/src/PdxTypeRegistry.cpp
+++ b/cppcache/src/PdxTypeRegistry.cpp
@@ -22,6 +22,7 @@
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
#include "PreservedDataExpiryTask.hpp"
+#include "SerializationRegistry.hpp"
#include "ThinClientPoolDM.hpp"
namespace apache {
diff --git a/cppcache/src/RemoteQueryService.cpp b/cppcache/src/RemoteQueryService.cpp
index 8a3d079..9598a5f 100644
--- a/cppcache/src/RemoteQueryService.cpp
+++ b/cppcache/src/RemoteQueryService.cpp
@@ -21,6 +21,7 @@
#include "CqServiceVsdStats.hpp"
#include "ReadWriteLock.hpp"
#include "RemoteQuery.hpp"
+#include "ThinClientCacheDistributionManager.hpp"
#include "ThinClientPoolDM.hpp"
#include "UserAttributes.hpp"
#include "statistics/StatisticsManager.hpp"
diff --git a/cppcache/src/RemoteQueryService.hpp b/cppcache/src/RemoteQueryService.hpp
index ca72ada..8c64ba2 100644
--- a/cppcache/src/RemoteQueryService.hpp
+++ b/cppcache/src/RemoteQueryService.hpp
@@ -23,12 +23,12 @@
#include <memory>
#include <string>
+#include <ace/RW_Thread_Mutex.h>
+
#include <geode/QueryService.hpp>
#include <geode/internal/geode_globals.hpp>
#include "CqService.hpp"
-#include "ThinClientCacheDistributionManager.hpp"
-#include "statistics/StatisticsManager.hpp"
namespace apache {
namespace geode {
diff --git a/cppcache/src/SerializableHelper.hpp b/cppcache/src/SerializableHelper.hpp
index 39b5565..9fe7716 100644
--- a/cppcache/src/SerializableHelper.hpp
+++ b/cppcache/src/SerializableHelper.hpp
@@ -22,11 +22,17 @@
#include <geode/DataInput.hpp>
#include <geode/DataOutput.hpp>
+#include <geode/DataSerializable.hpp>
+#include <geode/internal/DataSerializableInternal.hpp>
+#include <geode/internal/DataSerializablePrimitive.hpp>
namespace apache {
namespace geode {
namespace client {
+using internal::DataSerializableInternal;
+using internal::DataSerializablePrimitive;
+
template <class _Serializable>
struct SerializableHelper {
inline void serialize(DataOutput& dataOutput,
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 5035e6d..8cc88bf 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -918,12 +918,16 @@ bool TcrConnection::processChunk(TcrMessageReply& reply,
}
void TcrConnection::close() {
- TcrMessage* closeMsg = TcrMessage::getCloseConnMessage(
- m_poolDM->getConnectionManager().getCacheImpl());
+ auto cache = m_poolDM->getConnectionManager().getCacheImpl();
+ TcrMessageCloseConnection closeMsg{
+ std::unique_ptr<DataOutput>(
+ new DataOutput(cache->createDataOutput(m_poolDM))),
+ cache->isKeepAlive() || m_poolDM->isKeepAlive()};
+
try {
if (!TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH &&
!m_connectionManager.isNetDown()) {
- send(closeMsg->getMsgData(), closeMsg->getMsgLength(),
+ send(closeMsg.getMsgData(), closeMsg.getMsgLength(),
std::chrono::seconds(2), false);
}
} catch (Exception& e) {
@@ -961,33 +965,6 @@ std::vector<int8_t> TcrConnection::readHandshakeData(
}
}
-std::shared_ptr<CacheableBytes> TcrConnection::readHandshakeRawData(
- int32_t msgLength, std::chrono::microseconds connectTimeout) {
- ConnErrType error = CONN_NOERR;
- if (msgLength < 0) {
- msgLength = 0;
- }
- if (msgLength == 0) {
- return nullptr;
- }
- std::vector<int8_t> message(msgLength);
- if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength,
- connectTimeout)) != CONN_NOERR) {
- m_conn.reset();
- if (error & CONN_TIMEOUT) {
- throwException(
- TimeoutException("TcrConnection::TcrConnection: "
- "Timeout in handshake"));
- } else {
- throwException(
- GeodeIOException("TcrConnection::TcrConnection: "
- "Handshake failure"));
- }
- } else {
- return CacheableBytes::create(std::move(message));
- }
-}
-
// read a byte array
int32_t TcrConnection::readHandshakeArraySize(
std::chrono::microseconds connectTimeout) {
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index 4889cda..fb7562d 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -245,10 +245,8 @@ class TcrConnection {
uint16_t inline getPort() { return m_port; }
TcrEndpoint* getEndpointObject() const { return m_endpointObj.get(); }
- bool isBeingUsed() { return m_isBeingUsed; }
- bool setAndGetBeingUsed(
- volatile bool isBeingUsed,
- bool forTransaction); // { m_isBeingUsed = isBeingUsed ;}
+
+ bool setAndGetBeingUsed(volatile bool isBeingUsed, bool forTransaction);
// helpers for pool connection manager
void touch();
@@ -325,12 +323,6 @@ class TcrConnection {
int32_t msgLength, std::chrono::microseconds connectTimeout);
/**
- * Reads raw bytes (without appending nullptr terminator) from socket and
- * handles error conditions in case of Handshake.
- */
- std::shared_ptr<CacheableBytes> readHandshakeRawData(
- int32_t msgLength, std::chrono::microseconds connectTimeout);
- /**
* Reads a string from socket and handles error conditions in case of
* Handshake.
*/
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index cd8ca3e..bdaf340 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -17,19 +17,20 @@
#include "TcrMessage.hpp"
-#include <sstream>
-
#include <geode/CacheableBuiltins.hpp>
#include <geode/CacheableObjectArray.hpp>
#include <geode/SystemProperties.hpp>
#include "AutoDelete.hpp"
+#include "BucketServerLocation.hpp"
#include "CacheRegionHelper.hpp"
#include "DataInputInternal.hpp"
#include "DataOutputInternal.hpp"
#include "DiskStoreId.hpp"
#include "DiskVersionTag.hpp"
-#include "DistributedSystem.hpp"
+#include "EventId.hpp"
+#include "FixedPartitionAttributesImpl.hpp"
+#include "SerializationRegistry.hpp"
#include "StackTrace.hpp"
#include "TSSTXStateWrapper.hpp"
#include "TXState.hpp"
@@ -39,6 +40,8 @@
#include "ThinClientBaseDM.hpp"
#include "ThinClientPoolDM.hpp"
#include "ThinClientRegion.hpp"
+#include "VersionTag.hpp"
+#include "VersionedCacheableObjectPartList.hpp"
#include "util/JavaModifiedUtf8.hpp"
#include "util/string.hpp"
@@ -50,9 +53,16 @@ namespace apache {
namespace geode {
namespace client {
namespace {
-const uint32_t g_headerLen = 17;
-const uint32_t REGULAR_EXPRESSION =
- 1; // come from Java InterestType.REGULAR_EXPRESSION
+
+constexpr size_t kHeaderLength = 17;
+
+/**
+ * Comes from Java InterestType.REGULAR_EXPRESSION
+ */
+constexpr int32_t kREGULAR_EXPRESSION = 1;
+
+constexpr int32_t kFlagEmpty = 0x01;
+constexpr int32_t kFlagConcurrencyChecks = 0x02;
inline void readInt(uint8_t* buffer, uint16_t* value) {
uint16_t tmp = *(buffer++);
@@ -83,12 +93,6 @@ inline void writeInt(uint8_t* buffer, uint32_t value) {
extern void setThreadLocalExceptionMessage(std::string);
-uint8_t* TcrMessage::m_keepAlive = nullptr;
-const int TcrMessage::m_flag_empty = 0x01;
-const int TcrMessage::m_flag_concurrency_checks = 0x02;
-
-bool TcrMessage::isKeepAlive() { return (m_keepAlive && (*m_keepAlive > 0)); }
-
bool TcrMessage::isUserInitiativeOps(const TcrMessage& msg) {
int32_t msgType = msg.getMessageType();
@@ -140,20 +144,14 @@ TcrMessage::TcrMessage()
m_callbackArgument(nullptr),
m_versionTag(),
m_eventid(nullptr),
- m_vectorPtr(),
- m_bucketServerLocation(nullptr),
m_tombstoneVersions(),
m_tombstoneKeys(),
- m_versionObjPartListptr(),
m_exceptionMessage(),
m_regionName("INVALID_REGION_NAME"),
m_regex(),
- m_bucketServerLocations(),
m_colocatedWith(),
- m_partitionResolverName(),
m_securityHeaderLength(0),
m_msgType(TcrMessage::INVALID),
- m_msgLength(-1),
m_msgTypeRequest(0),
m_txId(-1),
m_bucketCount(0),
@@ -162,7 +160,7 @@ TcrMessage::TcrMessage()
m_deltaBytesLen(0),
m_entryNotFound(0),
m_feAnotherHop(false),
- isSecurityOn(false),
+ m_isSecurityOn(false),
m_isLastChunkAndisSecurityHeader(0),
m_isSecurityHeaderAdded(false),
m_isMetaRegion(false),
@@ -229,8 +227,6 @@ bool TcrMessage::hasCqPart() const { return m_hasCqsPart; }
uint32_t TcrMessage::getMessageTypeForCq() const { return m_msgTypeForCq; }
-bool TcrMessage::isInterestListPassed() const { return m_isInterestListPassed; }
-
bool TcrMessage::shouldIgnore() const { return m_shouldIgnore; }
int8_t TcrMessage::getMetaDataVersion() const { return m_metaDataVersion; }
@@ -259,16 +255,6 @@ TcrChunkedResult* TcrMessage::getChunkedResultHandler() {
return m_chunkedResult;
}
-void TcrMessage::setVersionedObjectPartList(
- std::shared_ptr<VersionedCacheableObjectPartList> versionObjPartListptr) {
- m_versionObjPartListptr = versionObjPartListptr;
-}
-
-std::shared_ptr<VersionedCacheableObjectPartList>
-TcrMessage::getVersionedObjectPartList() {
- return m_versionObjPartListptr;
-}
-
DataInput* TcrMessage::getDelta() { return m_delta.get(); }
// getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes
@@ -298,10 +284,6 @@ const std::string& TcrMessage::getColocatedWith() const {
return m_colocatedWith;
}
-const std::string& TcrMessage::getPartitionResolver() const {
- return m_partitionResolverName;
-}
-
std::vector<std::vector<std::shared_ptr<BucketServerLocation>>>*
TcrMessage::getMetadata() {
return m_metadata;
@@ -322,10 +304,6 @@ void TcrMessage::setCallBackArguement(bool aCallBackArguement) {
m_isCallBackArguement = aCallBackArguement;
}
-void TcrMessage::setBucketServerLocation(
- std::shared_ptr<BucketServerLocation> serverLocation) {
- m_bucketServerLocation = serverLocation;
-}
void TcrMessage::setVersionTag(std::shared_ptr<VersionTag> versionTag) {
m_versionTag = versionTag;
}
@@ -346,19 +324,6 @@ TcrMessage* TcrMessage::getAllEPDisMess() {
return allEPDisconnected;
}
-TcrMessage* TcrMessage::getCloseConnMessage(CacheImpl* cacheImpl) {
- static auto closeConnMsg = new TcrMessageCloseConnection(
- new DataOutput(cacheImpl->createDataOutput()), true);
- return closeConnMsg;
-}
-
-void TcrMessage::setKeepAlive(bool keepalive) {
- // TODO global
- if (TcrMessage::m_keepAlive != nullptr) {
- *TcrMessage::m_keepAlive = keepalive ? 1 : 0;
- }
-}
-
void TcrMessage::writeInterestResultPolicyPart(InterestResultPolicy policy) {
m_request->writeInt(static_cast<int32_t>(3)); // size
m_request->write(static_cast<int8_t>(1)); // isObject
@@ -467,13 +432,6 @@ void TcrMessage::readIntPart(DataInput& input, uint32_t* intValue) {
*intValue = input.readInt32();
}
-void TcrMessage::readLongPart(DataInput& input, uint64_t* intValue) {
- uint32_t longLen = input.readInt32();
- if (longLen != 8) throw Exception("long length should have been 8");
- if (input.read()) throw Exception("Long is not an object");
- *intValue = input.readInt64();
-}
-
const std::string TcrMessage::readStringPart(DataInput& input) {
auto stringLength = input.readInt32();
if (input.read()) {
@@ -815,7 +773,7 @@ void TcrMessage::writeHeader(uint32_t msgType, uint32_t numOfParts) {
int8_t earlyAck = 0x0;
LOGDEBUG("TcrMessage::writeHeader m_isMetaRegion = %d", m_isMetaRegion);
if (m_tcdm != nullptr) {
- if ((isSecurityOn =
+ if ((m_isSecurityOn =
(m_tcdm->isSecurityOn() &&
TcrMessage::isUserInitiativeOps(*this) && !m_isMetaRegion))) {
earlyAck |= 0x2;
@@ -890,7 +848,7 @@ void TcrMessage::writeEventIdPart(int reserveSize,
void TcrMessage::writeMessageLength() {
auto totalLen = m_request->getBufferLength();
- auto msgLen = totalLen - g_headerLen;
+ auto msgLen = totalLen - kHeaderLength;
m_request->rewindCursor(
totalLen -
4); // msg len is written after the msg type which is of 4 bytes ...
@@ -1190,9 +1148,9 @@ void TcrMessage::handleByteArrayResponse(
m_txId = input.readInt32();
auto earlyack = input.read();
LOGDEBUG(
- "handleByteArrayResponse m_msgType = %d isSecurityOn = %d requesttype "
+ "handleByteArrayResponse m_msgType = %d m_isSecurityOn = %d requesttype "
"=%d",
- m_msgType, isSecurityOn, m_msgTypeRequest);
+ m_msgType, m_isSecurityOn, m_msgTypeRequest);
LOGDEBUG(
"Message type=%d, length=%d, parts=%d, txid=%d and eack %d with data "
"length=%d",
@@ -1297,7 +1255,7 @@ void TcrMessage::handleByteArrayResponse(
uint8_t lastChunk = static_cast<uint8_t>(numparts);
lastChunk = (lastChunk << 5);
readExceptionPart(input, lastChunk);
- // if (isSecurityOn)
+ // if (m_isSecurityOn)
// readSecureObjectPart( input );
break;
}
@@ -1553,7 +1511,7 @@ void TcrMessage::handleByteArrayResponse(
input.read(); // ignore isObj;
if (partLength > 0) {
// PART3 = partitionresolvername
- m_partitionResolverName = input.readString();
+ input.readString(); // ignore
}
input.readInt32(); // ignore partlen;
@@ -1696,7 +1654,7 @@ TcrMessageClearRegion::TcrMessageClearRegion(
m_timeout = DEFAULT_TIMEOUT;
m_messageResponseTimeout = messageResponsetimeout;
- isSecurityOn = false;
+ m_isSecurityOn = false;
m_isSecurityHeaderAdded = false;
uint32_t numOfParts = 1;
@@ -2079,7 +2037,7 @@ TcrMessageReply::TcrMessageReply(bool decodeAll,
m_decodeAll = decodeAll;
m_tcdm = connectionDM;
- if (connectionDM != nullptr) isSecurityOn = connectionDM->isSecurityOn();
+ if (connectionDM != nullptr) m_isSecurityOn = connectionDM->isSecurityOn();
}
TcrMessagePing::TcrMessagePing(std::unique_ptr<DataOutput> dataOutput) {
@@ -2089,18 +2047,14 @@ TcrMessagePing::TcrMessagePing(std::unique_ptr<DataOutput> dataOutput) {
writeMessageLength();
}
-TcrMessageCloseConnection::TcrMessageCloseConnection(DataOutput* dataOutput,
- bool decodeAll) {
+TcrMessageCloseConnection::TcrMessageCloseConnection(
+ std::unique_ptr<DataOutput> dataOutput, bool keepAlive) {
m_msgType = TcrMessage::CLOSE_CONNECTION;
- m_decodeAll = decodeAll;
- m_request.reset(dataOutput);
+ m_request = std::move(dataOutput);
writeHeader(m_msgType, 1);
- // last two parts are not used ... setting zero in both the parts.
- m_request->writeInt(static_cast<int32_t>(1)); // len is 1
- m_request->write(static_cast<int8_t>(0)); // is obj is '0'.
- // cast away constness here since we want to modify this
- TcrMessage::m_keepAlive = const_cast<uint8_t*>(m_request->getCursor());
- m_request->write(static_cast<int8_t>(0)); // keepalive is '0'.
+ m_request->writeInt(static_cast<int32_t>(1)); // len
+ m_request->writeBoolean(false); // is obj
+ m_request->writeBoolean(keepAlive); // keepalive
writeMessageLength();
}
@@ -2238,7 +2192,7 @@ TcrMessageRegisterInterest::TcrMessageRegisterInterest(
writeHeader(m_msgType, numOfParts);
writeRegionPart(str1); // region name
- writeIntPart(REGULAR_EXPRESSION); // InterestType
+ writeIntPart(kREGULAR_EXPRESSION); // InterestType
writeInterestResultPolicyPart(interestPolicy); // InterestResultPolicy
writeBytePart(isDurable ? 1 : 0);
writeRegionPart(str2); // regexp string
@@ -2272,11 +2226,11 @@ TcrMessageUnregisterInterest::TcrMessageUnregisterInterest(
uint32_t numOfParts = 3;
numOfParts += 2;
writeHeader(m_msgType, numOfParts);
- writeRegionPart(str1); // region name
- writeIntPart(REGULAR_EXPRESSION); // InterestType
- writeRegionPart(str2); // regexp string
- writeBytePart(0); // isClosing
- writeBytePart(isDurable ? 1 : 0); // keepalive
+ writeRegionPart(str1); // region name
+ writeIntPart(kREGULAR_EXPRESSION); // InterestType
+ writeRegionPart(str2); // regexp string
+ writeBytePart(0); // isClosing
+ writeBytePart(isDurable ? 1 : 0); // keepalive
writeMessageLength();
m_regionName = str1;
m_regex = str2;
@@ -2413,11 +2367,11 @@ TcrMessagePutAll::TcrMessagePutAll(
// checks are disabled.
int flags = 0;
if (!region->getAttributes().getCachingEnabled()) {
- flags |= TcrMessage::m_flag_empty;
+ flags |= kFlagEmpty;
LOGDEBUG("TcrMessage::PUTALL datapolicy empty flags = %d ", flags);
}
if (region->getAttributes().getConcurrencyChecksEnabled()) {
- flags |= TcrMessage::m_flag_concurrency_checks;
+ flags |= kFlagConcurrencyChecks;
LOGDEBUG("TcrMessage::PUTALL ConcurrencyChecksEnabled flags = %d ", flags);
}
writeIntPart(flags);
@@ -2467,11 +2421,11 @@ TcrMessageRemoveAll::TcrMessageRemoveAll(
// checks are disabled.
int flags = 0;
if (!region->getAttributes().getCachingEnabled()) {
- flags |= TcrMessage::m_flag_empty;
+ flags |= kFlagEmpty;
LOGDEBUG("TcrMessage::REMOVE_ALL datapolicy empty flags = %d ", flags);
}
if (region->getAttributes().getConcurrencyChecksEnabled()) {
- flags |= TcrMessage::m_flag_concurrency_checks;
+ flags |= kFlagConcurrencyChecks;
LOGDEBUG("TcrMessage::REMOVE_ALL ConcurrencyChecksEnabled flags = %d ",
flags);
}
@@ -3057,7 +3011,6 @@ void TcrMessage::setMessageType(int32_t msgType) { m_msgType = msgType; }
void TcrMessage::setMessageTypeRequest(int32_t msgType) {
m_msgTypeRequest = msgType;
}
-int32_t TcrMessage::getMessageTypeRequest() const { return m_msgTypeRequest; }
const std::map<std::string, int>* TcrMessage::getCqs() const { return m_cqs; }
std::shared_ptr<CacheableKey> TcrMessage::getKey() const { return m_key; }
@@ -3082,19 +3035,8 @@ const char* TcrMessage::getMsgData() const {
return reinterpret_cast<const char*>(m_request->getBuffer());
}
-const char* TcrMessage::getMsgHeader() const {
- return reinterpret_cast<const char*>(m_request->getBuffer());
-}
-
-const char* TcrMessage::getMsgBody() const {
- return reinterpret_cast<const char*>(m_request->getBuffer() + g_headerLen);
-}
-
size_t TcrMessage::getMsgLength() const { return m_request->getBufferLength(); }
-size_t TcrMessage::getMsgBodyLength() const {
- return m_request->getBufferLength() - g_headerLen;
-}
std::shared_ptr<EventId> TcrMessage::getEventId() const { return m_eventid; }
int32_t TcrMessage::getTransId() const { return m_txId; }
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index d4e3d7d..d151048 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -37,22 +37,26 @@
#include <geode/Serializable.hpp>
#include <geode/internal/geode_globals.hpp>
-#include "BucketServerLocation.hpp"
-#include "EventId.hpp"
#include "EventIdMap.hpp"
-#include "FixedPartitionAttributesImpl.hpp"
#include "InterestResultPolicy.hpp"
-#include "SerializationRegistry.hpp"
-#include "TcrChunkedContext.hpp"
-#include "VersionTag.hpp"
-#include "VersionedCacheableObjectPartList.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
namespace apache {
namespace geode {
namespace client {
+
class ThinClientBaseDM;
class TcrConnection;
class TcrMessagePing;
+class BucketServerLocation;
+class EventId;
+class FixedPartitionAttributesImpl;
+class SerializationRegistry;
+class VersionTag;
+class VersionedCacheableObjectPartList;
+class MemberListForVersionStamp;
+class TcrChunkedResult;
+class DSMemberForVersionStamp;
class TcrMessage {
public:
@@ -170,7 +174,6 @@ class TcrMessage {
} MsgType;
- static bool isKeepAlive();
static bool isUserInitiativeOps(const TcrMessage& msg);
static std::shared_ptr<VersionTag> readVersionTagPart(
@@ -230,10 +233,11 @@ class TcrMessage {
Region* getRegion() const;
int32_t getMessageType() const;
void setMessageType(int32_t msgType);
- void setMessageTypeRequest(int32_t msgType); // the msgType of the request
- // that was made if this is a
- // reply
- int32_t getMessageTypeRequest() const;
+
+ /**
+ * Set the msgType of the request that was made if this is a reply.
+ */
+ void setMessageTypeRequest(int32_t msgType);
std::shared_ptr<CacheableKey> getKey() const;
const std::shared_ptr<CacheableKey>& getKeyRef() const;
std::shared_ptr<Cacheable> getValue() const;
@@ -246,10 +250,7 @@ class TcrMessage {
const std::string& getException();
const char* getMsgData() const;
- const char* getMsgHeader() const;
- const char* getMsgBody() const;
size_t getMsgLength() const;
- size_t getMsgBodyLength() const;
std::shared_ptr<EventId> getEventId() const;
int32_t getTransId() const;
@@ -259,15 +260,10 @@ class TcrMessage {
void setTimeout(std::chrono::milliseconds timeout);
static TcrMessage* getAllEPDisMess();
- /* we need a static method to generate close connection message */
- /* The caller should not delete the message since it is global. */
- static TcrMessage* getCloseConnMessage(CacheImpl* cacheImpl);
- static void setKeepAlive(bool keepalive);
bool isDurable() const;
bool receiveValues() const;
bool hasCqPart() const;
uint32_t getMessageTypeForCq() const;
- bool isInterestListPassed() const;
bool shouldIgnore() const;
int8_t getMetaDataVersion() const;
uint32_t getEntryNotFound() const;
@@ -280,11 +276,6 @@ class TcrMessage {
// set the chunked response handler
void setChunkedResultHandler(TcrChunkedResult* chunkedResult);
TcrChunkedResult* getChunkedResultHandler();
- void setVersionedObjectPartList(
- std::shared_ptr<VersionedCacheableObjectPartList> versionObjPartListptr);
-
- std::shared_ptr<VersionedCacheableObjectPartList>
- getVersionedObjectPartList();
DataInput* getDelta();
// getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes
@@ -318,8 +309,6 @@ class TcrMessage {
const std::string& getColocatedWith() const;
- const std::string& getPartitionResolver() const;
-
std::vector<std::vector<std::shared_ptr<BucketServerLocation>>>*
getMetadata();
@@ -331,8 +320,6 @@ class TcrMessage {
void setCallBackArguement(bool aCallBackArguement);
- void setBucketServerLocation(
- std::shared_ptr<BucketServerLocation> serverLocation);
void setVersionTag(std::shared_ptr<VersionTag> versionTag);
std::shared_ptr<VersionTag> getVersionTag();
uint8_t hasResult() const;
@@ -365,7 +352,6 @@ class TcrMessage {
void readKeyPart(DataInput& input);
void readBooleanPartAsObject(DataInput& input, bool* boolVal);
void readIntPart(DataInput& input, uint32_t* intValue);
- void readLongPart(DataInput& input, uint64_t* intValue);
bool readExceptionPart(DataInput& input, uint8_t isLastChunk,
bool skipFirstPart = true);
void readVersionTag(DataInput& input, uint16_t endpointMemId,
@@ -426,20 +412,14 @@ class TcrMessage {
std::shared_ptr<Cacheable> m_callbackArgument;
std::shared_ptr<VersionTag> m_versionTag;
std::shared_ptr<EventId> m_eventid;
- std::shared_ptr<CacheableVector> m_vectorPtr;
- std::shared_ptr<BucketServerLocation> m_bucketServerLocation;
std::shared_ptr<CacheableHashMap> m_tombstoneVersions;
std::shared_ptr<CacheableHashSet> m_tombstoneKeys;
- std::shared_ptr<VersionedCacheableObjectPartList> m_versionObjPartListptr;
std::string m_exceptionMessage;
std::string m_regionName;
std::string m_regex;
- std::vector<std::shared_ptr<BucketServerLocation>> m_bucketServerLocations;
std::string m_colocatedWith;
- std::string m_partitionResolverName;
int32_t m_securityHeaderLength;
int32_t m_msgType;
- int32_t m_msgLength;
/** the msgType of the request if this TcrMessage is a reply msg */
int32_t m_msgTypeRequest;
int32_t m_txId;
@@ -449,7 +429,7 @@ class TcrMessage {
int32_t m_deltaBytesLen;
uint32_t m_entryNotFound;
bool m_feAnotherHop;
- bool isSecurityOn;
+ bool m_isSecurityOn;
uint8_t m_isLastChunkAndisSecurityHeader;
bool m_isSecurityHeaderAdded;
bool m_isMetaRegion;
@@ -467,11 +447,6 @@ class TcrMessage {
bool m_isCallBackArguement;
uint8_t m_hasResult;
- static std::atomic<int32_t> m_transactionId;
- static uint8_t* m_keepAlive;
- const static int m_flag_empty;
- const static int m_flag_concurrency_checks;
-
friend class TcrMessageHelper;
};
@@ -915,7 +890,8 @@ class TcrMessagePing : public TcrMessage {
class TcrMessageCloseConnection : public TcrMessage {
public:
- TcrMessageCloseConnection(DataOutput* dataOutput, bool decodeAll);
+ TcrMessageCloseConnection(std::unique_ptr<DataOutput> dataOutput,
+ bool keepAlive);
~TcrMessageCloseConnection() override = default;
};
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index 2abd274..696b41a 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -784,7 +784,9 @@ void ThinClientPoolDM::destroy(bool keepAlive) {
LOGDEBUG("ThinClientPoolDM::destroy...");
if (!m_isDestroyed && (!m_destroyPending || m_destroyPendingHADM)) {
checkRegions();
- TcrMessage::setKeepAlive(keepAlive);
+
+ m_keepAlive = keepAlive;
+
if (m_remoteQueryServicePtr != nullptr) {
m_remoteQueryServicePtr->close();
m_remoteQueryServicePtr = nullptr;
diff --git a/cppcache/src/ThinClientPoolDM.hpp b/cppcache/src/ThinClientPoolDM.hpp
index 6a13076..e306dff 100644
--- a/cppcache/src/ThinClientPoolDM.hpp
+++ b/cppcache/src/ThinClientPoolDM.hpp
@@ -187,6 +187,7 @@ class ThinClientPoolDM
m_primaryServerQueueSize = queueSize;
}
int getPrimaryServerQueueSize() const { return m_primaryServerQueueSize; }
+ bool isKeepAlive() const { return m_keepAlive; }
protected:
ThinClientStickyManager* m_manager;
@@ -312,6 +313,8 @@ class ThinClientPoolDM
std::atomic<int32_t> connected_endpoints_;
std::unique_ptr<statistics::PoolStatsSampler> m_PoolStatsSampler;
std::unique_ptr<ClientMetadataService> m_clientMetadataService;
+ bool m_keepAlive;
+
friend class CacheImpl;
friend class ThinClientStickyManager;
friend class FunctionExecution;
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index bdb6df2..3d43c85 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -111,11 +111,6 @@ GfErrType ThinClientPoolHADM::sendSyncRequest(TcrMessage& request,
}
}
-bool ThinClientPoolHADM::registerInterestForHARegion(
- TcrEndpoint* ep, const TcrMessage* request, ThinClientHARegion& region) {
- return (region.registerKeys(ep, request) == GF_NOERR);
-}
-
GfErrType ThinClientPoolHADM::sendSyncRequestRegisterInterestEP(
TcrMessage& request, TcrMessageReply& reply, bool attemptFailover,
TcrEndpoint* endpoint) {
diff --git a/cppcache/src/ThinClientPoolHADM.hpp b/cppcache/src/ThinClientPoolHADM.hpp
index c817027..f1094e1 100644
--- a/cppcache/src/ThinClientPoolHADM.hpp
+++ b/cppcache/src/ThinClientPoolHADM.hpp
@@ -50,9 +50,6 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
bool attemptFailover = true,
bool isBGThread = false) override;
- bool registerInterestForHARegion(TcrEndpoint* ep, const TcrMessage* request,
- ThinClientHARegion& region);
-
GfErrType sendSyncRequestRegisterInterestEP(TcrMessage& request,
TcrMessageReply& reply,
bool attemptFailover,
@@ -62,7 +59,7 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
const TcrMessage* request,
TcrMessageReply* reply);
- virtual void destroy(bool keepAlive = false) override;
+ void destroy(bool keepAlive = false) override;
void readyForEvents();
diff --git a/cppcache/src/TombstoneExpiryTask.cpp b/cppcache/src/TombstoneExpiryTask.cpp
index c91ed4d..56e30d4 100644
--- a/cppcache/src/TombstoneExpiryTask.cpp
+++ b/cppcache/src/TombstoneExpiryTask.cpp
@@ -17,10 +17,9 @@
#include "TombstoneExpiryTask.hpp"
-#include "CacheImpl.hpp"
#include "MapSegment.hpp"
-#include "RegionInternal.hpp"
#include "TombstoneEntry.hpp"
+#include "Utils.hpp"
namespace apache {
namespace geode {
diff --git a/cppcache/src/TombstoneExpiryTask.hpp b/cppcache/src/TombstoneExpiryTask.hpp
index ec6e2b5..c0fd10b 100644
--- a/cppcache/src/TombstoneExpiryTask.hpp
+++ b/cppcache/src/TombstoneExpiryTask.hpp
@@ -20,12 +20,9 @@
#ifndef GEODE_TOMBSTONEEXPIRYTASK_H_
#define GEODE_TOMBSTONEEXPIRYTASK_H_
-#include <geode/ExpirationAction.hpp>
-#include <geode/Region.hpp>
#include <geode/internal/geode_globals.hpp>
#include "ExpiryTask.hpp"
-#include "RegionInternal.hpp"
namespace apache {
namespace geode {
diff --git a/cppcache/src/TombstoneList.cpp b/cppcache/src/TombstoneList.cpp
index 22ee9b3..d272990 100644
--- a/cppcache/src/TombstoneList.cpp
+++ b/cppcache/src/TombstoneList.cpp
@@ -19,6 +19,8 @@
#include <unordered_set>
+#include <geode/SystemProperties.hpp>
+
#include "CacheImpl.hpp"
#include "MapSegment.hpp"
#include "TombstoneEntry.hpp"
diff --git a/cppcache/src/TypeRegistry.cpp b/cppcache/src/TypeRegistry.cpp
index af33a0e..060f12e 100644
--- a/cppcache/src/TypeRegistry.cpp
+++ b/cppcache/src/TypeRegistry.cpp
@@ -18,7 +18,7 @@
#include "geode/TypeRegistry.hpp"
#include "CacheImpl.hpp"
-#include "CacheRegionHelper.hpp"
+#include "SerializationRegistry.hpp"
namespace apache {
namespace geode {
diff --git a/cppcache/test/TcrMessageTest.cpp b/cppcache/test/TcrMessageTest.cpp
index 0234834..741d88b 100644
--- a/cppcache/test/TcrMessageTest.cpp
+++ b/cppcache/test/TcrMessageTest.cpp
@@ -18,11 +18,13 @@
#include <TcrMessage.hpp>
#include <iostream>
+#include <gtest/gtest.h>
+
#include <geode/CacheFactory.hpp>
#include <geode/CqState.hpp>
#include "ByteArrayFixture.hpp"
-#include "gtest/gtest.h"
+#include "SerializationRegistry.hpp"
namespace {
@@ -804,7 +806,11 @@ TEST_F(TcrMessageTest, testConstructorPing) {
TEST_F(TcrMessageTest, testConstructorCloseConnection) {
using apache::geode::client::TcrMessageCloseConnection;
- TcrMessageCloseConnection testMessage(new DataOutputUnderTest(), false);
+ std::shared_ptr<Cacheable> myCacheablePtr(
+ CacheableString::createDeserializable());
+
+ TcrMessageCloseConnection testMessage(
+ std::unique_ptr<DataOutput>(new DataOutputUnderTest()), false);
EXPECT_EQ(TcrMessage::CLOSE_CONNECTION, testMessage.getMessageType());
@@ -812,4 +818,19 @@ TEST_F(TcrMessageTest, testConstructorCloseConnection) {
testMessage);
}
+TEST_F(TcrMessageTest, testConstructorCloseConnectionKeepAlive) {
+ using apache::geode::client::TcrMessageCloseConnection;
+
+ std::shared_ptr<Cacheable> myCacheablePtr(
+ CacheableString::createDeserializable());
+
+ TcrMessageCloseConnection testMessage(
+ std::unique_ptr<DataOutput>(new DataOutputUnderTest()), true);
+
+ EXPECT_EQ(TcrMessage::CLOSE_CONNECTION, testMessage.getMessageType());
+
+ EXPECT_MESSAGE_EQ("000000120000000600000001FFFFFFFF00000000010001",
+ testMessage);
+}
+
} // namespace