You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2017/08/10 15:20:15 UTC
[07/27] geode-native git commit: GEODE-2729: Remove global variables
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrMessage.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrMessage.hpp b/src/cppcache/src/TcrMessage.hpp
index 30e95e0..60dd5a6 100644
--- a/src/cppcache/src/TcrMessage.hpp
+++ b/src/cppcache/src/TcrMessage.hpp
@@ -1,8 +1,3 @@
-#pragma once
-
-#ifndef GEODE_TCRMESSAGE_H_
-#define GEODE_TCRMESSAGE_H_
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -20,6 +15,13 @@
* limitations under the License.
*/
+#pragma once
+
+#ifndef GEODE_TCRMESSAGE_H_
+#define GEODE_TCRMESSAGE_H_
+
+#include <ace/OS.h>
+
#include <geode/geode_globals.hpp>
#include <atomic>
#include <geode/Cacheable.hpp>
@@ -40,6 +42,7 @@
#include "FixedPartitionAttributesImpl.hpp"
#include "VersionTag.hpp"
#include "VersionedCacheableObjectPartList.hpp"
+#include "SerializationRegistry.hpp"
#include <string>
#include <map>
#include <vector>
@@ -175,8 +178,6 @@ class CPPCACHE_EXPORT TcrMessage {
} MsgType;
- static bool init();
- static void cleanup();
static bool isKeepAlive() { return *m_keepalive; }
static bool isUserInitiativeOps(const TcrMessage& msg) {
int32_t msgType = msg.getMessageType();
@@ -207,11 +208,14 @@ class CPPCACHE_EXPORT TcrMessage {
}
return false;
}
- static VersionTagPtr readVersionTagPart(DataInput& input,
- uint16_t endpointMemId);
+ static VersionTagPtr readVersionTagPart(
+ DataInput& input, uint16_t endpointMemId,
+ MemberListForVersionStamp& memberListForVersionStamp);
/* constructors */
- void setData(const char* bytearray, int32_t len, uint16_t memId);
+ void setData(const char* bytearray, int32_t len, uint16_t memId,
+ const SerializationRegistry& serializationRegistry,
+ MemberListForVersionStamp& memberListForVersionStamp);
void startProcessChunk(ACE_Semaphore& finalizeSema);
// nullptr chunk means that this is the last chunk
@@ -336,11 +340,11 @@ class CPPCACHE_EXPORT TcrMessage {
/* we need a static method to generate ping */
/* The caller should not delete the message since it is global. */
- static TcrMessagePing* getPingMessage();
+ static TcrMessagePing* getPingMessage(Cache* cache);
static TcrMessage* getAllEPDisMess();
/* we need a static method to generate close connection message */
/* The caller should not delete the message since it is global. */
- static TcrMessage* getCloseConnMessage();
+ static TcrMessage* getCloseConnMessage(Cache* cache);
static void setKeepAlive(bool keepalive);
bool isDurable() const { return m_isDurable; }
bool receiveValues() const { return m_receiveValues; }
@@ -372,7 +376,7 @@ class CPPCACHE_EXPORT TcrMessage {
return m_versionObjPartListptr;
}
- DataInput* getDelta() { return m_delta; }
+ DataInput* getDelta() { return m_delta.get(); }
// getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes
// that
@@ -456,7 +460,7 @@ class CPPCACHE_EXPORT TcrMessage {
m_securityHeaderLength(0),
m_isMetaRegion(false),
exceptionMessage(),
- m_request(new DataOutput),
+ m_request(nullptr),
m_msgType(TcrMessage::INVALID),
m_msgLength(-1),
m_msgTypeRequest(0),
@@ -514,9 +518,6 @@ class CPPCACHE_EXPORT TcrMessage {
SerializablePtr readCacheableString(DataInput& input, int lenObj);
static std::atomic<int32_t> m_transactionId;
- static TcrMessagePing* m_pingMsg;
- static TcrMessage* m_closeConnMsg;
- static TcrMessage* m_allEPDisconnected;
static uint8_t* m_keepalive;
const static int m_flag_empty;
const static int m_flag_concurrency_checks;
@@ -531,13 +532,14 @@ class CPPCACHE_EXPORT TcrMessage {
CacheableStringPtr exceptionMessage;
- // Disallow copy constructor and assignment operator.
- TcrMessage(const TcrMessage&);
- TcrMessage& operator=(const TcrMessage&);
+ TcrMessage(const TcrMessage&) = delete;
+ TcrMessage& operator=(const TcrMessage&) = delete;
// some private methods to handle things internally.
- void handleByteArrayResponse(const char* bytearray, int32_t len,
- uint16_t endpointMemId);
+ void handleByteArrayResponse(
+ const char* bytearray, int32_t len, uint16_t endpointMemId,
+ const SerializationRegistry& serializationRegistry,
+ MemberListForVersionStamp& memberListForVersionStamp);
void readObjectPart(DataInput& input, bool defaultString = false);
void readFailedNodePart(DataInput& input, bool defaultString = false);
void readCallbackObjectPart(DataInput& input, bool defaultString = false);
@@ -547,7 +549,8 @@ class CPPCACHE_EXPORT TcrMessage {
void readLongPart(DataInput& input, uint64_t* intValue);
bool readExceptionPart(DataInput& input, uint8_t isLastChunk,
bool skipFirstPart = true);
- void readVersionTag(DataInput& input, uint16_t endpointMemId);
+ void readVersionTag(DataInput& input, uint16_t endpointMemId,
+ MemberListForVersionStamp& memberListForVersionStamp);
void readOldValue(DataInput& input);
void readPrMetaData(DataInput& input);
void writeObjectPart(const SerializablePtr& se, bool isDelta = false,
@@ -578,7 +581,7 @@ class CPPCACHE_EXPORT TcrMessage {
CacheableHashSetPtr& value);
DSMemberForVersionStampPtr readDSMember(
apache::geode::client::DataInput& input);
- DataOutput* m_request;
+ std::unique_ptr<DataOutput> m_request;
int32_t m_msgType;
int32_t m_msgLength;
int32_t m_msgTypeRequest; // the msgType of the request if this TcrMessage is
@@ -622,7 +625,7 @@ class CPPCACHE_EXPORT TcrMessage {
std::map<std::string, int>* m_cqs;
int32_t m_messageResponseTimeout;
bool m_boolValue;
- DataInput* m_delta;
+ std::unique_ptr<DataInput> m_delta;
uint8_t* m_deltaBytes;
int32_t m_deltaBytesLen;
bool m_isCallBackArguement;
@@ -641,425 +644,568 @@ class CPPCACHE_EXPORT TcrMessage {
class TcrMessageDestroyRegion : public TcrMessage {
public:
- TcrMessageDestroyRegion(const Region* region,
+ TcrMessageDestroyRegion(std::unique_ptr<DataOutput> dataOutput,
+ const Region* region,
const UserDataPtr& aCallbackArgument,
int messageResponsetimeout,
ThinClientBaseDM* connectionDM);
virtual ~TcrMessageDestroyRegion() {}
+
+ private:
};
class TcrMessageClearRegion : public TcrMessage {
public:
- TcrMessageClearRegion(const Region* region,
+ TcrMessageClearRegion(std::unique_ptr<DataOutput> dataOutput,
+ const Region* region,
const UserDataPtr& aCallbackArgument,
int messageResponsetimeout,
ThinClientBaseDM* connectionDM);
virtual ~TcrMessageClearRegion() {}
+
+ private:
};
class TcrMessageQuery : public TcrMessage {
public:
- TcrMessageQuery(const std::string& regionName, int messageResponsetimeout,
+ TcrMessageQuery(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& regionName, int messageResponsetimeout,
ThinClientBaseDM* connectionDM);
virtual ~TcrMessageQuery() {}
+
+ private:
};
class TcrMessageStopCQ : public TcrMessage {
public:
- TcrMessageStopCQ(const std::string& regionName, int messageResponsetimeout,
+ TcrMessageStopCQ(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& regionName, int messageResponsetimeout,
ThinClientBaseDM* connectionDM);
virtual ~TcrMessageStopCQ() {}
+
+ private:
};
class TcrMessageCloseCQ : public TcrMessage {
public:
- TcrMessageCloseCQ(const std::string& regionName, int messageResponsetimeout,
+ TcrMessageCloseCQ(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& regionName, int messageResponsetimeout,
ThinClientBaseDM* connectionDM);
virtual ~TcrMessageCloseCQ() {}
+
+ private:
};
class TcrMessageQueryWithParameters : public TcrMessage {
public:
- TcrMessageQueryWithParameters(const std::string& regionName,
+ TcrMessageQueryWithParameters(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& regionName,
const UserDataPtr& aCallbackArgument,
CacheableVectorPtr paramList,
int messageResponsetimeout,
ThinClientBaseDM* connectionDM);
virtual ~TcrMessageQueryWithParameters() {}
+
+ private:
};
class TcrMessageContainsKey : public TcrMessage {
public:
- TcrMessageContainsKey(const Region* region, const CacheableKeyPtr& key,
+ TcrMessageContainsKey(std::unique_ptr<DataOutput> dataOutput,
+ const Region* region, const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
bool isContainsKey, ThinClientBaseDM* connectionDM);
virtual ~TcrMessageContainsKey() {}
+
+ private:
};
class TcrMessageGetDurableCqs : public TcrMessage {
public:
- TcrMessageGetDurableCqs(ThinClientBaseDM* connectionDM);
+ TcrMessageGetDurableCqs(std::unique_ptr<DataOutput> dataOutput,
+ ThinClientBaseDM* connectionDM);
virtual ~TcrMessageGetDurableCqs() {}
+
+ private:
};
class TcrMessageRequest : public TcrMessage {
public:
- TcrMessageRequest(const Region* region, const CacheableKeyPtr& key,
+ TcrMessageRequest(std::unique_ptr<DataOutput> dataOutput,
+ const Region* region, const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageRequest() {}
+
+ private:
};
class TcrMessageInvalidate : public TcrMessage {
public:
- TcrMessageInvalidate(const Region* region, const CacheableKeyPtr& key,
+ TcrMessageInvalidate(std::unique_ptr<DataOutput> dataOutput,
+ const Region* region, const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
ThinClientBaseDM* connectionDM = nullptr);
+
+ private:
};
class TcrMessageDestroy : public TcrMessage {
public:
- TcrMessageDestroy(const Region* region, const CacheableKeyPtr& key,
+ TcrMessageDestroy(std::unique_ptr<DataOutput> dataOutput,
+ const Region* region, const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
ThinClientBaseDM* connectionDM = nullptr);
+
+ private:
};
class TcrMessageRegisterInterestList : public TcrMessage {
public:
TcrMessageRegisterInterestList(
- const Region* region, const VectorOfCacheableKey& keys,
- bool isDurable = false, bool isCachingEnabled = false,
- bool receiveValues = true,
+ std::unique_ptr<DataOutput> dataOutput, const Region* region,
+ const VectorOfCacheableKey& keys, bool isDurable = false,
+ bool isCachingEnabled = false, bool receiveValues = true,
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE,
ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageRegisterInterestList() {}
+
+ private:
};
class TcrMessageUnregisterInterestList : public TcrMessage {
public:
TcrMessageUnregisterInterestList(
- const Region* region, const VectorOfCacheableKey& keys,
- bool isDurable = false, bool isCachingEnabled = false,
- bool receiveValues = true,
+ std::unique_ptr<DataOutput> dataOutput, const Region* region,
+ const VectorOfCacheableKey& keys, bool isDurable = false,
+ bool isCachingEnabled = false, bool receiveValues = true,
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE,
ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageUnregisterInterestList() {}
+
+ private:
};
class TcrMessagePut : public TcrMessage {
public:
- TcrMessagePut(const Region* region, const CacheableKeyPtr& key,
- const CacheablePtr& value, const UserDataPtr& aCallbackArgument,
- bool isDelta = false, ThinClientBaseDM* connectionDM = nullptr,
+ TcrMessagePut(std::unique_ptr<DataOutput> dataOutput, const Region* region,
+ const CacheableKeyPtr& key, const CacheablePtr& value,
+ const UserDataPtr& aCallbackArgument, bool isDelta = false,
+ ThinClientBaseDM* connectionDM = nullptr,
bool isMetaRegion = false, bool fullValueAfterDeltaFail = false,
const char* regionName = nullptr);
virtual ~TcrMessagePut() {}
+
+ private:
};
class TcrMessageCreateRegion : public TcrMessage {
public:
TcrMessageCreateRegion(
- const std::string& str1, const std::string& str2,
+ std::unique_ptr<DataOutput> dataOutput, const std::string& str1,
+ const std::string& str2,
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE,
bool isDurable = false, bool isCachingEnabled = false,
bool receiveValues = true, ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageCreateRegion() {}
+
+ private:
};
class TcrMessageRegisterInterest : public TcrMessage {
public:
TcrMessageRegisterInterest(
- const std::string& str1, const std::string& str2,
+ std::unique_ptr<DataOutput> dataOutput, const std::string& str1,
+ const std::string& str2,
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE,
bool isDurable = false, bool isCachingEnabled = false,
bool receiveValues = true, ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageRegisterInterest() {}
+
+ private:
};
class TcrMessageUnregisterInterest : public TcrMessage {
public:
TcrMessageUnregisterInterest(
- const std::string& str1, const std::string& str2,
+ std::unique_ptr<DataOutput> dataOutput, const std::string& str1,
+ const std::string& str2,
InterestResultPolicy interestPolicy = InterestResultPolicy::NONE,
bool isDurable = false, bool isCachingEnabled = false,
bool receiveValues = true, ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageUnregisterInterest() {}
+
+ private:
};
class TcrMessageTxSynchronization : public TcrMessage {
public:
- TcrMessageTxSynchronization(int ordinal, int txid, int status);
+ TcrMessageTxSynchronization(std::unique_ptr<DataOutput> dataOutput,
+ int ordinal, int txid, int status);
virtual ~TcrMessageTxSynchronization() {}
+
+ private:
};
class TcrMessageClientReady : public TcrMessage {
public:
- TcrMessageClientReady();
+ TcrMessageClientReady(std::unique_ptr<DataOutput> dataOutput);
virtual ~TcrMessageClientReady() {}
+
+ private:
};
class TcrMessageCommit : public TcrMessage {
public:
- TcrMessageCommit();
+ TcrMessageCommit(std::unique_ptr<DataOutput> dataOutput);
virtual ~TcrMessageCommit() {}
+
+ private:
};
class TcrMessageRollback : public TcrMessage {
public:
- TcrMessageRollback();
+ TcrMessageRollback(std::unique_ptr<DataOutput> dataOutput);
virtual ~TcrMessageRollback() {}
+
+ private:
};
class TcrMessageTxFailover : public TcrMessage {
public:
- TcrMessageTxFailover();
+ TcrMessageTxFailover(std::unique_ptr<DataOutput> dataOutput);
virtual ~TcrMessageTxFailover() {}
+
+ private:
};
class TcrMessageMakePrimary : public TcrMessage {
public:
- TcrMessageMakePrimary(bool processedMarker);
+ TcrMessageMakePrimary(std::unique_ptr<DataOutput> dataOutput,
+ bool processedMarker);
virtual ~TcrMessageMakePrimary() {}
+
+ private:
};
class TcrMessagePutAll : public TcrMessage {
public:
- TcrMessagePutAll(const Region* region, const HashMapOfCacheable& map,
- int messageResponsetimeout, ThinClientBaseDM* connectionDM,
+ TcrMessagePutAll(std::unique_ptr<DataOutput> dataOutput, const Region* region,
+ const HashMapOfCacheable& map, int messageResponsetimeout,
+ ThinClientBaseDM* connectionDM,
const UserDataPtr& aCallbackArgument);
virtual ~TcrMessagePutAll() {}
+
+ private:
};
class TcrMessageRemoveAll : public TcrMessage {
public:
- TcrMessageRemoveAll(const Region* region, const VectorOfCacheableKey& keys,
+ TcrMessageRemoveAll(std::unique_ptr<DataOutput> dataOutput,
+ const Region* region, const VectorOfCacheableKey& keys,
const UserDataPtr& aCallbackArgument,
ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageRemoveAll() {}
+
+ private:
};
class TcrMessageExecuteCq : public TcrMessage {
public:
- TcrMessageExecuteCq(const std::string& str1, const std::string& str2,
+ TcrMessageExecuteCq(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& str1, const std::string& str2,
int state, bool isDurable,
ThinClientBaseDM* connectionDM);
virtual ~TcrMessageExecuteCq() {}
+
+ private:
};
class TcrMessageExecuteCqWithIr : public TcrMessage {
public:
- TcrMessageExecuteCqWithIr(const std::string& str1, const std::string& str2,
+ TcrMessageExecuteCqWithIr(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& str1, const std::string& str2,
int state, bool isDurable,
ThinClientBaseDM* connectionDM);
virtual ~TcrMessageExecuteCqWithIr() {}
+
+ private:
};
class TcrMessageExecuteRegionFunction : public TcrMessage {
public:
TcrMessageExecuteRegionFunction(
- const std::string& funcName, const Region* region,
- const CacheablePtr& args, CacheableVectorPtr routingObj,
- uint8_t getResult, CacheableHashSetPtr failedNodes, int32_t timeout,
+ std::unique_ptr<DataOutput> dataOutput, const std::string& funcName,
+ const Region* region, const CacheablePtr& args,
+ CacheableVectorPtr routingObj, uint8_t getResult,
+ CacheableHashSetPtr failedNodes, int32_t timeout,
ThinClientBaseDM* connectionDM = nullptr, int8_t reExecute = 0);
virtual ~TcrMessageExecuteRegionFunction() {}
+
+ private:
};
class TcrMessageExecuteRegionFunctionSingleHop : public TcrMessage {
public:
TcrMessageExecuteRegionFunctionSingleHop(
- const std::string& funcName, const Region* region,
- const CacheablePtr& args, CacheableHashSetPtr routingObj,
- uint8_t getResult, CacheableHashSetPtr failedNodes, bool allBuckets,
- int32_t timeout, ThinClientBaseDM* connectionDM);
+ std::unique_ptr<DataOutput> dataOutput, const std::string& funcName,
+ const Region* region, const CacheablePtr& args,
+ CacheableHashSetPtr routingObj, uint8_t getResult,
+ CacheableHashSetPtr failedNodes, bool allBuckets, int32_t timeout,
+ ThinClientBaseDM* connectionDM);
virtual ~TcrMessageExecuteRegionFunctionSingleHop() {}
+
+ private:
};
class TcrMessageGetClientPartitionAttributes : public TcrMessage {
public:
- TcrMessageGetClientPartitionAttributes(const char* regionName);
+ TcrMessageGetClientPartitionAttributes(std::unique_ptr<DataOutput> dataOutput,
+ const char* regionName);
virtual ~TcrMessageGetClientPartitionAttributes() {}
+
+ private:
};
class TcrMessageGetClientPrMetadata : public TcrMessage {
public:
- TcrMessageGetClientPrMetadata(const char* regionName);
+ TcrMessageGetClientPrMetadata(std::unique_ptr<DataOutput> dataOutput,
+ const char* regionName);
virtual ~TcrMessageGetClientPrMetadata() {}
+
+ private:
};
class TcrMessageSize : public TcrMessage {
public:
- TcrMessageSize(const char* regionName);
+ TcrMessageSize(std::unique_ptr<DataOutput> dataOutput,
+ const char* regionName);
virtual ~TcrMessageSize() {}
+
+ private:
};
class TcrMessageUserCredential : public TcrMessage {
public:
- TcrMessageUserCredential(PropertiesPtr creds,
+ TcrMessageUserCredential(std::unique_ptr<DataOutput> dataOutput,
+ PropertiesPtr creds,
ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageUserCredential() {}
+
+ private:
};
class TcrMessageRemoveUserAuth : public TcrMessage {
public:
- TcrMessageRemoveUserAuth(bool keepAlive, ThinClientBaseDM* connectionDM);
+ TcrMessageRemoveUserAuth(std::unique_ptr<DataOutput> dataOutput,
+ bool keepAlive, ThinClientBaseDM* connectionDM);
virtual ~TcrMessageRemoveUserAuth() {}
+
+ private:
};
class TcrMessageGetPdxIdForType : public TcrMessage {
public:
- TcrMessageGetPdxIdForType(const CacheablePtr& pdxType,
+ TcrMessageGetPdxIdForType(std::unique_ptr<DataOutput> dataOutput,
+ const CacheablePtr& pdxType,
ThinClientBaseDM* connectionDM,
int32_t pdxTypeId = 0);
virtual ~TcrMessageGetPdxIdForType() {}
+
+ private:
};
class TcrMessageAddPdxType : public TcrMessage {
public:
- TcrMessageAddPdxType(const CacheablePtr& pdxType,
+ TcrMessageAddPdxType(std::unique_ptr<DataOutput> dataOutput,
+ const CacheablePtr& pdxType,
ThinClientBaseDM* connectionDM, int32_t pdxTypeId = 0);
virtual ~TcrMessageAddPdxType() {}
+
+ private:
};
class TcrMessageGetPdxIdForEnum : public TcrMessage {
public:
- TcrMessageGetPdxIdForEnum(const CacheablePtr& pdxType,
+ TcrMessageGetPdxIdForEnum(std::unique_ptr<DataOutput> dataOutput,
+ const CacheablePtr& pdxType,
ThinClientBaseDM* connectionDM,
int32_t pdxTypeId = 0);
virtual ~TcrMessageGetPdxIdForEnum() {}
+
+ private:
};
class TcrMessageAddPdxEnum : public TcrMessage {
public:
- TcrMessageAddPdxEnum(const CacheablePtr& pdxType,
+ TcrMessageAddPdxEnum(std::unique_ptr<DataOutput> dataOutput,
+ const CacheablePtr& pdxType,
ThinClientBaseDM* connectionDM, int32_t pdxTypeId = 0);
virtual ~TcrMessageAddPdxEnum() {}
+
+ private:
};
class TcrMessageGetPdxTypeById : public TcrMessage {
public:
- TcrMessageGetPdxTypeById(int32_t typeId, ThinClientBaseDM* connectionDM);
+ TcrMessageGetPdxTypeById(std::unique_ptr<DataOutput> dataOutput,
+ int32_t typeId, ThinClientBaseDM* connectionDM);
virtual ~TcrMessageGetPdxTypeById() {}
+
+ private:
};
class TcrMessageGetPdxEnumById : public TcrMessage {
public:
- TcrMessageGetPdxEnumById(int32_t typeId, ThinClientBaseDM* connectionDM);
+ TcrMessageGetPdxEnumById(std::unique_ptr<DataOutput> dataOutput,
+ int32_t typeId, ThinClientBaseDM* connectionDM);
virtual ~TcrMessageGetPdxEnumById() {}
+
+ private:
};
class TcrMessageGetFunctionAttributes : public TcrMessage {
public:
- TcrMessageGetFunctionAttributes(const std::string& funcName,
+ TcrMessageGetFunctionAttributes(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& funcName,
ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageGetFunctionAttributes() {}
+
+ private:
};
class TcrMessageKeySet : public TcrMessage {
public:
- TcrMessageKeySet(const std::string& funcName,
+ TcrMessageKeySet(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& funcName,
ThinClientBaseDM* connectionDM = nullptr);
virtual ~TcrMessageKeySet() {}
+
+ private:
};
class TcrMessageRequestEventValue : public TcrMessage {
public:
- TcrMessageRequestEventValue(EventIdPtr eventId);
+ TcrMessageRequestEventValue(std::unique_ptr<DataOutput> dataOutput,
+ EventIdPtr eventId);
virtual ~TcrMessageRequestEventValue() {}
+
+ private:
};
class TcrMessagePeriodicAck : public TcrMessage {
public:
- TcrMessagePeriodicAck(const EventIdMapEntryList& entries);
+ TcrMessagePeriodicAck(std::unique_ptr<DataOutput> dataOutput,
+ const EventIdMapEntryList& entries);
virtual ~TcrMessagePeriodicAck() {}
+
+ private:
};
class TcrMessageUpdateClientNotification : public TcrMessage {
public:
- TcrMessageUpdateClientNotification(int32_t port);
+ TcrMessageUpdateClientNotification(std::unique_ptr<DataOutput> dataOutput,
+ int32_t port);
virtual ~TcrMessageUpdateClientNotification() {}
+
+ private:
};
class TcrMessageGetAll : public TcrMessage {
public:
- TcrMessageGetAll(const Region* region, const VectorOfCacheableKey* keys,
+ TcrMessageGetAll(std::unique_ptr<DataOutput> dataOutput, const Region* region,
+ const VectorOfCacheableKey* keys,
ThinClientBaseDM* connectionDM = nullptr,
const UserDataPtr& aCallbackArgument = nullptr);
virtual ~TcrMessageGetAll() {}
+
+ private:
};
class TcrMessageExecuteFunction : public TcrMessage {
public:
- TcrMessageExecuteFunction(const std::string& funcName,
+ TcrMessageExecuteFunction(std::unique_ptr<DataOutput> dataOutput,
+ const std::string& funcName,
const CacheablePtr& args, uint8_t getResult,
ThinClientBaseDM* connectionDM, int32_t timeout);
virtual ~TcrMessageExecuteFunction() {}
+
+ private:
};
class TcrMessagePing : public TcrMessage {
public:
- TcrMessagePing(bool decodeAll);
+ TcrMessagePing(std::unique_ptr<DataOutput> dataOutput, bool decodeAll);
virtual ~TcrMessagePing() {}
+
+ private:
};
class TcrMessageCloseConnection : public TcrMessage {
public:
- TcrMessageCloseConnection(bool decodeAll);
+ TcrMessageCloseConnection(std::unique_ptr<DataOutput> dataOutput,
+ bool decodeAll);
virtual ~TcrMessageCloseConnection() {}
+
+ private:
};
class TcrMessageClientMarker : public TcrMessage {
public:
- TcrMessageClientMarker(bool decodeAll);
+ TcrMessageClientMarker(std::unique_ptr<DataOutput> dataOutput,
+ bool decodeAll);
virtual ~TcrMessageClientMarker() {}
+
+ private:
};
class TcrMessageReply : public TcrMessage {
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrPoolEndPoint.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrPoolEndPoint.cpp b/src/cppcache/src/TcrPoolEndPoint.cpp
index c88554d..9021fcd 100644
--- a/src/cppcache/src/TcrPoolEndPoint.cpp
+++ b/src/cppcache/src/TcrPoolEndPoint.cpp
@@ -64,13 +64,12 @@ GfErrType TcrPoolEndPoint::registerDM(bool clientNotification, bool isSecondary,
GfErrType err = GF_NOERR;
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_dm->getPoolLock());
ACE_Guard<ACE_Recursive_Thread_Mutex> guardQueueHosted(getQueueHostedMutex());
-
+ auto& sysProp = m_cacheImpl->getDistributedSystem().getSystemProperties();
if (!connected()) {
TcrConnection* newConn;
- if ((err = createNewConnection(
- newConn, false, false,
- DistributedSystem::getSystemProperties()->connectTimeout(), 0,
- connected())) != GF_NOERR) {
+ if ((err = createNewConnection(newConn, false, false,
+ sysProp.connectTimeout(), 0, connected())) !=
+ GF_NOERR) {
setConnected(false);
return err;
}
@@ -85,10 +84,9 @@ GfErrType TcrPoolEndPoint::registerDM(bool clientNotification, bool isSecondary,
name().c_str());
if (m_numRegionListener == 0) {
- if ((err = createNewConnection(
- m_notifyConnection, true, isSecondary,
- DistributedSystem::getSystemProperties()->connectTimeout() * 3,
- 0)) != GF_NOERR) {
+ if ((err = createNewConnection(m_notifyConnection, true, isSecondary,
+ sysProp.connectTimeout() * 3, 0)) !=
+ GF_NOERR) {
setConnected(false);
LOGWARN("Failed to start subscription channel for endpoint %s",
name().c_str());
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientBaseDM.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientBaseDM.cpp b/src/cppcache/src/ThinClientBaseDM.cpp
index 0291c5a..adf882e 100644
--- a/src/cppcache/src/ThinClientBaseDM.cpp
+++ b/src/cppcache/src/ThinClientBaseDM.cpp
@@ -43,19 +43,21 @@ ThinClientBaseDM::ThinClientBaseDM(TcrConnectionManager& connManager,
ThinClientBaseDM::~ThinClientBaseDM() {}
void ThinClientBaseDM::init() {
- if (!DistributedSystem::getSystemProperties()->isGridClient()) {
- // start the chunk processing thread
- if (!DistributedSystem::getSystemProperties()
- ->disableChunkHandlerThread()) {
- startChunkProcessor();
- }
+ const auto& systemProperties = m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
+ if (!(systemProperties.isGridClient() &&
+ systemProperties.disableChunkHandlerThread())) {
+ startChunkProcessor();
}
m_initDone = true;
}
bool ThinClientBaseDM::isSecurityOn() {
- SystemProperties* sysProp = DistributedSystem::getSystemProperties();
- return sysProp->isSecurityOn();
+ return m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .isSecurityOn();
}
void ThinClientBaseDM::destroy(bool keepalive) {
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientBaseDM.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientBaseDM.hpp b/src/cppcache/src/ThinClientBaseDM.hpp
index ad10981..b3d3f74 100644
--- a/src/cppcache/src/ThinClientBaseDM.hpp
+++ b/src/cppcache/src/ThinClientBaseDM.hpp
@@ -124,8 +124,10 @@ class ThinClientBaseDM {
LOGFINE("Delta enabled on server: %s",
s_isDeltaEnabledOnServer ? "true" : "false");
}
- TcrConnectionManager& getConnectionManager() { return m_connManager; }
+ TcrConnectionManager& getConnectionManager() const { return m_connManager; }
+
virtual size_t getNumberOfEndPoints() const { return 0; }
+
bool isNotAuthorizedException(const char* exceptionMsg) {
if (exceptionMsg != nullptr &&
strstr(exceptionMsg,
@@ -138,6 +140,7 @@ class ThinClientBaseDM {
}
return false;
}
+
bool isPutAllPartialResultException(const char* exceptionMsg) {
if (exceptionMsg != nullptr &&
strstr(
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientCacheDistributionManager.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientCacheDistributionManager.hpp b/src/cppcache/src/ThinClientCacheDistributionManager.hpp
index 1d859a9..20a5fb8 100644
--- a/src/cppcache/src/ThinClientCacheDistributionManager.hpp
+++ b/src/cppcache/src/ThinClientCacheDistributionManager.hpp
@@ -48,8 +48,8 @@ class CPPCACHE_EXPORT ThinClientCacheDistributionManager
GfErrType sendRequestToPrimary(TcrMessage& request, TcrMessageReply& reply);
protected:
- bool preFailoverAction();
- bool postFailoverAction(TcrEndpoint* endpoint);
+ virtual bool preFailoverAction();
+ virtual bool postFailoverAction(TcrEndpoint* endpoint);
private:
// Disallow default/copy constructor and assignment operator.
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientDistributionManager.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientDistributionManager.cpp b/src/cppcache/src/ThinClientDistributionManager.cpp
index bc97b96..5fd092d 100644
--- a/src/cppcache/src/ThinClientDistributionManager.cpp
+++ b/src/cppcache/src/ThinClientDistributionManager.cpp
@@ -306,31 +306,23 @@ bool ThinClientDistributionManager::postFailoverAction(TcrEndpoint* endpoint) {
}
PropertiesPtr ThinClientDistributionManager::getCredentials(TcrEndpoint* ep) {
- PropertiesPtr tmpSecurityProperties =
- DistributedSystem::getSystemProperties()->getSecurityProperties();
+ const auto& distributedSystem =
+ m_connManager.getCacheImpl()->getDistributedSystem();
+ const auto& tmpSecurityProperties =
+ distributedSystem.getSystemProperties().getSecurityProperties();
- AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader();
-
- if (authInitialize != nullptr) {
+ if (const auto& authInitialize = distributedSystem.m_impl->getAuthLoader()) {
LOGFINER(
"ThinClientDistributionManager::getCredentials: acquired handle to "
"authLoader, "
"invoking getCredentials %s",
ep->name().c_str());
- /* adongre
- * CID 28900: Copy into fixed size buffer (STRING_OVERFLOW)
- * You might overrun the 100 byte fixed-size string "tmpEndpoint" by copying
- * the return
- * value of "stlp_std::basic_string<char, stlp_std::char_traits<char>,
- * stlp_std::allocator<char> >::c_str() const" without checking the
- * length.
- */
- // char tmpEndpoint[100] = { '\0' } ;
- // strcpy(tmpEndpoint, ep->name().c_str());
- PropertiesPtr tmpAuthIniSecurityProperties = authInitialize->getCredentials(
- tmpSecurityProperties, /*tmpEndpoint*/ ep->name().c_str());
+ const auto& tmpAuthIniSecurityProperties = authInitialize->getCredentials(
+ tmpSecurityProperties, ep->name().c_str());
+ LOGFINER("Done getting credentials");
return tmpAuthIniSecurityProperties;
}
+
return nullptr;
}
@@ -340,7 +332,9 @@ GfErrType ThinClientDistributionManager::sendUserCredentials(
GfErrType err = GF_NOERR;
- TcrMessageUserCredential request(credentials, this);
+ TcrMessageUserCredential request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(), credentials,
+ this);
TcrMessageReply reply(true, this);
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientHARegion.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientHARegion.cpp b/src/cppcache/src/ThinClientHARegion.cpp
index d59366e..5fd63fb 100644
--- a/src/cppcache/src/ThinClientHARegion.cpp
+++ b/src/cppcache/src/ThinClientHARegion.cpp
@@ -43,7 +43,9 @@ void ThinClientHARegion::initTCR() {
try {
bool isPool = m_attribute->getPoolName() != nullptr &&
strlen(m_attribute->getPoolName()) > 0;
- if (DistributedSystem::getSystemProperties()->isGridClient()) {
+ if (m_cacheImpl->getDistributedSystem()
+ .getSystemProperties()
+ .isGridClient()) {
LOGWARN(
"Region: HA region having notification channel created for grid "
"client; force starting required notification, cleanup and "
@@ -61,7 +63,10 @@ void ThinClientHARegion::initTCR() {
m_tcrdm->init();
} else {
m_tcrdm = dynamic_cast<ThinClientPoolHADM*>(
- PoolManager::find(m_attribute->getPoolName()).get());
+ m_cacheImpl->getCache()
+ ->getPoolManager()
+ .find(m_attribute->getPoolName())
+ .get());
if (m_tcrdm) {
m_poolDM = true;
// Pool DM should only be inited once and it
@@ -109,7 +114,7 @@ void ThinClientHARegion::handleMarker() {
if (m_listener != nullptr && !m_processedMarker) {
RegionEvent event(shared_from_this(), nullptr, false);
- int64_t sampleStartNanos = Utils::startStatOpTime();
+ int64_t sampleStartNanos = startStatOpTime();
try {
m_listener->afterRegionLive(event);
} catch (const Exception& ex) {
@@ -118,11 +123,9 @@ void ThinClientHARegion::handleMarker() {
} catch (...) {
LOGERROR("Unknown exception in CacheListener::afterRegionLive");
}
- m_cacheImpl->m_cacheStats->incListenerCalls();
- Utils::updateStatOpTime(
- m_regionStats->getStat(),
- RegionStatType::getInstance()->getListenerCallTimeId(),
- sampleStartNanos);
+ m_cacheImpl->getCachePerfStats().incListenerCalls();
+ updateStatOpTime(m_regionStats->getStat(),
+ m_regionStats->getListenerCallTimeId(), sampleStartNanos);
m_regionStats->incListenerCallsCompleted();
}
m_processedMarker = true;
@@ -155,7 +158,8 @@ void ThinClientHARegion::addDisMessToQueue() {
if (poolDM->m_redundancyManager->m_globalProcessedMarker &&
!m_processedMarker) {
- TcrMessage* regionMsg = new TcrMessageClientMarker(true);
+ TcrMessage* regionMsg =
+ new TcrMessageClientMarker(m_cache->createDataOutput(), true);
receiveNotification(regionMsg);
}
}
@@ -164,7 +168,8 @@ void ThinClientHARegion::addDisMessToQueue() {
GfErrType ThinClientHARegion::getNoThrow_FullObject(EventIdPtr eventId,
CacheablePtr& fullObject,
VersionTagPtr& versionTag) {
- TcrMessageRequestEventValue fullObjectMsg(eventId);
+ TcrMessageRequestEventValue fullObjectMsg(m_cache->createDataOutput(),
+ eventId);
TcrMessageReply reply(true, nullptr);
ThinClientPoolHADM* poolHADM = dynamic_cast<ThinClientPoolHADM*>(m_tcrdm);
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientLocatorHelper.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientLocatorHelper.cpp b/src/cppcache/src/ThinClientLocatorHelper.cpp
index bb67290..59075f5 100644
--- a/src/cppcache/src/ThinClientLocatorHelper.cpp
+++ b/src/cppcache/src/ThinClientLocatorHelper.cpp
@@ -65,8 +65,15 @@ Connector* ThinClientLocatorHelper::createConnection(Connector*& conn,
uint32_t waitSeconds,
int32_t maxBuffSizePool) {
Connector* socket = nullptr;
- if (DistributedSystem::getSystemProperties()->sslEnabled()) {
- socket = new TcpSslConn(hostname, port, waitSeconds, maxBuffSizePool);
+ auto& systemProperties = m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
+ if (m_poolDM && systemProperties.sslEnabled()) {
+ socket = new TcpSslConn(hostname, port, waitSeconds, maxBuffSizePool,
+ systemProperties.sslTrustStore(),
+ systemProperties.sslKeyStore(),
+ systemProperties.sslKeystorePassword());
} else {
socket = new TcpConn(hostname, port, waitSeconds, maxBuffSizePool);
}
@@ -78,6 +85,11 @@ Connector* ThinClientLocatorHelper::createConnection(Connector*& conn,
GfErrType ThinClientLocatorHelper::getAllServers(
std::vector<ServerLocation>& servers, const std::string& serverGrp) {
ACE_Guard<ACE_Thread_Mutex> guard(m_locatorLock);
+
+ auto& sysProps = m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
for (unsigned i = 0; i < m_locHostPort.size(); i++) {
ServerLocation loc = m_locHostPort[i];
try {
@@ -89,15 +101,14 @@ GfErrType ThinClientLocatorHelper::getAllServers(
}
Connector* conn = nullptr;
ConnectionWrapper cw(conn);
- createConnection(
- conn, loc.getServerName().c_str(), loc.getPort(),
- DistributedSystem::getSystemProperties()->connectTimeout(), buffSize);
+ createConnection(conn, loc.getServerName().c_str(), loc.getPort(),
+ sysProps.connectTimeout(), buffSize);
GetAllServersRequest request(serverGrp);
- DataOutput data;
- data.writeInt((int32_t)1001); // GOSSIPVERSION
- data.writeObject(&request);
+ auto data = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
+ data->writeInt((int32_t)1001); // GOSSIPVERSION
+ data->writeObject(&request);
int sentLength = conn->send(
- (char*)(data.getBuffer()), data.getBufferLength(),
+ (char*)(data->getBuffer()), data->getBufferLength(),
m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
: 10 * 1000 * 1000,
0);
@@ -117,23 +128,23 @@ GfErrType ThinClientLocatorHelper::getAllServers(
continue;
}
- DataInput di(reinterpret_cast<uint8_t*>(buff), receivedLength);
+ auto di = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+ reinterpret_cast<uint8_t*>(buff), receivedLength);
GetAllServersResponsePtr response(nullptr);
/* adongre
* SSL Enabled on Location and not in the client
*/
int8_t acceptanceCode;
- di.read(&acceptanceCode);
- if (acceptanceCode == REPLY_SSL_ENABLED &&
- !DistributedSystem::getSystemProperties()->sslEnabled()) {
+ di->read(&acceptanceCode);
+ if (acceptanceCode == REPLY_SSL_ENABLED && !sysProps.sslEnabled()) {
LOGERROR("SSL is enabled on locator, enable SSL in client as well");
throw AuthenticationRequiredException(
"SSL is enabled on locator, enable SSL in client as well");
}
- di.rewindCursor(1);
+ di->rewindCursor(1);
- di.readObject(response);
+ di->readObject(response);
servers = response->getServers();
return GF_NOERR;
} catch (const AuthenticationRequiredException&) {
@@ -154,6 +165,10 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn(
/*const std::set<TcrEndpoint*>& exclEndPts,*/
const std::string& serverGrp) {
ACE_Guard<ACE_Thread_Mutex> guard(m_locatorLock);
+ auto& sysProps = m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
int locatorsRetry = 3;
if (m_poolDM) {
int poolRetry = m_poolDM->getRetryAttempts();
@@ -183,20 +198,17 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn(
}
Connector* conn = nullptr;
ConnectionWrapper cw(conn);
- createConnection(
- conn, loc.getServerName().c_str(), loc.getPort(),
- DistributedSystem::getSystemProperties()->connectTimeout(), buffSize);
+ createConnection(conn, loc.getServerName().c_str(), loc.getPort(),
+ sysProps.connectTimeout(), buffSize);
QueueConnectionRequest request(memId, exclEndPts, redundancy, false,
serverGrp);
- DataOutput data;
- data.writeInt((int32_t)1001); // GOSSIPVERSION
- data.writeObject(&request);
+ auto data = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
+ data->writeInt((int32_t)1001); // GOSSIPVERSION
+ data->writeObject(&request);
int sentLength = conn->send(
- (char*)(data.getBuffer()), data.getBufferLength(),
- m_poolDM
- ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
- : DistributedSystem::getSystemProperties()->connectTimeout() *
- 1000 * 1000,
+ (char*)(data->getBuffer()), data->getBufferLength(),
+ m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
+ : sysProps.connectTimeout() * 1000 * 1000,
0);
if (sentLength <= 0) {
// conn->close(); delete conn; conn = nullptr;
@@ -205,32 +217,30 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn(
char buff[BUFF_SIZE];
int receivedLength = conn->receive(
buff, BUFF_SIZE,
- m_poolDM
- ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
- : DistributedSystem::getSystemProperties()->connectTimeout() *
- 1000 * 1000,
+ m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
+ : sysProps.connectTimeout() * 1000 * 1000,
0);
// conn->close();
// delete conn; conn = nullptr;
if (receivedLength <= 0) {
continue;
}
- DataInput di(reinterpret_cast<uint8_t*>(buff), receivedLength);
+ auto di = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+ reinterpret_cast<uint8_t*>(buff), receivedLength);
QueueConnectionResponsePtr response(nullptr);
/* adongre
* ssl defect
*/
int8_t acceptanceCode;
- di.read(&acceptanceCode);
- if (acceptanceCode == REPLY_SSL_ENABLED &&
- !DistributedSystem::getSystemProperties()->sslEnabled()) {
+ di->read(&acceptanceCode);
+ if (acceptanceCode == REPLY_SSL_ENABLED && !sysProps.sslEnabled()) {
LOGERROR("SSL is enabled on locator, enable SSL in client as well");
throw AuthenticationRequiredException(
"SSL is enabled on locator, enable SSL in client as well");
}
- di.rewindCursor(1);
- di.readObject(response);
+ di->rewindCursor(1);
+ di->readObject(response);
outEndpoint = response->getServers();
return GF_NOERR;
} catch (const AuthenticationRequiredException& excp) {
@@ -251,6 +261,11 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn(
bool locatorFound = false;
int locatorsRetry = 3;
ACE_Guard<ACE_Thread_Mutex> guard(m_locatorLock);
+ auto& sysProps = m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
+
if (m_poolDM) {
int poolRetry = m_poolDM->getRetryAttempts();
locatorsRetry = poolRetry <= 0 ? locatorsRetry : poolRetry;
@@ -278,28 +293,25 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn(
}
Connector* conn = nullptr;
ConnectionWrapper cw(conn);
- createConnection(
- conn, serLoc.getServerName().c_str(), serLoc.getPort(),
- DistributedSystem::getSystemProperties()->connectTimeout(), buffSize);
- DataOutput data;
- data.writeInt(1001); // GOSSIPVERSION
+ createConnection(conn, serLoc.getServerName().c_str(), serLoc.getPort(),
+ sysProps.connectTimeout(), buffSize);
+ auto data = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
+ data->writeInt(1001); // GOSSIPVERSION
if (currentServer == nullptr) {
LOGDEBUG("Creating ClientConnectionRequest");
ClientConnectionRequest request(exclEndPts, serverGrp);
- data.writeObject(&request);
+ data->writeObject(&request);
} else {
LOGDEBUG("Creating ClientReplacementRequest for connection: ",
currentServer->getEndpointObject()->name().c_str());
ClientReplacementRequest request(
currentServer->getEndpointObject()->name(), exclEndPts, serverGrp);
- data.writeObject(&request);
+ data->writeObject(&request);
}
int sentLength = conn->send(
- (char*)(data.getBuffer()), data.getBufferLength(),
- m_poolDM
- ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
- : DistributedSystem::getSystemProperties()->connectTimeout() *
- 1000 * 1000,
+ (char*)(data->getBuffer()), data->getBufferLength(),
+ m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
+ : sysProps.connectTimeout() * 1000 * 1000,
0);
if (sentLength <= 0) {
// conn->close();
@@ -309,33 +321,31 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn(
char buff[BUFF_SIZE];
int receivedLength = conn->receive(
buff, BUFF_SIZE,
- m_poolDM
- ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
- : DistributedSystem::getSystemProperties()->connectTimeout() *
- 1000 * 1000,
+ m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
+ : sysProps.connectTimeout() * 1000 * 1000,
0);
// conn->close();
// delete conn;
if (receivedLength <= 0) {
continue; // return GF_EUNDEF;
}
- DataInput di(reinterpret_cast<uint8_t*>(buff), receivedLength);
+ auto di = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+ reinterpret_cast<uint8_t*>(buff), receivedLength);
ClientConnectionResponsePtr response;
/* adongre
* SSL is enabled on locator and not in the client
*/
int8_t acceptanceCode;
- di.read(&acceptanceCode);
- if (acceptanceCode == REPLY_SSL_ENABLED &&
- !DistributedSystem::getSystemProperties()->sslEnabled()) {
+ di->read(&acceptanceCode);
+ if (acceptanceCode == REPLY_SSL_ENABLED && !sysProps.sslEnabled()) {
LOGERROR("SSL is enabled on locator, enable SSL in client as well");
throw AuthenticationRequiredException(
"SSL is enabled on locator, enable SSL in client as well");
}
- di.rewindCursor(1);
+ di->rewindCursor(1);
- di.readObject(response);
+ di->readObject(response);
response->printInfo();
if (!response->serverFound()) {
LOGFINE("Server not found");
@@ -366,6 +376,11 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn(
GfErrType ThinClientLocatorHelper::updateLocators(
const std::string& serverGrp) {
ACE_Guard<ACE_Thread_Mutex> guard(m_locatorLock);
+ auto& sysProps = m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
+
for (unsigned attempts = 0; attempts < m_locHostPort.size(); attempts++) {
ServerLocation serLoc = m_locHostPort[attempts];
Connector* conn = nullptr;
@@ -378,19 +393,16 @@ GfErrType ThinClientLocatorHelper::updateLocators(
serLoc.getServerName().c_str(), serLoc.getPort(),
serverGrp.c_str());
ConnectionWrapper cw(conn);
- createConnection(
- conn, serLoc.getServerName().c_str(), serLoc.getPort(),
- DistributedSystem::getSystemProperties()->connectTimeout(), buffSize);
+ createConnection(conn, serLoc.getServerName().c_str(), serLoc.getPort(),
+ sysProps.connectTimeout(), buffSize);
LocatorListRequest request(serverGrp);
- DataOutput data;
- data.writeInt((int32_t)1001); // GOSSIPVERSION
- data.writeObject(&request);
+ auto data = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
+ data->writeInt((int32_t)1001); // GOSSIPVERSION
+ data->writeObject(&request);
int sentLength = conn->send(
- (char*)(data.getBuffer()), data.getBufferLength(),
- m_poolDM
- ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
- : DistributedSystem::getSystemProperties()->connectTimeout() *
- 1000 * 1000,
+ (char*)(data->getBuffer()), data->getBufferLength(),
+ m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
+ : sysProps.connectTimeout() * 1000 * 1000,
0);
if (sentLength <= 0) {
// conn->close();
@@ -401,33 +413,31 @@ GfErrType ThinClientLocatorHelper::updateLocators(
char buff[BUFF_SIZE];
int receivedLength = conn->receive(
buff, BUFF_SIZE,
- m_poolDM
- ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
- : DistributedSystem::getSystemProperties()->connectTimeout() *
- 1000 * 1000,
+ m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000
+ : sysProps.connectTimeout() * 1000 * 1000,
0);
// conn->close();
// delete conn; conn = nullptr;
if (receivedLength <= 0) {
continue;
}
- DataInput di(reinterpret_cast<uint8_t*>(buff), receivedLength);
+ auto di = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+ reinterpret_cast<uint8_t*>(buff), receivedLength);
auto response = std::make_shared<LocatorListResponse>();
/* adongre
* SSL Enabled on Location and not in the client
*/
int8_t acceptanceCode;
- di.read(&acceptanceCode);
- if (acceptanceCode == REPLY_SSL_ENABLED &&
- !DistributedSystem::getSystemProperties()->sslEnabled()) {
+ di->read(&acceptanceCode);
+ if (acceptanceCode == REPLY_SSL_ENABLED && !sysProps.sslEnabled()) {
LOGERROR("SSL is enabled on locator, enable SSL in client as well");
throw AuthenticationRequiredException(
"SSL is enabled on locator, enable SSL in client as well");
}
- di.rewindCursor(1);
+ di->rewindCursor(1);
- di.readObject(response);
+ di->readObject(response);
std::vector<ServerLocation> locators = response->getLocators();
if (locators.size() > 0) {
RandGen randGen;
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolDM.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientPoolDM.cpp b/src/cppcache/src/ThinClientPoolDM.cpp
index 8205ba6..3950e5c 100644
--- a/src/cppcache/src/ThinClientPoolDM.cpp
+++ b/src/cppcache/src/ThinClientPoolDM.cpp
@@ -35,9 +35,6 @@
using namespace apache::geode::client;
using namespace apache::geode::statistics;
-ExpiryTaskManager* getCacheImplExpiryTaskManager();
-void removePool(const char*);
-
/* adongre
* CID 28730: Other violation (MISSING_COPY)
* Class "GetAllWork" owns resources that are managed in its constructor and
@@ -81,7 +78,8 @@ class GetAllWork : public PooledWork<GfErrType>,
m_keys(keys),
m_region(region),
m_aCallbackArgument(aCallbackArgument) {
- m_request = new TcrMessageGetAll(region.get(), m_keys.get(), m_poolDM,
+ m_request = new TcrMessageGetAll(region->getCache()->createDataOutput(),
+ region.get(), m_keys.get(), m_poolDM,
m_aCallbackArgument);
m_reply = new TcrMessageReply(true, m_poolDM);
if (m_poolDM->isMultiUserMode()) {
@@ -159,17 +157,19 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
if (firstGurd) ClientProxyMembershipID::increaseSynchCounter();
firstGurd = true;
- SystemProperties* sysProp = DistributedSystem::getSystemProperties();
+ auto& distributedSystem =
+ m_connManager.getCacheImpl()->getDistributedSystem();
+
+ auto& sysProp = distributedSystem.getSystemProperties();
// to set security flag at pool level
- this->m_isSecurityOn = sysProp->isSecurityOn();
+ this->m_isSecurityOn = sysProp.isSecurityOn();
ACE_TCHAR hostName[256];
ACE_OS::hostname(hostName, sizeof(hostName) - 1);
ACE_INET_Addr driver(hostName);
uint32_t hostAddr = driver.get_ip_address();
uint16_t hostPort = 0;
- const char* durableId =
- (sysProp != nullptr) ? sysProp->durableClientId() : nullptr;
+ const char* durableId = sysProp.durableClientId();
std::string poolSeparator = "_gem_";
std::string clientDurableId =
@@ -179,10 +179,11 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
? (poolSeparator + m_poolName)
: "");
- const uint32_t durableTimeOut =
- (sysProp != nullptr) ? sysProp->durableTimeout() : 0;
- m_memId = new ClientProxyMembershipID(
- hostName, hostAddr, hostPort, clientDurableId.c_str(), durableTimeOut);
+ const uint32_t durableTimeOut = sysProp.durableTimeout();
+ m_memId =
+ m_connManager.getCacheImpl()->getClientProxyMembershipIDFactory().create(
+ hostName, hostAddr, hostPort, clientDurableId.c_str(),
+ durableTimeOut);
if (m_attrs->m_initLocList.size() == 0 &&
m_attrs->m_initServList.size() == 0) {
@@ -193,9 +194,12 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
reset();
m_locHelper = new ThinClientLocatorHelper(m_attrs->m_initLocList, this);
- m_stats = new PoolStats(m_poolName.c_str());
+ auto statisticsManager = distributedSystem.getStatisticsManager();
+ m_stats =
+ new PoolStats(statisticsManager->getStatisticsFactory(), m_poolName);
+ statisticsManager->forceSample();
- if (!sysProp->isEndpointShufflingDisabled()) {
+ if (!sysProp.isEndpointShufflingDisabled()) {
if (m_attrs->m_initServList.size() > 0) {
RandGen randgen;
m_server = randgen(static_cast<uint32_t>(m_attrs->m_initServList.size()));
@@ -210,26 +214,26 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
void ThinClientPoolDM::init() {
LOGDEBUG("ThinClientPoolDM::init: Starting pool initialization");
- SystemProperties* sysProp = DistributedSystem::getSystemProperties();
+ auto& sysProp = m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
m_isMultiUserMode = this->getMultiuserAuthentication();
if (m_isMultiUserMode) {
LOGINFO("Multiuser authentication is enabled for pool %s",
m_poolName.c_str());
}
// to set security flag at pool level
- this->m_isSecurityOn = sysProp->isSecurityOn();
+ this->m_isSecurityOn = sysProp.isSecurityOn();
LOGDEBUG("ThinClientPoolDM::init: security in on/off = %d ",
this->m_isSecurityOn);
m_connManager.init(true);
- SystemProperties* props = DistributedSystem::getSystemProperties();
-
LOGDEBUG("ThinClientPoolDM::init: is grid client = %d ",
- props->isGridClient());
+ sysProp.isGridClient());
- if (!props->isGridClient()) {
+ if (!sysProp.isGridClient()) {
ThinClientPoolDM::startBackgroundThreads();
}
@@ -237,20 +241,22 @@ void ThinClientPoolDM::init() {
}
PropertiesPtr ThinClientPoolDM::getCredentials(TcrEndpoint* ep) {
- PropertiesPtr tmpSecurityProperties =
- DistributedSystem::getSystemProperties()->getSecurityProperties();
+ const auto& distributedSystem =
+ m_connManager.getCacheImpl()->getDistributedSystem();
+ const auto& tmpSecurityProperties =
+ distributedSystem.getSystemProperties().getSecurityProperties();
- AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader();
-
- if (authInitialize != nullptr) {
+ if (const auto& authInitialize = distributedSystem.m_impl->getAuthLoader()) {
LOGFINER(
"ThinClientPoolDM::getCredentials: acquired handle to authLoader, "
"invoking getCredentials %s",
ep->name().c_str());
- PropertiesPtr tmpAuthIniSecurityProperties = authInitialize->getCredentials(
+ const auto& tmpAuthIniSecurityProperties = authInitialize->getCredentials(
tmpSecurityProperties, ep->name().c_str());
+ LOGFINER("Done getting credentials");
return tmpAuthIniSecurityProperties;
}
+
return nullptr;
}
@@ -260,9 +266,11 @@ void ThinClientPoolDM::startBackgroundThreads() {
NC_Ping_Thread);
m_pingTask->start();
- SystemProperties* props = DistributedSystem::getSystemProperties();
+ auto& props = m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
- if (props->onClientDisconnectClearPdxTypeIds() == true) {
+ if (props.onClientDisconnectClearPdxTypeIds() == true) {
m_cliCallbackTask =
new Task<ThinClientPoolDM>(this, &ThinClientPoolDM::cliCallback);
m_cliCallbackTask->start();
@@ -277,8 +285,9 @@ void ThinClientPoolDM::startBackgroundThreads() {
LOGDEBUG(
"ThinClientPoolDM::startBackgroundThreads: Scheduling ping task at %ld",
pingInterval);
- m_pingTaskId = getCacheImplExpiryTaskManager()->scheduleExpiryTask(
- pingHandler, 1, pingInterval, false);
+ m_pingTaskId =
+ m_connManager.getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask(
+ pingHandler, 1, pingInterval, false);
} else {
LOGDEBUG(
"ThinClientPoolDM::startBackgroundThreads: Not Scheduling ping task as "
@@ -306,7 +315,7 @@ void ThinClientPoolDM::startBackgroundThreads() {
"task at %ld",
updateLocatorListInterval);
m_updateLocatorListTaskId =
- getCacheImplExpiryTaskManager()->scheduleExpiryTask(
+ m_connManager.getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask(
updateLocatorListHandler, 1, updateLocatorListInterval, false);
}
@@ -337,8 +346,9 @@ void ThinClientPoolDM::startBackgroundThreads() {
LOGDEBUG(
"ThinClientPoolDM::startBackgroundThreads: Scheduling "
"manageConnections task");
- m_connManageTaskId = getCacheImplExpiryTaskManager()->scheduleExpiryTask(
- connHandler, 1, idle / 1000 + 1, false);
+ m_connManageTaskId =
+ m_connManager.getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask(
+ connHandler, 1, idle / 1000 + 1, false);
}
LOGDEBUG(
@@ -352,7 +362,7 @@ void ThinClientPoolDM::startBackgroundThreads() {
LOGDEBUG(
"ThinClientPoolDM::startBackgroundThreads: Starting pool stat sampler");
if (m_PoolStatsSampler == nullptr && getStatisticInterval() > -1 &&
- DistributedSystem::getSystemProperties()->statisticsEnabled()) {
+ props.statisticsEnabled()) {
m_PoolStatsSampler = new PoolStatsSampler(
getStatisticInterval() / 1000 + 1, m_connManager.getCacheImpl(), this);
m_PoolStatsSampler->start();
@@ -470,7 +480,7 @@ void ThinClientPoolDM::cleanStaleConnections(volatile bool& isRunning) {
}
}
if (m_connManageTaskId >= 0 && isRunning &&
- getCacheImplExpiryTaskManager()->resetTask(
+ m_connManager.getCacheImpl()->getExpiryTaskManager().resetTask(
m_connManageTaskId, static_cast<uint32_t>(_nextIdle.sec() + 1))) {
LOGERROR("Failed to reschedule connection manager");
} else {
@@ -619,7 +629,7 @@ GfErrType ThinClientPoolDM::sendRequestToAllServers(
int feIndex = 0;
FunctionExecution* fePtrList = new FunctionExecution[csArray->length()];
- ThreadPool* threadPool = TPSingleton::instance();
+ auto* threadPool = m_connManager.getCacheImpl()->getThreadPool();
UserAttributesPtr userAttr =
TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes();
for (int i = 0; i < csArray->length(); i++) {
@@ -746,7 +756,8 @@ void ThinClientPoolDM::stopPingThread() {
m_pingTask->wait();
GF_SAFE_DELETE(m_pingTask);
if (m_pingTaskId >= 0) {
- getCacheImplExpiryTaskManager()->cancelTask(m_pingTaskId);
+ m_connManager.getCacheImpl()->getExpiryTaskManager().cancelTask(
+ m_pingTaskId);
}
}
}
@@ -759,7 +770,8 @@ void ThinClientPoolDM::stopUpdateLocatorListThread() {
m_updateLocatorListTask->wait();
GF_SAFE_DELETE(m_updateLocatorListTask);
if (m_updateLocatorListTaskId >= 0) {
- getCacheImplExpiryTaskManager()->cancelTask(m_updateLocatorListTaskId);
+ m_connManager.getCacheImpl()->getExpiryTaskManager().cancelTask(
+ m_updateLocatorListTaskId);
}
}
}
@@ -798,7 +810,8 @@ void ThinClientPoolDM::destroy(bool keepAlive) {
m_connManageTask->wait();
GF_SAFE_DELETE(m_connManageTask);
if (m_connManageTaskId >= 0) {
- getCacheImplExpiryTaskManager()->cancelTask(m_connManageTaskId);
+ m_connManager.getCacheImpl()->getExpiryTaskManager().cancelTask(
+ m_connManageTaskId);
}
}
@@ -827,12 +840,17 @@ void ThinClientPoolDM::destroy(bool keepAlive) {
// Close Stats
getStats().close();
+ m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getStatisticsManager()
+ ->forceSample();
if (m_clientMetadataService != nullptr) {
GF_SAFE_DELETE(m_clientMetadataService);
}
- removePool(m_poolName.c_str());
+ m_connManager.getCacheImpl()->getCache()->getPoolManager().removePool(
+ m_poolName.c_str());
stopChunkProcessor();
m_manager->closeAllStickyConnections();
@@ -868,9 +886,11 @@ QueryServicePtr ThinClientPoolDM::getQueryServiceWithoutCheck() {
if (!(m_remoteQueryServicePtr == nullptr)) {
return m_remoteQueryServicePtr;
}
- SystemProperties* props = DistributedSystem::getSystemProperties();
+ auto& props = m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
- if (props->isGridClient()) {
+ if (props.isGridClient()) {
LOGWARN("Initializing query service while grid-client setting is enabled.");
// Init Query Service
m_remoteQueryServicePtr = std::make_shared<RemoteQueryService>(
@@ -895,7 +915,9 @@ void ThinClientPoolDM::sendUserCacheCloseMessage(bool keepAlive) {
for (it = uca.begin(); it != uca.end(); it++) {
UserConnectionAttributes* uca = (*it).second;
if (uca->isAuthenticated() && uca->getEndpoint()->connected()) {
- TcrMessageRemoveUserAuth request(keepAlive, this);
+ TcrMessageRemoveUserAuth request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(),
+ keepAlive, this);
TcrMessageReply reply(true, this);
sendRequestToEP(request, reply, uca->getEndpoint());
@@ -927,7 +949,9 @@ int32_t ThinClientPoolDM::GetPDXIdForType(SerializablePtr pdxType) {
GfErrType err = GF_NOERR;
- TcrMessageGetPdxIdForType request(pdxType, this);
+ TcrMessageGetPdxIdForType request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(), pdxType,
+ this);
TcrMessageReply reply(true, this);
@@ -946,7 +970,9 @@ int32_t ThinClientPoolDM::GetPDXIdForType(SerializablePtr pdxType) {
// need to broadcast this id to all other pool
{
- for (const auto& iter : PoolManager::getAll()) {
+ auto& poolManager =
+ m_connManager.getCacheImpl()->getCache()->getPoolManager();
+ for (const auto& iter : poolManager.getAll()) {
auto currPool = static_cast<ThinClientPoolDM*>(iter.second.get());
if (currPool != this) {
@@ -963,7 +989,9 @@ void ThinClientPoolDM::AddPdxType(SerializablePtr pdxType, int32_t pdxTypeId) {
GfErrType err = GF_NOERR;
- TcrMessageAddPdxType request(pdxType, this, pdxTypeId);
+ TcrMessageAddPdxType request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(), pdxType,
+ this, pdxTypeId);
TcrMessageReply reply(true, this);
@@ -983,7 +1011,9 @@ SerializablePtr ThinClientPoolDM::GetPDXTypeById(int32_t typeId) {
GfErrType err = GF_NOERR;
- TcrMessageGetPdxTypeById request(typeId, this);
+ TcrMessageGetPdxTypeById request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(), typeId,
+ this);
TcrMessageReply reply(true, this);
@@ -1005,7 +1035,9 @@ int32_t ThinClientPoolDM::GetEnumValue(SerializablePtr enumInfo) {
GfErrType err = GF_NOERR;
- TcrMessageGetPdxIdForEnum request(enumInfo, this);
+ TcrMessageGetPdxIdForEnum request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(), enumInfo,
+ this);
TcrMessageReply reply(true, this);
@@ -1024,7 +1056,9 @@ int32_t ThinClientPoolDM::GetEnumValue(SerializablePtr enumInfo) {
// need to broadcast this id to all other pool
{
- for (const auto& iter : PoolManager::getAll()) {
+ auto& poolManager =
+ m_connManager.getCacheImpl()->getCache()->getPoolManager();
+ for (const auto& iter : poolManager.getAll()) {
const auto& currPool =
std::dynamic_pointer_cast<ThinClientPoolDM>(iter.second);
@@ -1042,7 +1076,8 @@ SerializablePtr ThinClientPoolDM::GetEnum(int32_t val) {
GfErrType err = GF_NOERR;
- TcrMessageGetPdxEnumById request(val, this);
+ TcrMessageGetPdxEnumById request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(), val, this);
TcrMessageReply reply(true, this);
@@ -1064,7 +1099,9 @@ void ThinClientPoolDM::AddEnum(SerializablePtr enumInfo, int enumVal) {
GfErrType err = GF_NOERR;
- TcrMessageAddPdxEnum request(enumInfo, this, enumVal);
+ TcrMessageAddPdxEnum request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(), enumInfo,
+ this, enumVal);
TcrMessageReply reply(true, this);
@@ -1087,7 +1124,9 @@ GfErrType ThinClientPoolDM::sendUserCredentials(PropertiesPtr credentials,
GfErrType err = GF_NOERR;
- TcrMessageUserCredential request(credentials, this);
+ TcrMessageUserCredential request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput(), credentials,
+ this);
TcrMessageReply reply(true, this);
@@ -1257,7 +1296,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(TcrMessage& request,
nullptr);
}
std::vector<GetAllWork*> getAllWorkers;
- ThreadPool* threadPool = TPSingleton::instance();
+ auto* threadPool = m_connManager.getCacheImpl()->getThreadPool();
ChunkedGetAllResponse* responseHandler =
static_cast<ChunkedGetAllResponse*>(reply.getChunkedResultHandler());
@@ -1719,10 +1758,12 @@ GfErrType ThinClientPoolDM::createPoolConnectionToAEndPoint(
"connection to the endpoint %s",
theEP->name().c_str());
// if the pool size is within limits, create a new connection.
- error = theEP->createNewConnection(
- conn, false, false,
- DistributedSystem::getSystemProperties()->connectTimeout(), false, true,
- appThreadrequest);
+ error = theEP->createNewConnection(conn, false, false,
+ m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .connectTimeout(),
+ false, true, appThreadrequest);
if (conn == nullptr || error != GF_NOERR) {
LOGFINE("2Failed to connect to %s", theEP->name().c_str());
if (conn != nullptr) GF_SAFE_DELETE(conn);
@@ -1801,9 +1842,12 @@ GfErrType ThinClientPoolDM::createPoolConnection(
conn->updateCreationTime();
break;
} else {
- error = ep->createNewConnection(
- conn, false, false,
- DistributedSystem::getSystemProperties()->connectTimeout(), false);
+ error = ep->createNewConnection(conn, false, false,
+ m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .connectTimeout(),
+ false);
}
if (conn == nullptr || error != GF_NOERR) {
@@ -1854,14 +1898,20 @@ TcrConnection* ThinClientPoolDM::getConnectionFromQueue(
getStats().incWaitingConnections();
/*get the start time for connectionWaitTime stat*/
- int64_t sampleStartNanos = Utils::startStatOpTime();
+ bool enableTimeStatistics = m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .getEnableTimeStatistics();
+ int64_t sampleStartNanos =
+ enableTimeStatistics ? Utils::startStatOpTime() : 0;
TcrConnection* mp =
getUntil(timeoutTime, error, excludeServers, maxConnLimit);
/*Update the time stat for clientOpsTime */
- Utils::updateStatOpTime(
- getStats().getStats(),
- PoolStatType::getInstance()->getTotalWaitingConnTimeId(),
- sampleStartNanos);
+ if (enableTimeStatistics) {
+ Utils::updateStatOpTime(getStats().getStats(),
+ getStats().getTotalWaitingConnTimeId(),
+ sampleStartNanos);
+ }
return mp;
}
@@ -1892,9 +1942,13 @@ GfErrType ThinClientPoolDM::sendRequestToEP(const TcrMessage& request,
LOGDEBUG(
"ThinClientPoolDM::sendRequestToEP(): couldnt create a pool "
"connection, creating a temporary connection.");
- error = currentEndpoint->createNewConnection(
- conn, false, false,
- DistributedSystem::getSystemProperties()->connectTimeout(), false);
+ error =
+ currentEndpoint->createNewConnection(conn, false, false,
+ m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .connectTimeout(),
+ false);
putConnInPool = false;
currentEndpoint->setConnectionStatus(true);
}
@@ -2076,7 +2130,7 @@ int ThinClientPoolDM::updateLocatorList(volatile bool& isRunning) {
LOGFINE("Starting updateLocatorList thread for pool %s", m_poolName.c_str());
while (isRunning) {
m_updateLocatorListSema.acquire();
- if (isRunning && !TcrConnectionManager::isNetDown) {
+ if (isRunning && !m_connManager.isNetDown()) {
((ThinClientLocatorHelper*)m_locHelper)
->updateLocators(this->getServerGroup());
}
@@ -2089,7 +2143,7 @@ int ThinClientPoolDM::pingServer(volatile bool& isRunning) {
LOGFINE("Starting ping thread for pool %s", m_poolName.c_str());
while (isRunning) {
m_pingSema.acquire();
- if (isRunning && !TcrConnectionManager::isNetDown) {
+ if (isRunning && !m_connManager.isNetDown()) {
pingServerLocal();
while (m_pingSema.tryacquire() != -1) {
;
@@ -2107,9 +2161,10 @@ int ThinClientPoolDM::cliCallback(volatile bool& isRunning) {
if (isRunning) {
LOGFINE("Clearing Pdx Type Registry");
// this call for csharp client
- DistributedSystemImpl::CallCliCallBack();
+ DistributedSystemImpl::CallCliCallBack(
+ *(m_connManager.getCacheImpl()->getCache()));
// this call for cpp client
- PdxTypeRegistry::clear();
+ m_connManager.getCacheImpl()->getPdxTypeRegistry()->clear();
while (m_cliCallbackSema.tryacquire() != -1) {
;
}
@@ -2317,7 +2372,8 @@ void ThinClientPoolDM::updateNotificationStats(bool isDeltaSuccess,
GfErrType ThinClientPoolDM::doFailover(TcrConnection* conn) {
m_manager->setStickyConnection(conn, true);
- TcrMessageTxFailover request;
+ TcrMessageTxFailover request(
+ m_connManager.getCacheImpl()->getCache()->createDataOutput());
TcrMessageReply reply(true, nullptr);
GfErrType err = this->sendSyncRequest(request, reply);
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolDM.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientPoolDM.hpp b/src/cppcache/src/ThinClientPoolDM.hpp
index 73802f7..1da32d8 100644
--- a/src/cppcache/src/ThinClientPoolDM.hpp
+++ b/src/cppcache/src/ThinClientPoolDM.hpp
@@ -119,16 +119,13 @@ class ThinClientPoolDM
virtual ~ThinClientPoolDM() {
destroy();
- GF_SAFE_DELETE(m_memId);
GF_SAFE_DELETE(m_locHelper);
GF_SAFE_DELETE(m_stats);
GF_SAFE_DELETE(m_clientMetadataService);
GF_SAFE_DELETE(m_manager);
}
// void updateQueue(const char* regionPath) ;
- ClientProxyMembershipID* getMembershipId() {
- return (ClientProxyMembershipID*)m_memId;
- }
+ ClientProxyMembershipID* getMembershipId() { return m_memId.get(); }
virtual void processMarker(){};
virtual bool checkDupAndAdd(EventIdPtr eventid) {
return m_connManager.checkDupAndAdd(eventid);
@@ -386,7 +383,8 @@ class ThinClientPoolDM
std::string selectEndpoint(std::set<ServerLocation>&,
const TcrConnection* currentServer = nullptr);
- volatile ClientProxyMembershipID* m_memId;
+ // TODO global - m_memId was volatile
+ std::unique_ptr<ClientProxyMembershipID> m_memId;
virtual TcrEndpoint* createEP(const char* endpointName) {
return new TcrPoolEndPoint(endpointName, m_connManager.getCacheImpl(),
m_connManager.m_failoverSema,
@@ -491,7 +489,11 @@ class FunctionExecution : public PooledWork<GfErrType> {
if (m_userAttr != nullptr) gua.setProxyCache(m_userAttr->getProxyCache());
std::string funcName(m_func);
- TcrMessageExecuteFunction request(funcName, m_args, m_getResult, m_poolDM,
+ TcrMessageExecuteFunction request(m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getCache()
+ ->createDataOutput(),
+ funcName, m_args, m_getResult, m_poolDM,
m_timeout);
TcrMessageReply reply(true, m_poolDM);
ChunkedFunctionExecutionResponse* resultProcessor(
@@ -509,29 +511,7 @@ class FunctionExecution : public PooledWork<GfErrType> {
m_error = m_poolDM->handleEPError(m_ep, reply, m_error);
if (m_error != GF_NOERR) {
if (m_error == GF_NOTCON || m_error == GF_IOERR) {
- /*
- ==25848== 650 (72 direct, 578 indirect) bytes in 2 blocks are definitely
- lost in loss record 184 of 218
- ==25848== at 0x4007D75: operator new(unsigned int)
- (vg_replace_malloc.c:313)
- ==25848== by 0x439BD41:
- apache::geode::client::FunctionExecution::execute()
- (ThinClientPoolDM.hpp:417)
- ==25848== by 0x439A5A1:
- apache::geode::client::PooledWork<GfErrType>::call()
- (ThreadPool.hpp:25)
- ==25848== by 0x43C335F:
- apache::geode::client::ThreadPoolWorker::svc()
- (ThreadPool.cpp:43)
- ==25848== by 0x440521D: ACE_6_1_0::ACE_Task_Base::svc_run(void*) (in
- /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so)
- ==25848== by 0x441E16A: ACE_6_1_0::ACE_Thread_Adapter::invoke_i() (in
- /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so)
- ==25848== by 0x441E307: ACE_6_1_0::ACE_Thread_Adapter::invoke() (in
- /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so)
- ==25848== by 0x8CFA48: start_thread (in /lib/libpthread-2.12.so)
- ==25848== by 0x34BE1D: clone (in /lib/libc-2.12.so)
- */
+
delete resultProcessor;
resultProcessor = nullptr;
return GF_NOERR; // if server is unavailable its not an error for
@@ -543,26 +523,7 @@ class FunctionExecution : public PooledWork<GfErrType> {
if (reply.getMessageType() == TcrMessage::EXCEPTION) {
exceptionPtr = CacheableString::create(reply.getException());
}
- /**
- * ==13294== 48,342 (1,656 direct, 46,686 indirect) bytes in 46 blocks are
-definitely lost in loss record 241 of 244
-==13294== at 0x4007D75: operator new(unsigned int) (vg_replace_malloc.c:313)
-==13294== by 0x439BE11: apache::geode::client::FunctionExecution::execute()
-(ThinClientPoolDM.hpp:417)
-==13294== by 0x439A671: apache::geode::client::PooledWork<GfErrType>::call()
-(ThreadPool.hpp:25)
-==13294== by 0x43C33FF: apache::geode::client::ThreadPoolWorker::svc()
-(ThreadPool.cpp:43)
-==13294== by 0x44052BD: ACE_6_1_0::ACE_Task_Base::svc_run(void*) (in
-/export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so)
-==13294== by 0x441E20A: ACE_6_1_0::ACE_Thread_Adapter::invoke_i() (in
-/export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so)
-==13294== by 0x441E3A7: ACE_6_1_0::ACE_Thread_Adapter::invoke() (in
-/export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so)
-==13294== by 0x8CFA48: start_thread (in /lib/libpthread-2.12.so)
-==13294== by 0x34BE1D: clone (in /lib/libc-2.12.so)
- *
- */
+
delete resultProcessor;
resultProcessor = nullptr;
return m_error;
@@ -573,13 +534,7 @@ definitely lost in loss record 241 of 244
exceptionPtr = CacheableString::create(reply.getException());
}
if (resultProcessor->getResult() == true) {
- // CacheableVectorPtr values =
- // resultProcessor->getFunctionExecutionResults();
- // ACE_Guard< ACE_Recursive_Thread_Mutex > guard(
- // *m_resultCollectorLock );
- // //(*m_rc)->addResult(values);
- // ExecutionImpl::addResults(*m_rc,values);
- // resultProcessor->reset();
+
}
delete resultProcessor;
resultProcessor = nullptr;
@@ -629,6 +584,10 @@ class OnRegionFunctionExecution : public PooledWork<GfErrType> {
std::string funcName(m_func);
m_request = new TcrMessageExecuteRegionFunctionSingleHop(
+ m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getCache()
+ ->createDataOutput(),
funcName, m_region, m_args, m_routingObj, m_getResult, nullptr,
m_allBuckets, timeout, m_poolDM);
m_reply = new TcrMessageReply(true, m_poolDM);
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolHADM.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientPoolHADM.cpp b/src/cppcache/src/ThinClientPoolHADM.cpp
index b18aeeb..6c44296 100644
--- a/src/cppcache/src/ThinClientPoolHADM.cpp
+++ b/src/cppcache/src/ThinClientPoolHADM.cpp
@@ -40,9 +40,11 @@ void ThinClientPoolHADM::init() {
}
void ThinClientPoolHADM::startBackgroundThreads() {
- SystemProperties* props = DistributedSystem::getSystemProperties();
+ auto& props = m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
- if (props->isGridClient()) {
+ if (props.isGridClient()) {
LOGWARN("Starting background threads and ignoring grid-client setting");
ThinClientPoolDM::startBackgroundThreads();
}
@@ -55,10 +57,11 @@ void ThinClientPoolHADM::startBackgroundThreads() {
ACE_Event_Handler* redundancyChecker =
new ExpiryHandler_T<ThinClientPoolHADM>(
this, &ThinClientPoolHADM::checkRedundancy);
- int32_t redundancyMonitorInterval = props->redundancyMonitorInterval();
+ int32_t redundancyMonitorInterval = props.redundancyMonitorInterval();
- m_servermonitorTaskId = CacheImpl::expiryTaskManager->scheduleExpiryTask(
- redundancyChecker, 1, redundancyMonitorInterval, false);
+ m_servermonitorTaskId =
+ m_connManager.getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask(
+ redundancyChecker, 1, redundancyMonitorInterval, false);
LOGFINE(
"ThinClientPoolHADM::ThinClientPoolHADM Registered server "
"monitor task with id = %ld, interval = %ld",
@@ -133,7 +136,7 @@ GfErrType ThinClientPoolHADM::sendSyncRequestCq(TcrMessage& request,
bool ThinClientPoolHADM::preFailoverAction() { return true; }
bool ThinClientPoolHADM::postFailoverAction(TcrEndpoint* endpoint) {
- m_theTcrConnManager.triggerRedundancyThread();
+ m_connManager.triggerRedundancyThread();
return true;
}
@@ -141,7 +144,7 @@ int ThinClientPoolHADM::redundancy(volatile bool& isRunning) {
LOGFINE("ThinClientPoolHADM: Starting maintain redundancy thread.");
while (isRunning) {
m_redundancySema.acquire();
- if (isRunning && !TcrConnectionManager::isNetDown) {
+ if (isRunning && !m_connManager.isNetDown()) {
m_redundancyManager->maintainRedundancyLevel();
while (m_redundancySema.tryacquire() != -1) {
;
@@ -183,7 +186,8 @@ void ThinClientPoolHADM::destroy(bool keepAlive) {
void ThinClientPoolHADM::sendNotificationCloseMsgs() {
if (m_redundancyTask) {
if (m_servermonitorTaskId >= 0) {
- CacheImpl::expiryTaskManager->cancelTask(m_servermonitorTaskId);
+ m_connManager.getCacheImpl()->getExpiryTaskManager().cancelTask(
+ m_servermonitorTaskId);
}
m_redundancyTask->stopNoblock();
m_redundancySema.release();
@@ -193,21 +197,6 @@ void ThinClientPoolHADM::sendNotificationCloseMsgs() {
}
}
-/*
-void ThinClientPoolHADM::stopNotificationThreads()
-{
- ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_endpointsLock );
- for( ACE_Map_Manager< std::string, TcrEndpoint *, ACE_Recursive_Thread_Mutex
->::iterator it = m_endpoints.begin(); it != m_endpoints.end(); it++){
- ((*it).int_id_)->stopNoBlock();
- }
- for( ACE_Map_Manager< std::string, TcrEndpoint *, ACE_Recursive_Thread_Mutex
->::iterator it = m_endpoints.begin(); it != m_endpoints.end(); it++){
- ((*it).int_id_)->stopNotifyReceiverAndCleanup();
- }
-}
-*/
-
GfErrType ThinClientPoolHADM::registerInterestAllRegions(
TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply) {
GfErrType err = GF_NOERR;
@@ -248,12 +237,14 @@ void ThinClientPoolHADM::removeRegion(ThinClientRegion* theTCR) {
}
void ThinClientPoolHADM::readyForEvents() {
- if (!DistributedSystem::getSystemProperties()->autoReadyForEvents()) {
+ auto& sysProp = m_connManager.getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
+ if (!sysProp.autoReadyForEvents()) {
init();
}
- const char* durable =
- DistributedSystem::getSystemProperties()->durableClientId();
+ const char* durable = sysProp.durableClientId();
if (durable != nullptr && strlen(durable) > 0) {
m_redundancyManager->readyForEvents();
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolHADM.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientPoolHADM.hpp b/src/cppcache/src/ThinClientPoolHADM.hpp
index 9783293..ab5dc78 100644
--- a/src/cppcache/src/ThinClientPoolHADM.hpp
+++ b/src/cppcache/src/ThinClientPoolHADM.hpp
@@ -110,8 +110,8 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
// Disallow copy constructor and assignment operator.
ThinClientRedundancyManager* m_redundancyManager;
ThinClientPoolHADM(const ThinClientPoolHADM&);
- ThinClientPoolHADM& operator=(const ThinClientPoolHADM&);
- // const char* m_name; // COVERITY -> 30305 Uninitialized pointer field
+ ThinClientPoolHADM& operator=(const ThinClientPoolHADM&) = delete;
+
TcrConnectionManager& m_theTcrConnManager;
ACE_Semaphore m_redundancySema;
Task<ThinClientPoolHADM>* m_redundancyTask;
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolRegion.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/ThinClientPoolRegion.cpp b/src/cppcache/src/ThinClientPoolRegion.cpp
index 834520c..b888e1c 100644
--- a/src/cppcache/src/ThinClientPoolRegion.cpp
+++ b/src/cppcache/src/ThinClientPoolRegion.cpp
@@ -40,7 +40,10 @@ ThinClientPoolRegion::~ThinClientPoolRegion() { m_tcrdm = nullptr; }
void ThinClientPoolRegion::initTCR() {
try {
ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(
- PoolManager::find(m_regionAttributes->getPoolName()).get());
+ getCache()
+ ->getPoolManager()
+ .find(m_regionAttributes->getPoolName())
+ .get());
m_tcrdm = dynamic_cast<ThinClientBaseDM*>(poolDM);
if (!m_tcrdm) {
// TODO: create a PoolNotFound exception.