You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2017/08/10 15:20:17 UTC
[09/27] geode-native git commit: GEODE-2729: Remove global variables
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/SerializationRegistry.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/SerializationRegistry.cpp b/src/cppcache/src/SerializationRegistry.cpp
index a4e593c..4f308b4 100644
--- a/src/cppcache/src/SerializationRegistry.cpp
+++ b/src/cppcache/src/SerializationRegistry.cpp
@@ -49,304 +49,80 @@
#include "ThinClientPoolDM.hpp"
#include "PdxType.hpp"
#include <geode/PdxWrapper.hpp>
-#include <geode/PdxSerializable.hpp>
#include "EnumInfo.hpp"
#include "VersionTag.hpp"
#include "DiskStoreId.hpp"
#include "DiskVersionTag.hpp"
#include "CachedDeserializableHelper.hpp"
-
-#include "NonCopyable.hpp"
-
#include <mutex>
-#include "util/concurrent/spinlock_mutex.hpp"
+#include <functional>
namespace apache {
namespace geode {
namespace client {
-/* adongre
- * CID 28729: Other violation (MISSING_COPY)
- * Class "apache::geode::client::TheTypeMap" owns resources that are managed in
- * its
- * constructor and destructor but has no user-written copy constructor.
- *
- * CID 28715: Other violation (MISSING_ASSIGN)
- * Class "apache::geode::client::TheTypeMap" owns resources that are managed
- * in its constructor and destructor but has no user-written assignment
- * operator.
- *
- * FIX : Make the class NonCopyable
- */
-
-class TheTypeMap : private NonCopyable, private NonAssignable {
- private:
- IdToFactoryMap* m_map;
- IdToFactoryMap* m_map2; // to hold Fixed IDs since GFE 5.7.
- StrToPdxFactoryMap* m_pdxTypemap;
- spinlock_mutex m_mapLock;
- spinlock_mutex m_map2Lock;
- spinlock_mutex m_pdxTypemapLock;
-
- public:
- TheTypeMap();
-
- virtual ~TheTypeMap() {
- if (m_map != nullptr) {
- delete m_map;
- }
-
- if (m_map2 != nullptr) {
- delete m_map2;
- }
-
- if (m_pdxTypemap != nullptr) {
- delete m_pdxTypemap;
- }
- }
-
- inline void setup() {
- // Register Geode builtins here!!
- // update type ids in GeodeTypeIds.hpp
-
- bind(CacheableByte::createDeserializable);
- bind(CacheableBoolean::createDeserializable);
- bind(BooleanArray::createDeserializable);
- bind(CacheableBytes::createDeserializable);
- bind(CacheableFloat::createDeserializable);
- bind(CacheableFloatArray::createDeserializable);
- bind(CacheableDouble::createDeserializable);
- bind(CacheableDoubleArray::createDeserializable);
- bind(CacheableDate::createDeserializable);
- bind(CacheableFileName::createDeserializable);
- bind(CacheableHashMap::createDeserializable);
- bind(CacheableHashSet::createDeserializable);
- bind(CacheableHashTable::createDeserializable);
- bind(CacheableIdentityHashMap::createDeserializable);
- bind(CacheableLinkedHashSet::createDeserializable);
- bind(CacheableInt16::createDeserializable);
- bind(CacheableInt16Array::createDeserializable);
- bind(CacheableInt32::createDeserializable);
- bind(CacheableInt32Array::createDeserializable);
- bind(CacheableInt64::createDeserializable);
- bind(CacheableInt64Array::createDeserializable);
- bind(CacheableObjectArray::createDeserializable);
- bind(CacheableString::createDeserializable);
- bind(CacheableString::createDeserializableHuge);
- bind(CacheableString::createUTFDeserializable);
- bind(CacheableString::createUTFDeserializableHuge);
- bind(CacheableStringArray::createDeserializable);
- bind(CacheableVector::createDeserializable);
- bind(CacheableArrayList::createDeserializable);
- bind(CacheableLinkedList::createDeserializable);
- bind(CacheableStack::createDeserializable);
- bind(CacheableWideChar::createDeserializable);
- bind(CharArray::createDeserializable);
- bind(CacheableToken::createDeserializable);
- bind(RegionAttributes::createDeserializable);
- bind(Properties::createDeserializable);
- // bind(CacheableObjectPartList::createDeserializable);
- // bind internal/fixed classes - since GFE 5.7
- bind2(CacheableUndefined::createDeserializable);
- bind2(EventId::createDeserializable);
- bind2(Struct::createDeserializable);
- bind2(ClientConnectionResponse::create);
- bind2(QueueConnectionResponse::create);
- bind2(LocatorListResponse::create);
- bind2(ClientProxyMembershipID::createDeserializable);
- bind2(GatewayEventCallbackArgument::createDeserializable);
- bind2(GatewaySenderEventCallbackArgument::createDeserializable);
- bind2(GetAllServersResponse::create);
- bind2(TXCommitMessage::create);
- bind2(EnumInfo::createDeserializable);
- bind2(VersionTag::createDeserializable);
- rebind2(GeodeTypeIdsImpl::DiskStoreId, DiskStoreId::createDeserializable);
- rebind2(GeodeTypeIdsImpl::DiskVersionTag,
- DiskVersionTag::createDeserializable);
- bind2(CachedDeserializableHelper::createForVmCachedDeserializable);
- bind2(CachedDeserializableHelper::createForPreferBytesDeserializable);
- // bind2(VersionedCacheableObjectPartList::createDeserializable);
- }
-
- inline void clear() {
- std::lock_guard<spinlock_mutex> guard(m_mapLock);
- m_map->unbind_all();
-
- std::lock_guard<spinlock_mutex> guard2(m_map2Lock);
- m_map2->unbind_all();
-
- std::lock_guard<spinlock_mutex> guard3(m_pdxTypemapLock);
- m_pdxTypemap->unbind_all();
- }
-
- inline void find(int64_t id, TypeFactoryMethod& func) {
- std::lock_guard<spinlock_mutex> guard(m_mapLock);
- m_map->find(id, func);
- }
-
- inline void find2(int64_t id, TypeFactoryMethod& func) {
- std::lock_guard<spinlock_mutex> guard(m_map2Lock);
- m_map2->find(id, func);
- }
-
- inline void bind(TypeFactoryMethod func) {
- Serializable* obj = func();
- std::lock_guard<spinlock_mutex> guard(m_mapLock);
- int64_t compId = static_cast<int64_t>(obj->typeId());
- if (compId == GeodeTypeIdsImpl::CacheableUserData ||
- compId == GeodeTypeIdsImpl::CacheableUserData2 ||
- compId == GeodeTypeIdsImpl::CacheableUserData4) {
- compId |= ((static_cast<int64_t>(obj->classId())) << 32);
- }
- delete obj;
- int bindRes = m_map->bind(compId, func);
- if (bindRes == 1) {
- LOGERROR(
- "A class with "
- "ID %d is already registered.",
- compId);
- throw IllegalStateException(
- "A class with "
- "given ID is already registered.");
- } else if (bindRes == -1) {
- LOGERROR(
- "Unknown error "
- "while adding class ID %d to map.",
- compId);
- throw IllegalStateException(
- "Unknown error "
- "while adding type to map.");
- }
- }
- inline void rebind(int64_t compId, TypeFactoryMethod func) {
- std::lock_guard<spinlock_mutex> guard(m_mapLock);
- int bindRes = m_map->rebind(compId, func);
- if (bindRes == -1) {
- LOGERROR(
- "Unknown error "
- "while adding class ID %d to map.",
- compId);
- throw IllegalStateException(
- "Unknown error "
- "while adding type to map.");
- }
- }
-
- inline void unbind(int64_t compId) {
- std::lock_guard<spinlock_mutex> guard(m_mapLock);
- m_map->unbind(compId);
- }
-
- inline void bind2(TypeFactoryMethod func) {
- Serializable* obj = func();
- std::lock_guard<spinlock_mutex> guard(m_map2Lock);
- int8_t dsfid = obj->DSFID();
-
- int64_t compId = 0;
- if (dsfid == GeodeTypeIdsImpl::FixedIDShort) {
- compId = compId = static_cast<int64_t>(obj->classId());
- } else {
- compId = static_cast<int64_t>(obj->typeId());
- }
- delete obj;
- int bindRes = m_map2->bind(compId, func);
- if (bindRes == 1) {
- LOGERROR(
- "A fixed class with "
- "ID %d is already registered.",
- compId);
- throw IllegalStateException(
- "A fixed class with "
- "given ID is already registered.");
- } else if (bindRes == -1) {
- LOGERROR(
- "Unknown error "
- "while adding class ID %d to map2.",
- compId);
- throw IllegalStateException(
- "Unknown error "
- "while adding to map2.");
- }
- }
-
- inline void rebind2(int64_t compId, TypeFactoryMethod func) {
- std::lock_guard<spinlock_mutex> guard(m_map2Lock);
- m_map2->rebind(compId, func);
- }
-
- inline void unbind2(int64_t compId) {
- std::lock_guard<spinlock_mutex> guard(m_map2Lock);
- m_map2->unbind(compId);
- }
-
- inline void bindPdxType(TypeFactoryMethodPdx func) {
- PdxSerializable* obj = func();
- std::lock_guard<spinlock_mutex> guard(m_pdxTypemapLock);
- const char* objFullName = obj->getClassName();
-
- int bindRes = m_pdxTypemap->bind(objFullName, func);
-
- delete obj;
-
- if (bindRes == 1) {
- LOGERROR("A object with FullName %s is already registered.", objFullName);
- throw IllegalStateException(
- "A Object with "
- "given FullName is already registered.");
- } else if (bindRes == -1) {
- LOGERROR(
- "Unknown error "
- "while adding Pdx Object named %s to map.",
- objFullName);
- throw IllegalStateException(
- "Unknown error "
- "while adding type to map.");
- }
- }
-
- inline void findPdxType(const char* objFullName, TypeFactoryMethodPdx& func) {
- std::lock_guard<spinlock_mutex> guard(m_pdxTypemapLock);
- m_pdxTypemap->find(objFullName, func);
- }
-
- inline void rebindPdxType(const char* objFullName,
- TypeFactoryMethodPdx func) {
- std::lock_guard<spinlock_mutex> guard(m_pdxTypemapLock);
- int bindRes = m_pdxTypemap->rebind(objFullName, func);
- if (bindRes == -1) {
- LOGERROR(
- "Unknown error "
- "while adding Pdx Object FullName %s to map.",
- objFullName);
- throw IllegalStateException(
- "Unknown error "
- "while adding type to map.");
- }
- }
-
- inline void unbindPdxType(const char* objFullName) {
- std::lock_guard<spinlock_mutex> guard(m_pdxTypemapLock);
- m_pdxTypemap->unbind(objFullName);
- }
-};
-
-TheTypeMap::TheTypeMap() {
- m_map = new IdToFactoryMap();
-
- // second map to hold internal Data Serializable Fixed IDs - since GFE 5.7
- m_map2 = new IdToFactoryMap();
-
- // map to hold PDX types <string, funptr>.
- m_pdxTypemap = new StrToPdxFactoryMap();
+void TheTypeMap::setup() {
+ // Register Geode builtins here!!
+ // update type ids in GeodeTypeIds.hpp
+
+ bind(CacheableByte::createDeserializable);
+ bind(CacheableBoolean::createDeserializable);
+ bind(BooleanArray::createDeserializable);
+ bind(CacheableBytes::createDeserializable);
+ bind(CacheableFloat::createDeserializable);
+ bind(CacheableFloatArray::createDeserializable);
+ bind(CacheableDouble::createDeserializable);
+ bind(CacheableDoubleArray::createDeserializable);
+ bind(CacheableDate::createDeserializable);
+ bind(CacheableFileName::createDeserializable);
+ bind(CacheableHashMap::createDeserializable);
+ bind(CacheableHashSet::createDeserializable);
+ bind(CacheableHashTable::createDeserializable);
+ bind(CacheableIdentityHashMap::createDeserializable);
+ bind(CacheableLinkedHashSet::createDeserializable);
+ bind(CacheableInt16::createDeserializable);
+ bind(CacheableInt16Array::createDeserializable);
+ bind(CacheableInt32::createDeserializable);
+ bind(CacheableInt32Array::createDeserializable);
+ bind(CacheableInt64::createDeserializable);
+ bind(CacheableInt64Array::createDeserializable);
+ bind(CacheableObjectArray::createDeserializable);
+ bind(CacheableString::createDeserializable);
+ bind(CacheableString::createDeserializableHuge);
+ bind(CacheableString::createUTFDeserializable);
+ bind(CacheableString::createUTFDeserializableHuge);
+ bind(CacheableStringArray::createDeserializable);
+ bind(CacheableVector::createDeserializable);
+ bind(CacheableArrayList::createDeserializable);
+ bind(CacheableLinkedList::createDeserializable);
+ bind(CacheableStack::createDeserializable);
+ bind(CacheableWideChar::createDeserializable);
+ bind(CharArray::createDeserializable);
+ bind(CacheableToken::createDeserializable);
+ bind(RegionAttributes::createDeserializable);
+ bind(Properties::createDeserializable);
+
+ bind2(CacheableUndefined::createDeserializable);
+ bind2(EventId::createDeserializable);
+ bind2(Struct::createDeserializable);
+ bind2(ClientConnectionResponse::create);
+ bind2(QueueConnectionResponse::create);
+ bind2(LocatorListResponse::create);
+ bind2(ClientProxyMembershipID::createDeserializable);
+ bind2(GatewayEventCallbackArgument::createDeserializable);
+ bind2(GatewaySenderEventCallbackArgument::createDeserializable);
+ bind2(GetAllServersResponse::create);
+ bind2(EnumInfo::createDeserializable);
+
+ rebind2(GeodeTypeIdsImpl::DiskStoreId, DiskStoreId::createDeserializable);
+
+ bind2(CachedDeserializableHelper::createForVmCachedDeserializable);
+ bind2(CachedDeserializableHelper::createForPreferBytesDeserializable);
}
-typedef ACE_Singleton<TheTypeMap, ACE_Thread_Mutex> theTypeMap;
-
-PdxSerializerPtr SerializationRegistry::m_pdxSerializer = nullptr;
-
/** This starts at reading the typeid.. assumes the length has been read. */
SerializablePtr SerializationRegistry::deserialize(DataInput& input,
- int8_t typeId) {
+ int8_t typeId) const {
bool findinternal = false;
int8_t currentTypeId = typeId;
@@ -393,9 +169,9 @@ SerializablePtr SerializationRegistry::deserialize(DataInput& input,
}
if (findinternal) {
- theTypeMap::instance()->find2(compId, createType);
+ theTypeMap.find2(compId, createType);
} else {
- theTypeMap::instance()->find(compId, createType);
+ theTypeMap.find(compId, createType);
}
if (createType == nullptr) {
if (findinternal) {
@@ -425,47 +201,41 @@ SerializablePtr SerializationRegistry::deserialize(DataInput& input,
}
void SerializationRegistry::addType(TypeFactoryMethod func) {
- theTypeMap::instance()->bind(func);
+ theTypeMap.bind(func);
}
void SerializationRegistry::addPdxType(TypeFactoryMethodPdx func) {
- theTypeMap::instance()->bindPdxType(func);
+ theTypeMap.bindPdxType(func);
}
void SerializationRegistry::addType(int64_t compId, TypeFactoryMethod func) {
- theTypeMap::instance()->rebind(compId, func);
+ theTypeMap.rebind(compId, func);
}
void SerializationRegistry::removeType(int64_t compId) {
- theTypeMap::instance()->unbind(compId);
+ theTypeMap.unbind(compId);
}
void SerializationRegistry::addType2(TypeFactoryMethod func) {
- theTypeMap::instance()->bind2(func);
+ theTypeMap.bind2(func);
}
void SerializationRegistry::addType2(int64_t compId, TypeFactoryMethod func) {
- theTypeMap::instance()->rebind2(compId, func);
+ theTypeMap.rebind2(compId, func);
}
void SerializationRegistry::removeType2(int64_t compId) {
- theTypeMap::instance()->unbind2(compId);
+ theTypeMap.unbind2(compId);
}
-void SerializationRegistry::init() {
- // Everything here is done in the constructor for TheTypeMap...
- theTypeMap::instance();
- theTypeMap::instance()->clear();
- theTypeMap::instance()->setup();
-}
-
-PdxSerializablePtr SerializationRegistry::getPdxType(const char* className) {
+PdxSerializablePtr SerializationRegistry::getPdxType(char* className) {
TypeFactoryMethodPdx objectType = nullptr;
- theTypeMap::instance()->findPdxType(className, objectType);
+ theTypeMap.findPdxType(className, objectType);
PdxSerializablePtr pdxObj;
if (nullptr == objectType) {
try {
- pdxObj = std::make_shared<PdxWrapper>(className);
+ pdxObj =
+ std::make_shared<PdxWrapper>((const char*)className, m_pdxSerializer);
} catch (const Exception&) {
LOGERROR(
"Unregistered class %s during PDX deserialization: Did the "
@@ -488,19 +258,8 @@ PdxSerializerPtr SerializationRegistry::getPdxSerializer() {
return m_pdxSerializer;
}
-int32_t SerializationRegistry::GetPDXIdForType(const char* poolName,
- SerializablePtr pdxType) {
- PoolPtr pool = nullptr;
-
- if (poolName == nullptr) {
- for (const auto& iter : PoolManager::getAll()) {
- pool = iter.second;
- break;
- }
- } else {
- pool = PoolManager::find(poolName);
- }
-
+int32_t SerializationRegistry::GetPDXIdForType(PoolPtr pool,
+ SerializablePtr pdxType) const {
if (pool == nullptr) {
throw IllegalStateException("Pool not found, Pdx operation failed");
}
@@ -508,19 +267,8 @@ int32_t SerializationRegistry::GetPDXIdForType(const char* poolName,
return static_cast<ThinClientPoolDM*>(pool.get())->GetPDXIdForType(pdxType);
}
-SerializablePtr SerializationRegistry::GetPDXTypeById(const char* poolName,
- int32_t typeId) {
- PoolPtr pool = nullptr;
-
- if (poolName == nullptr) {
- for (const auto& iter : PoolManager::getAll()) {
- pool = iter.second;
- break;
- }
- } else {
- pool = PoolManager::find(poolName);
- }
-
+SerializablePtr SerializationRegistry::GetPDXTypeById(PoolPtr pool,
+ int32_t typeId) const {
if (pool == nullptr) {
throw IllegalStateException("Pool not found, Pdx operation failed");
}
@@ -528,16 +276,16 @@ SerializablePtr SerializationRegistry::GetPDXTypeById(const char* poolName,
return static_cast<ThinClientPoolDM*>(pool.get())->GetPDXTypeById(typeId);
}
-int32_t SerializationRegistry::GetEnumValue(SerializablePtr enumInfo) {
- PoolPtr pool = getPool();
+int32_t SerializationRegistry::GetEnumValue(PoolPtr pool,
+ SerializablePtr enumInfo) const {
if (pool == nullptr) {
throw IllegalStateException("Pool not found, Pdx operation failed");
}
return static_cast<ThinClientPoolDM*>(pool.get())->GetEnumValue(enumInfo);
}
-SerializablePtr SerializationRegistry::GetEnum(int32_t val) {
- PoolPtr pool = getPool();
+SerializablePtr SerializationRegistry::GetEnum(PoolPtr pool,
+ int32_t val) const {
if (pool == nullptr) {
throw IllegalStateException("Pool not found, Pdx operation failed");
}
@@ -545,14 +293,167 @@ SerializablePtr SerializationRegistry::GetEnum(int32_t val) {
return static_cast<ThinClientPoolDM*>(pool.get())->GetEnum(val);
}
-PoolPtr SerializationRegistry::getPool() {
- PoolPtr pool = nullptr;
- for (const auto& iter: PoolManager::getAll()) {
- pool = iter.second;
- break;
+void TheTypeMap::clear() {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock);
+ m_map->unbind_all();
+
+ std::lock_guard<util::concurrent::spinlock_mutex> guard2(m_map2Lock);
+ m_map2->unbind_all();
+
+ std::lock_guard<util::concurrent::spinlock_mutex> guard3(m_pdxTypemapLock);
+ m_pdxTypemap->unbind_all();
+}
+
+void TheTypeMap::find(int64_t id, TypeFactoryMethod& func) const {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock);
+ m_map->find(id, func);
+}
+
+void TheTypeMap::find2(int64_t id, TypeFactoryMethod& func) const {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_map2Lock);
+ m_map2->find(id, func);
+}
+
+void TheTypeMap::bind(TypeFactoryMethod func) {
+ Serializable* obj = func();
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock);
+ int64_t compId = static_cast<int64_t>(obj->typeId());
+ if (compId == GeodeTypeIdsImpl::CacheableUserData ||
+ compId == GeodeTypeIdsImpl::CacheableUserData2 ||
+ compId == GeodeTypeIdsImpl::CacheableUserData4) {
+ compId |= ((static_cast<int64_t>(obj->classId())) << 32);
+ }
+ delete obj;
+ int bindRes = m_map->bind(compId, func);
+ if (bindRes == 1) {
+ LOGERROR(
+ "A class with "
+ "ID %d is already registered.",
+ compId);
+ throw IllegalStateException(
+ "A class with "
+ "given ID is already registered.");
+ } else if (bindRes == -1) {
+ LOGERROR(
+ "Unknown error "
+ "while adding class ID %d to map.",
+ compId);
+ throw IllegalStateException(
+ "Unknown error "
+ "while adding type to map.");
+ }
+}
+
+void TheTypeMap::rebind(int64_t compId, TypeFactoryMethod func) {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock);
+ int bindRes = m_map->rebind(compId, func);
+ if (bindRes == -1) {
+ LOGERROR(
+ "Unknown error "
+ "while adding class ID %d to map.",
+ compId);
+ throw IllegalStateException(
+ "Unknown error "
+ "while adding type to map.");
+ }
+}
+
+void TheTypeMap::unbind(int64_t compId) {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock);
+ m_map->unbind(compId);
+}
+
+void TheTypeMap::bind2(TypeFactoryMethod func) {
+ Serializable* obj = func();
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_map2Lock);
+ int8_t dsfid = obj->DSFID();
+
+ int64_t compId = 0;
+ if (dsfid == GeodeTypeIdsImpl::FixedIDShort) {
+ compId = compId = static_cast<int64_t>(obj->classId());
+ } else {
+ compId = static_cast<int64_t>(obj->typeId());
+ }
+ delete obj;
+ int bindRes = m_map2->bind(compId, func);
+ if (bindRes == 1) {
+ LOGERROR(
+ "A fixed class with "
+ "ID %d is already registered.",
+ compId);
+ throw IllegalStateException(
+ "A fixed class with "
+ "given ID is already registered.");
+ } else if (bindRes == -1) {
+ LOGERROR(
+ "Unknown error "
+ "while adding class ID %d to map2.",
+ compId);
+ throw IllegalStateException(
+ "Unknown error "
+ "while adding to map2.");
}
- return pool;
}
+
+void TheTypeMap::rebind2(int64_t compId, TypeFactoryMethod func) {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_map2Lock);
+ m_map2->rebind(compId, func);
+}
+
+void TheTypeMap::unbind2(int64_t compId) {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_map2Lock);
+ m_map2->unbind(compId);
+}
+
+void TheTypeMap::bindPdxType(TypeFactoryMethodPdx func) {
+ PdxSerializable* obj = func();
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_pdxTypemapLock);
+ const char* objFullName = obj->getClassName();
+
+ int bindRes = m_pdxTypemap->bind(objFullName, func);
+
+ delete obj;
+
+ if (bindRes == 1) {
+ LOGERROR("A object with FullName %s is already registered.", objFullName);
+ throw IllegalStateException(
+ "A Object with "
+ "given FullName is already registered.");
+ } else if (bindRes == -1) {
+ LOGERROR(
+ "Unknown error "
+ "while adding Pdx Object named %s to map.",
+ objFullName);
+ throw IllegalStateException(
+ "Unknown error "
+ "while adding type to map.");
+ }
+}
+
+void TheTypeMap::findPdxType(char* objFullName, TypeFactoryMethodPdx& func) {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_pdxTypemapLock);
+ m_pdxTypemap->find(objFullName, func);
+}
+
+void TheTypeMap::rebindPdxType(char* objFullName, TypeFactoryMethodPdx func) {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_pdxTypemapLock);
+ int bindRes = m_pdxTypemap->rebind(objFullName, func);
+ if (bindRes == -1) {
+ LOGERROR(
+ "Unknown error "
+ "while adding Pdx Object FullName %s to map.",
+ objFullName);
+ throw IllegalStateException(
+ "Unknown error "
+ "while adding type to map.");
+ }
+}
+
+void TheTypeMap::unbindPdxType(char* objFullName) {
+ std::lock_guard<util::concurrent::spinlock_mutex> guard(m_pdxTypemapLock);
+ m_pdxTypemap->unbind(objFullName);
+}
+
} // namespace client
} // namespace geode
} // namespace apache
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/SerializationRegistry.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/SerializationRegistry.hpp b/src/cppcache/src/SerializationRegistry.hpp
index 33cc86b..9803a8a 100644
--- a/src/cppcache/src/SerializationRegistry.hpp
+++ b/src/cppcache/src/SerializationRegistry.hpp
@@ -33,6 +33,10 @@
#include <geode/ExceptionTypes.hpp>
#include <geode/Delta.hpp>
#include <string>
+#include "util/concurrent/spinlock_mutex.hpp"
+#include "NonCopyable.hpp"
+#include <geode/PdxSerializable.hpp>
+#include "MemberListForVersionStamp.hpp"
#if defined(_MACOSX)
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
@@ -58,14 +62,75 @@ typedef ACE_Hash_Map_Manager<int64_t, TypeFactoryMethod, ACE_Null_Mutex>
typedef ACE_Hash_Map_Manager<std::string, TypeFactoryMethodPdx, ACE_Null_Mutex>
StrToPdxFactoryMap;
+class TheTypeMap : private NonCopyable {
+ private:
+ IdToFactoryMap* m_map;
+ IdToFactoryMap* m_map2; // to hold Fixed IDs since GFE 5.7.
+ StrToPdxFactoryMap* m_pdxTypemap;
+ mutable util::concurrent::spinlock_mutex m_mapLock;
+ mutable util::concurrent::spinlock_mutex m_map2Lock;
+ mutable util::concurrent::spinlock_mutex m_pdxTypemapLock;
+
+ public:
+ TheTypeMap() {
+ m_map = new IdToFactoryMap();
+
+ // second map to hold internal Data Serializable Fixed IDs - since GFE 5.7
+ m_map2 = new IdToFactoryMap();
+
+ // map to hold PDX types <string, funptr>.
+ m_pdxTypemap = new StrToPdxFactoryMap();
+
+ setup();
+ }
+
+ virtual ~TheTypeMap() {
+ if (m_map != nullptr) {
+ delete m_map;
+ }
+
+ if (m_map2 != nullptr) {
+ delete m_map2;
+ }
+
+ if (m_pdxTypemap != nullptr) {
+ delete m_pdxTypemap;
+ }
+ }
+
+ void setup();
+
+ void clear();
+
+ void find(int64_t id, TypeFactoryMethod& func) const;
+ void find2(int64_t id, TypeFactoryMethod& func) const;
+
+ void bind(TypeFactoryMethod func);
+
+ inline void rebind(int64_t compId, TypeFactoryMethod func);
+ inline void unbind(int64_t compId);
+ inline void bind2(TypeFactoryMethod func);
+
+ inline void rebind2(int64_t compId, TypeFactoryMethod func);
+
+ inline void unbind2(int64_t compId);
+ inline void bindPdxType(TypeFactoryMethodPdx func);
+ inline void findPdxType(char* objFullName, TypeFactoryMethodPdx& func);
+ inline void unbindPdxType(char* objFullName);
+
+ void rebindPdxType(char* objFullName, TypeFactoryMethodPdx func);
+};
+
class CPPCACHE_EXPORT SerializationRegistry {
public:
+ SerializationRegistry() : theTypeMap() {}
+
/** write the length of the serialization, write the typeId of the object,
* then write whatever the object's toData requires. The length at the
* front is backfilled after the serialization.
*/
- inline static void serialize(const Serializable* obj, DataOutput& output,
- bool isDelta = false) {
+ inline void serialize(const Serializable* obj, DataOutput& output,
+ bool isDelta = false) const {
if (obj == nullptr) {
output.write(static_cast<int8_t>(GeodeTypeIds::NullObj));
} else {
@@ -106,7 +171,7 @@ class CPPCACHE_EXPORT SerializationRegistry {
}
}
- inline static void serialize(const SerializablePtr& obj, DataOutput& output) {
+ inline void serialize(const SerializablePtr& obj, DataOutput& output) const {
serialize(obj.get(), output);
}
@@ -114,45 +179,44 @@ class CPPCACHE_EXPORT SerializationRegistry {
* Read the length, typeid, and run the objs fromData. Returns the New
* object.
*/
- static SerializablePtr deserialize(DataInput& input, int8_t typeId = -1);
-
- static void addType(TypeFactoryMethod func);
+ SerializablePtr deserialize(DataInput& input, int8_t typeId = -1) const;
- static void addType(int64_t compId, TypeFactoryMethod func);
+ void addType(TypeFactoryMethod func);
- static void addPdxType(TypeFactoryMethodPdx func);
+ void addType(int64_t compId, TypeFactoryMethod func);
- static void setPdxSerializer(PdxSerializerPtr pdxSerializer);
+ void addPdxType(TypeFactoryMethodPdx func);
- static PdxSerializerPtr getPdxSerializer();
+ void setPdxSerializer(PdxSerializerPtr pdxSerializer);
- static void removeType(int64_t compId);
+ PdxSerializerPtr getPdxSerializer();
- static void init();
+ void removeType(int64_t compId);
// following for internal types with Data Serializable Fixed IDs - since GFE
// 5.7
- static void addType2(TypeFactoryMethod func);
+ void addType2(TypeFactoryMethod func);
- static void addType2(int64_t compId, TypeFactoryMethod func);
+ void addType2(int64_t compId, TypeFactoryMethod func);
- static void removeType2(int64_t compId);
+ void removeType2(int64_t compId);
- static int32_t GetPDXIdForType(const char* poolName, SerializablePtr pdxType);
+ int32_t GetPDXIdForType(PoolPtr pool, SerializablePtr pdxType) const;
- static SerializablePtr GetPDXTypeById(const char* poolName, int32_t typeId);
+ SerializablePtr GetPDXTypeById(PoolPtr pool, int32_t typeId) const;
- static int32_t GetEnumValue(SerializablePtr enumInfo);
- static SerializablePtr GetEnum(int32_t val);
+ int32_t GetEnumValue(PoolPtr pool, SerializablePtr enumInfo) const;
+ SerializablePtr GetEnum(PoolPtr pool, int32_t val) const;
- static PdxSerializablePtr getPdxType(const char* className);
+ PdxSerializablePtr getPdxType(char* className);
private:
- static PoolPtr getPool();
- static IdToFactoryMap* s_typeMap;
- static PdxSerializerPtr m_pdxSerializer;
+ PdxSerializerPtr m_pdxSerializer;
+ TheTypeMap theTypeMap;
};
+
+typedef std::shared_ptr<SerializationRegistry> SerializationRegistryPtr;
} // namespace client
} // namespace geode
} // namespace apache
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TXCommitMessage.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TXCommitMessage.cpp b/src/cppcache/src/TXCommitMessage.cpp
index 211450f..933d980 100644
--- a/src/cppcache/src/TXCommitMessage.cpp
+++ b/src/cppcache/src/TXCommitMessage.cpp
@@ -33,7 +33,7 @@ namespace apache {
namespace geode {
namespace client {
-TXCommitMessage::TXCommitMessage()
+TXCommitMessage::TXCommitMessage(MemberListForVersionStamp & memberListForVersionStamp) : m_memberListForVersionStamp(memberListForVersionStamp)
// UNUSED : m_processorId(0)
{}
@@ -88,7 +88,7 @@ m_processorId = -1;
int32_t regionSize;
input.readInt(&regionSize);
for (int32_t i = 0; i < regionSize; i++) {
- auto rc = std::make_shared<RegionCommit>();
+ auto rc = std::make_shared<RegionCommit>(m_memberListForVersionStamp);
rc->fromData(input);
m_regions.push_back(rc);
}
@@ -159,7 +159,7 @@ int8_t TXCommitMessage::typeId() const {
return static_cast<int8_t>(GeodeTypeIdsImpl::TXCommitMessage);
}
-Serializable* TXCommitMessage::create() { return new TXCommitMessage(); }
+Serializable* TXCommitMessage::create(MemberListForVersionStamp & memberListForVersionStamp) { return new TXCommitMessage(memberListForVersionStamp); }
void TXCommitMessage::apply(Cache* cache) {
for (std::vector<RegionCommitPtr>::iterator iter = m_regions.begin();
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TXCommitMessage.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TXCommitMessage.hpp b/src/cppcache/src/TXCommitMessage.hpp
index eb1b89b..09bc702 100644
--- a/src/cppcache/src/TXCommitMessage.hpp
+++ b/src/cppcache/src/TXCommitMessage.hpp
@@ -32,14 +32,14 @@ _GF_PTR_DEF_(TXCommitMessage, TXCommitMessagePtr);
class TXCommitMessage : public apache::geode::client::Cacheable {
public:
- TXCommitMessage();
+ TXCommitMessage(MemberListForVersionStamp & memberListForVersionStamp);
virtual ~TXCommitMessage();
virtual Serializable* fromData(DataInput& input);
virtual void toData(DataOutput& output) const;
virtual int32_t classId() const;
int8_t typeId() const;
- static Serializable* create();
+ static Serializable* create(MemberListForVersionStamp & memberListForVersionStamp);
// VectorOfEntryEvent getEvents(Cache* cache);
void apply(Cache* cache);
@@ -47,7 +47,7 @@ class TXCommitMessage : public apache::geode::client::Cacheable {
private:
// UNUSED int32_t m_processorId;
bool isAckRequired();
-
+ MemberListForVersionStamp & m_memberListForVersionStamp;
std::vector<RegionCommitPtr> m_regions;
};
} // namespace client
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcpConn.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcpConn.cpp b/src/cppcache/src/TcpConn.cpp
index b80aa51..581d267 100644
--- a/src/cppcache/src/TcpConn.cpp
+++ b/src/cppcache/src/TcpConn.cpp
@@ -27,11 +27,9 @@
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/OS.h>
-
+#include "CacheImpl.hpp"
using namespace apache::geode::client;
-int TcpConn::m_chunkSize = TcpConn::setChunkSize();
-
void TcpConn::clearNagle(ACE_SOCKET sock) {
int32_t val = 1;
#ifdef WIN32
@@ -60,19 +58,10 @@ int32_t TcpConn::maxSize(ACE_SOCKET sock, int32_t flag, int32_t size) {
socklen_t plen = sizeof(val);
socklen_t clen = sizeof(val);
- static int32_t max = 32000;
- if (m_maxBuffSizePool <= 0) {
- SystemProperties *props = DistributedSystem::getSystemProperties();
- if (props != nullptr) {
- max = props->maxSocketBufferSize();
- }
- } else {
- max = m_maxBuffSizePool;
- }
int32_t inc = 32120;
val = size - (3 * inc);
if (val < 0) val = 0;
- if (size == 0) size = max;
+ if (size == 0) size = m_maxBuffSizePool;
int32_t red = 0;
int32_t lastRed = -1;
while (lastRed != red) {
@@ -92,7 +81,7 @@ int32_t TcpConn::maxSize(ACE_SOCKET sock, int32_t flag, int32_t size) {
#ifdef _LINUX
val /= 2;
#endif
- if ((val >= max) || (val >= size)) continue;
+ if ((val >= m_maxBuffSizePool) || (val >= size)) continue;
red = val;
}
return val;
@@ -105,17 +94,7 @@ void TcpConn::createSocket(ACE_SOCKET sock) {
}
void TcpConn::init() {
- /* adongre
- * CID 28736: Improper use of negative value (NEGATIVE_RETURNS)
- * Function "socket(2, 1, 0)" returns a negative number.
- * Assigning: unsigned variable "sock" = "socket".
- *
- * CID 28737: Unsigned compared against 0 (NO_EFFECT)
- * This less-than-zero comparison of an unsigned value is never true. "sock <
- * 0U".
- */
ACE_SOCKET sock = socket(AF_INET, SOCK_STREAM, 0);
- // if ( sock < 0 ) {
if (sock == -1) {
int32_t lastError = ACE_OS::last_error();
LOGERROR("Failed to create socket. Errno: %d: %s", lastError,
@@ -128,19 +107,19 @@ void TcpConn::init() {
clearNagle(sock);
- static int32_t readSize = 0;
- static int32_t writeSize = 0;
+ int32_t readSize = 0;
+ int32_t writeSize = 0;
int32_t originalReadSize = readSize;
readSize = maxSize(sock, SO_SNDBUF, readSize);
if (originalReadSize != readSize) {
// This should get logged once at startup and again only if it changes
- LOGINFO("Using socket send buffer size of %d.", readSize);
+ LOGFINEST("Using socket send buffer size of %d.", readSize);
}
int32_t originalWriteSize = writeSize;
writeSize = maxSize(sock, SO_RCVBUF, writeSize);
if (originalWriteSize != writeSize) {
// This should get logged once at startup and again only if it changes
- LOGINFO("Using socket receive buffer size of %d.", writeSize);
+ LOGFINEST("Using socket receive buffer size of %d.", writeSize);
}
createSocket(sock);
@@ -148,21 +127,21 @@ void TcpConn::init() {
connect();
}
-TcpConn::TcpConn() : m_io(nullptr), m_waitSeconds(0), m_maxBuffSizePool(0) {}
-
TcpConn::TcpConn(const char *ipaddr, uint32_t waitSeconds,
int32_t maxBuffSizePool)
: m_io(nullptr),
m_addr(ipaddr),
- m_waitSeconds(waitSeconds),
- m_maxBuffSizePool(maxBuffSizePool) {}
+ m_waitMilliSeconds(waitSeconds * 1000),
+ m_maxBuffSizePool(maxBuffSizePool),
+ m_chunkSize(getDefaultChunkSize()) {}
TcpConn::TcpConn(const char *hostname, int32_t port, uint32_t waitSeconds,
int32_t maxBuffSizePool)
: m_io(nullptr),
m_addr(port, hostname),
- m_waitSeconds(waitSeconds),
- m_maxBuffSizePool(maxBuffSizePool) {}
+ m_waitMilliSeconds(waitSeconds * 1000),
+ m_maxBuffSizePool(maxBuffSizePool),
+ m_chunkSize(getDefaultChunkSize()) {}
void TcpConn::listen(const char *hostname, int32_t port, uint32_t waitSeconds) {
ACE_INET_Addr addr(port, hostname);
@@ -216,14 +195,14 @@ void TcpConn::connect(const char *hostname, int32_t port,
uint32_t waitSeconds) {
ACE_INET_Addr addr(port, hostname);
m_addr = addr;
- m_waitSeconds = waitSeconds;
+ m_waitMilliSeconds = waitSeconds * 1000;  // seconds -> ms, same conversion as the constructors
connect();
}
void TcpConn::connect(const char *ipaddr, uint32_t waitSeconds) {
ACE_INET_Addr addr(ipaddr);
m_addr = addr;
- m_waitSeconds = waitSeconds;
+ m_waitMilliSeconds = waitSeconds * 1000;  // seconds -> ms, same conversion as the constructors
connect();
}
@@ -231,25 +210,18 @@ void TcpConn::connect() {
GF_DEV_ASSERT(m_io != nullptr);
ACE_INET_Addr ipaddr = m_addr;
- uint32_t waitSeconds = m_waitSeconds;
+ uint32_t waitMicroSeconds = m_waitMilliSeconds * 1000;
ACE_OS::signal(SIGPIPE, SIG_IGN); // Ignore broken pipe
- // passing waittime as microseconds
- if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) {
- waitSeconds = waitSeconds * 1000;
- } else {
- waitSeconds = waitSeconds * (1000 * 1000);
- }
-
LOGFINER("Connecting plain socket stream to %s:%d waiting %d micro sec",
- ipaddr.get_host_name(), ipaddr.get_port_number(), waitSeconds);
+ ipaddr.get_host_name(), ipaddr.get_port_number(), waitMicroSeconds);
ACE_SOCK_Connector conn;
int32_t retVal = 0;
- if (waitSeconds > 0) {
+ if (waitMicroSeconds > 0) {
// passing waittime as microseconds
- ACE_Time_Value wtime(0, waitSeconds);
+ ACE_Time_Value wtime(0, waitMicroSeconds);
retVal = conn.connect(*m_io, ipaddr, &wtime);
} else {
retVal = conn.connect(*m_io, ipaddr);
@@ -258,10 +230,10 @@ void TcpConn::connect() {
char msg[256];
int32_t lastError = ACE_OS::last_error();
if (lastError == ETIME || lastError == ETIMEDOUT) {
- ACE_OS::snprintf(
- msg, 256,
- "TcpConn::connect Attempt to connect timed out after %d seconds.",
- waitSeconds);
+ ACE_OS::snprintf(msg, 256,
+ "TcpConn::connect Attempt to connect timed out after %d "
+ "microseconds.",
+ waitMicroSeconds);
// this is only called by constructor, so we must delete m_io
GF_SAFE_DELETE(m_io);
throw TimeoutException(msg);
@@ -269,7 +241,7 @@ void TcpConn::connect() {
ACE_OS::snprintf(msg, 256, "TcpConn::connect failed with errno: %d: %s",
lastError, ACE_OS::strerror(lastError));
// this is only called by constructor, so we must delete m_io
- close();
+ close();
throw GeodeIOException(msg);
}
int rc = this->m_io->enable(ACE_NONBLOCK);
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcpConn.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcpConn.hpp b/src/cppcache/src/TcpConn.hpp
index 60ee0a9..13964d4 100644
--- a/src/cppcache/src/TcpConn.hpp
+++ b/src/cppcache/src/TcpConn.hpp
@@ -50,7 +50,7 @@ class CPPCACHE_EXPORT TcpConn : public Connector {
protected:
ACE_INET_Addr m_addr;
- uint32_t m_waitSeconds;
+ uint32_t m_waitMilliSeconds;
int32_t m_maxBuffSizePool;
@@ -65,9 +65,9 @@ class CPPCACHE_EXPORT TcpConn : public Connector {
virtual void createSocket(ACE_SOCKET sock);
public:
- static int m_chunkSize;
+ int m_chunkSize;
- static int setChunkSize() {
+ static int getDefaultChunkSize() {
// Attempt to set chunk size to nearest OS page size
// for perf improvement
int pageSize = ACE_OS::getpagesize();
@@ -80,12 +80,9 @@ class CPPCACHE_EXPORT TcpConn : public Connector {
return 16000000;
}
- TcpConn();
- TcpConn(const char* hostname, int32_t port,
- uint32_t waitSeconds = DEFAULT_CONNECT_TIMEOUT,
- int32_t maxBuffSizePool = 0);
- TcpConn(const char* ipaddr, uint32_t waitSeconds = DEFAULT_CONNECT_TIMEOUT,
- int32_t maxBuffSizePool = 0);
+ TcpConn(const char* hostname, int32_t port, uint32_t waitSeconds,
+ int32_t maxBuffSizePool);
+ TcpConn(const char* ipaddr, uint32_t waitSeconds, int32_t maxBuffSizePool);
virtual ~TcpConn() { close(); }
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcpSslConn.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcpSslConn.cpp b/src/cppcache/src/TcpSslConn.cpp
index 3f4935f..74b999d 100644
--- a/src/cppcache/src/TcpSslConn.cpp
+++ b/src/cppcache/src/TcpSslConn.cpp
@@ -19,7 +19,7 @@
#include <geode/SystemProperties.hpp>
#include <geode/DistributedSystem.hpp>
#include "../../cryptoimpl/Ssl.hpp"
-
+#include "CacheImpl.hpp"
using namespace apache::geode::client;
Ssl* TcpSslConn::getSSLImpl(ACE_SOCKET sock, const char* pubkeyfile,
@@ -42,20 +42,13 @@ Ssl* TcpSslConn::getSSLImpl(ACE_SOCKET sock, const char* pubkeyfile,
LOGERROR(msg);
throw IllegalStateException(msg);
}
- // adongre: Added for Ticket #758
- const char* pemPassword =
- DistributedSystem::getSystemProperties()->sslKeystorePassword();
-
return reinterpret_cast<Ssl*>(
- func(sock, pubkeyfile, privkeyfile, pemPassword));
+ func(sock, pubkeyfile, privkeyfile, m_pemPassword));
}
void TcpSslConn::createSocket(ACE_SOCKET sock) {
- SystemProperties* props = DistributedSystem::getSystemProperties();
- const char* pubkeyfile = props->sslTrustStore();
- const char* privkeyfile = props->sslKeyStore();
LOGDEBUG("Creating SSL socket stream");
- m_ssl = getSSLImpl(sock, pubkeyfile, privkeyfile);
+ m_ssl = getSSLImpl(sock, m_pubkeyfile, m_privkeyfile);
}
void TcpSslConn::listen(ACE_INET_Addr addr, uint32_t waitSeconds) {
@@ -101,28 +94,21 @@ void TcpSslConn::connect() {
// m_ssl->init();
- uint32_t waitSeconds = m_waitSeconds;
-
- // passing waittime as microseconds
- if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) {
- waitSeconds = waitSeconds * 1000;
- } else {
- waitSeconds = waitSeconds * (1000 * 1000);
- }
+ uint32_t waitMicroSeconds = m_waitMilliSeconds * 1000;
- LOGDEBUG("Connecting SSL socket stream to %s:%d waiting %d sec",
- m_addr.get_host_name(), m_addr.get_port_number(), m_waitSeconds);
+ LOGDEBUG("Connecting SSL socket stream to %s:%d waiting %d micro sec",
+ m_addr.get_host_name(), m_addr.get_port_number(), waitMicroSeconds);
- int32_t retVal = m_ssl->connect(m_addr, waitSeconds);
+ int32_t retVal = m_ssl->connect(m_addr, waitMicroSeconds);
if (retVal == -1) {
char msg[256];
int32_t lastError = ACE_OS::last_error();
if (lastError == ETIME || lastError == ETIMEDOUT) {
- ACE_OS::snprintf(
- msg, 256,
- "TcpSslConn::connect Attempt to connect timed out after %d seconds.",
- m_waitSeconds);
+ ACE_OS::snprintf(msg, 256,
+ "TcpSslConn::connect Attempt to connect timed out after "
+ "%d micro-seconds.",
+ waitMicroSeconds);
// this is only called by constructor, so we must delete m_ssl
GF_SAFE_DELETE(m_ssl);
throw TimeoutException(msg);
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcpSslConn.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcpSslConn.hpp b/src/cppcache/src/TcpSslConn.hpp
index 387a95c..1ebf3db 100644
--- a/src/cppcache/src/TcpSslConn.hpp
+++ b/src/cppcache/src/TcpSslConn.hpp
@@ -32,6 +32,9 @@ class TcpSslConn : public TcpConn {
private:
Ssl* m_ssl;
ACE_DLL m_dll;
+ const char* m_pubkeyfile;
+ const char* m_privkeyfile;
+ const char* m_pemPassword;
// adongre: Added for Ticket #758
// Pass extra parameter for the password
typedef void* (*gf_create_SslImpl)(ACE_SOCKET, const char*, const char*,
@@ -47,16 +50,23 @@ class TcpSslConn : public TcpConn {
void createSocket(ACE_SOCKET sock);
public:
- TcpSslConn() : TcpConn(), m_ssl(nullptr){};
-
- TcpSslConn(const char* hostname, int32_t port,
- uint32_t waitSeconds = DEFAULT_CONNECT_TIMEOUT,
- int32_t maxBuffSizePool = 0)
- : TcpConn(hostname, port, waitSeconds, maxBuffSizePool), m_ssl(nullptr){};
-
- TcpSslConn(const char* ipaddr, uint32_t waitSeconds = DEFAULT_CONNECT_TIMEOUT,
- int32_t maxBuffSizePool = 0)
- : TcpConn(ipaddr, waitSeconds, maxBuffSizePool), m_ssl(nullptr){};
+ TcpSslConn(const char* hostname, int32_t port, uint32_t waitSeconds,
+ int32_t maxBuffSizePool, const char* pubkeyfile,
+ const char* privkeyfile, const char* pemPassword)
+ : TcpConn(hostname, port, waitSeconds, maxBuffSizePool),
+ m_ssl(nullptr),
+ m_pubkeyfile(pubkeyfile),
+ m_privkeyfile(privkeyfile),
+ m_pemPassword(pemPassword){};
+
+ TcpSslConn(const char* ipaddr, uint32_t waitSeconds, int32_t maxBuffSizePool,
+ const char* pubkeyfile, const char* privkeyfile,
+ const char* pemPassword)
+ : TcpConn(ipaddr, waitSeconds, maxBuffSizePool),
+ m_ssl(nullptr),
+ m_pubkeyfile(pubkeyfile),
+ m_privkeyfile(privkeyfile),
+ m_pemPassword(pemPassword){};
// TODO: Watch out for virt dtor calling virt methods!
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrChunkedContext.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrChunkedContext.hpp b/src/cppcache/src/TcrChunkedContext.hpp
index c07c22c..068bb09 100644
--- a/src/cppcache/src/TcrChunkedContext.hpp
+++ b/src/cppcache/src/TcrChunkedContext.hpp
@@ -53,7 +53,8 @@ class TcrChunkedResult {
/** handle a chunk of response message from server */
virtual void handleChunk(const uint8_t* bytes, int32_t len,
- uint8_t isLastChunkWithSecurity) = 0;
+ uint8_t isLastChunkWithSecurity,
+ const Cache* cache) = 0;
public:
inline TcrChunkedResult()
@@ -75,13 +76,14 @@ class TcrChunkedResult {
virtual void reset() = 0;
void fireHandleChunk(const uint8_t* bytes, int32_t len,
- uint8_t isLastChunkWithSecurity) {
+ uint8_t isLastChunkWithSecurity, const Cache* cache) {
if (appDomainContext) {
- appDomainContext->run([this, bytes, len, isLastChunkWithSecurity]() {
- handleChunk(bytes, len, isLastChunkWithSecurity);
- });
+ appDomainContext->run(
+ [this, bytes, len, isLastChunkWithSecurity, cache]() {  // copy the pointer; a reference to the parameter could dangle
+ handleChunk(bytes, len, isLastChunkWithSecurity, cache);
+ });
} else {
- handleChunk(bytes, len, isLastChunkWithSecurity);
+ handleChunk(bytes, len, isLastChunkWithSecurity, cache);
}
}
@@ -135,15 +137,17 @@ class TcrChunkedContext {
const uint8_t* m_bytes;
const int32_t m_len;
const uint8_t m_isLastChunkWithSecurity;
+ const Cache* m_cache;
TcrChunkedResult* m_result;
public:
inline TcrChunkedContext(const uint8_t* bytes, int32_t len,
TcrChunkedResult* result,
- uint8_t isLastChunkWithSecurity)
+ uint8_t isLastChunkWithSecurity, const Cache* cache)
: m_bytes(bytes),
m_len(len),
m_isLastChunkWithSecurity(isLastChunkWithSecurity),
+ m_cache(cache),
m_result(result) {}
inline ~TcrChunkedContext() { GF_SAFE_DELETE_ARRAY(m_bytes); }
@@ -158,7 +162,8 @@ class TcrChunkedContext {
m_result->finalize(inSameThread);
} else if (!m_result->exceptionOccurred()) {
try {
- m_result->fireHandleChunk(m_bytes, m_len, m_isLastChunkWithSecurity);
+ m_result->fireHandleChunk(m_bytes, m_len, m_isLastChunkWithSecurity,
+ m_cache);
} catch (Exception& ex) {
LOGERROR("HandleChunk error message %s, name = %s", ex.getMessage(),
ex.getName());
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrConnection.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrConnection.cpp b/src/cppcache/src/TcrConnection.cpp
index f11db0c..dafaed2 100644
--- a/src/cppcache/src/TcrConnection.cpp
+++ b/src/cppcache/src/TcrConnection.cpp
@@ -62,6 +62,9 @@ bool TcrConnection::InitTcrConnection(
m_creationTime = ACE_OS::gettimeofday();
connectionId = INITIAL_CONNECTION_ID;
m_lastAccessed = ACE_OS::gettimeofday();
+ const auto& distributedSystem =
+ m_poolDM->getConnectionManager().getCacheImpl()->getDistributedSystem();
+ const auto& sysProp = distributedSystem.getSystemProperties();
LOGDEBUG(
"Tcrconnection const isSecondary = %d and isClientNotification = %d, "
@@ -77,8 +80,6 @@ bool TcrConnection::InitTcrConnection(
GF_DEV_ASSERT(!isSecondary || isClientNotification);
- DistributedSystemPtr dsys = DistributedSystem::getInstance();
-
// Create TcpConn object which manages a socket connection with the endpoint.
if (endpointObj && endpointObj->getPoolHADM()) {
m_conn = createConnection(
@@ -87,33 +88,34 @@ bool TcrConnection::InitTcrConnection(
endpointObj->getPoolHADM()->getSocketBufferSize()));
isPool = true;
} else {
- m_conn = createConnection(m_endpoint, connectTimeout, 0);
+ m_conn = createConnection(m_endpoint, connectTimeout,
+ sysProp.maxSocketBufferSize());
}
GF_DEV_ASSERT(m_conn != nullptr);
- DataOutput handShakeMsg;
+ auto handShakeMsg = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
bool isNotificationChannel = false;
// Send byte Acceptor.CLIENT_TO_SERVER = (byte) 100;
// Send byte Acceptor.SERVER_TO_CLIENT = (byte) 101;
if (isClientNotification) {
isNotificationChannel = true;
if (isSecondary) {
- handShakeMsg.write(static_cast<int8_t>(SECONDARY_SERVER_TO_CLIENT));
+ handShakeMsg->write(static_cast<int8_t>(SECONDARY_SERVER_TO_CLIENT));
} else {
- handShakeMsg.write(static_cast<int8_t>(PRIMARY_SERVER_TO_CLIENT));
+ handShakeMsg->write(static_cast<int8_t>(PRIMARY_SERVER_TO_CLIENT));
}
} else {
- handShakeMsg.write(static_cast<int8_t>(CLIENT_TO_SERVER));
+ handShakeMsg->write(static_cast<int8_t>(CLIENT_TO_SERVER));
}
// added for versioned client
int8_t versionOrdinal = Version::getOrdinal();
- handShakeMsg.write(versionOrdinal);
+ handShakeMsg->write(versionOrdinal);
LOGFINE("Client version ordinal is %d", versionOrdinal);
- handShakeMsg.write(static_cast<int8_t>(REPLY_OK));
+ handShakeMsg->write(static_cast<int8_t>(REPLY_OK));
// Send byte REPLY_OK = (byte)58;
if (!isClientNotification) {
@@ -122,9 +124,9 @@ bool TcrConnection::InitTcrConnection(
} else {
// add the local ports to message
Set<uint16_t>::Iterator iter = ports.iterator();
- handShakeMsg.writeInt(static_cast<int32_t>(ports.size()));
+ handShakeMsg->writeInt(static_cast<int32_t>(ports.size()));
while (iter.hasNext()) {
- handShakeMsg.writeInt(static_cast<int32_t>(iter.next()));
+ handShakeMsg->writeInt(static_cast<int32_t>(iter.next()));
}
}
@@ -134,21 +136,21 @@ bool TcrConnection::InitTcrConnection(
// permissible value for bug #232 for now.
// minus 10 sec because the GFE 5.7 gridDev branch adds a
// 5 sec buffer which was causing an int overflow.
- handShakeMsg.writeInt((int32_t)0x7fffffff - 10000);
+ handShakeMsg->writeInt((int32_t)0x7fffffff - 10000);
}
// Write header for byte FixedID since GFE 5.7
- handShakeMsg.write(static_cast<int8_t>(GeodeTypeIdsImpl::FixedIDByte));
+ handShakeMsg->write(static_cast<int8_t>(GeodeTypeIdsImpl::FixedIDByte));
// Writing byte for ClientProxyMembershipID class id=38 as registered on the
// java server.
- handShakeMsg.write(
+ handShakeMsg->write(
static_cast<int8_t>(GeodeTypeIdsImpl::ClientProxyMembershipId));
if (endpointObj->getPoolHADM()) {
ClientProxyMembershipID* memId =
endpointObj->getPoolHADM()->getMembershipId();
uint32_t memIdBufferLength;
const char* memIdBuffer = memId->getDSMemberId(memIdBufferLength);
- handShakeMsg.writeBytes((int8_t*)memIdBuffer, memIdBufferLength);
+ handShakeMsg->writeBytes((int8_t*)memIdBuffer, memIdBufferLength);
} else {
ACE_TCHAR hostName[256];
ACE_OS::hostname(hostName, sizeof(hostName) - 1);
@@ -158,42 +160,37 @@ bool TcrConnection::InitTcrConnection(
uint16_t hostPort = 0;
// Add 3 durable Subcription properties to ClientProxyMembershipID
- SystemProperties* sysProp = DistributedSystem::getSystemProperties();
- const char* durableId =
- (sysProp != nullptr) ? sysProp->durableClientId() : nullptr;
- const uint32_t durableTimeOut =
- (sysProp != nullptr) ? sysProp->durableTimeout() : 0;
+ const char* durableId = sysProp.durableClientId();
+ const uint32_t durableTimeOut = sysProp.durableTimeout();
// Write ClientProxyMembershipID serialized object.
uint32_t memIdBufferLength;
- ClientProxyMembershipID memId(hostName, hostAddr, hostPort, durableId,
- durableTimeOut);
- const char* memIdBuffer = memId.getDSMemberId(memIdBufferLength);
- handShakeMsg.writeBytes((int8_t*)memIdBuffer, memIdBufferLength);
+ const auto memId =
+ m_connectionManager->getCacheImpl()
+ ->getClientProxyMembershipIDFactory()
+ .create(hostName, hostAddr, hostPort, durableId, durableTimeOut);
+ const auto memIdBuffer = memId->getDSMemberId(memIdBufferLength);
+ handShakeMsg->writeBytes((int8_t*)memIdBuffer, memIdBufferLength);
}
- handShakeMsg.writeInt((int32_t)1);
+ handShakeMsg->writeInt((int32_t)1);
bool isDhOn = false;
bool requireServerAuth = false;
PropertiesPtr credentials;
CacheableBytesPtr serverChallenge;
- SystemProperties* tmpSystemProperties =
- DistributedSystem::getSystemProperties();
-
// Write overrides (just conflation for now)
- handShakeMsg.write(getOverrides(tmpSystemProperties));
+ handShakeMsg->write(getOverrides(&sysProp));
- bool tmpIsSecurityOn = tmpSystemProperties->isSecurityOn();
- isDhOn = tmpSystemProperties->isDhOn();
+ bool tmpIsSecurityOn = sysProp.isSecurityOn();
+ isDhOn = sysProp.isDhOn();
if (m_endpointObj) {
- tmpIsSecurityOn = tmpSystemProperties->isSecurityOn() ||
- this->m_endpointObj->isMultiUserMode();
+ tmpIsSecurityOn =
+ sysProp.isSecurityOn() || this->m_endpointObj->isMultiUserMode();
CacheableStringPtr dhalgo =
- tmpSystemProperties->getSecurityProperties()->find(
- "security-client-dhalgo");
+ sysProp.getSecurityProperties()->find("security-client-dhalgo");
LOGDEBUG("TcrConnection this->m_endpointObj->isMultiUserMode() = %d ",
this->m_endpointObj->isMultiUserMode());
@@ -205,7 +202,7 @@ bool TcrConnection::InitTcrConnection(
LOGDEBUG(
"TcrConnection algo name %s tmpIsSecurityOn = %d isDhOn = %d "
"isNotificationChannel = %d ",
- tmpSystemProperties->securityClientDhAlgo(), tmpIsSecurityOn, isDhOn,
+ sysProp.securityClientDhAlgo(), tmpIsSecurityOn, isDhOn,
isNotificationChannel);
bool doIneedToSendCreds = true;
if (isNotificationChannel && m_endpointObj &&
@@ -216,48 +213,35 @@ bool TcrConnection::InitTcrConnection(
}
if (isNotificationChannel && !doIneedToSendCreds) {
- handShakeMsg.write(
+ handShakeMsg->write(
static_cast<uint8_t>(SECURITY_MULTIUSER_NOTIFICATIONCHANNEL));
} else if (isDhOn) {
m_dh = new DiffieHellman();
- m_dh->initDhKeys(tmpSystemProperties->getSecurityProperties());
- handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_DHENCRYPT));
+ m_dh->initDhKeys(sysProp.getSecurityProperties());
+ handShakeMsg->write(static_cast<uint8_t>(SECURITY_CREDENTIALS_DHENCRYPT));
} else if (tmpIsSecurityOn) {
- handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NORMAL));
+ handShakeMsg->write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NORMAL));
} else {
- handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NONE));
+ handShakeMsg->write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NONE));
}
if (tmpIsSecurityOn) {
try {
LOGFINER("TcrConnection: about to invoke authloader");
- PropertiesPtr tmpSecurityProperties =
- tmpSystemProperties->getSecurityProperties();
+ const auto& tmpSecurityProperties = sysProp.getSecurityProperties();
if (tmpSecurityProperties == nullptr) {
LOGWARN("TcrConnection: security properties not found.");
}
- // AuthInitializePtr authInitialize =
- // tmpSystemProperties->getAuthLoader();
- //:only for backward connection
+ // only for backward connection
if (isClientNotification) {
- AuthInitializePtr authInitialize =
- DistributedSystem::m_impl->getAuthLoader();
- if (authInitialize != nullptr) {
+ if (const auto& authInitialize =
+ distributedSystem.m_impl->getAuthLoader()) {
LOGFINER(
"TcrConnection: acquired handle to authLoader, "
"invoking getCredentials");
- /* adongre
- * CID 28898: Copy into fixed size buffer (STRING_OVERFLOW)
- * You might overrun the 100 byte fixed-size string "tmpEndpoint" by
- * copying "this->m_endpoint" without checking the length.
- * Note: This defect has an elevated risk because the source argument
- * is a parameter of the current function.
- */
- // char tmpEndpoint[100] = { '\0' } ;
- // strcpy(tmpEndpoint, m_endpoint);
- PropertiesPtr tmpAuthIniSecurityProperties =
- authInitialize->getCredentials(tmpSecurityProperties,
- /*tmpEndpoint*/ m_endpoint);
+
+ const auto& tmpAuthIniSecurityProperties =
+ authInitialize->getCredentials(tmpSecurityProperties, m_endpoint);
LOGFINER("TcrConnection: after getCredentials ");
credentials = tmpAuthIniSecurityProperties;
}
@@ -267,20 +251,20 @@ bool TcrConnection::InitTcrConnection(
CacheableStringPtr ksPath =
tmpSecurityProperties->find("security-client-kspath");
requireServerAuth = (ksPath != nullptr && ksPath->length() > 0);
- handShakeMsg.writeBoolean(requireServerAuth);
+ handShakeMsg->writeBoolean(requireServerAuth);
LOGFINE(
"HandShake: Server authentication using RSA signature %s required",
requireServerAuth ? "is" : "not");
// Send the symmetric key algorithm name string
- handShakeMsg.write(static_cast<int8_t>(GeodeTypeIds::CacheableString));
- handShakeMsg.writeASCII(tmpSystemProperties->securityClientDhAlgo());
+ handShakeMsg->write(static_cast<int8_t>(GeodeTypeIds::CacheableString));
+ handShakeMsg->writeASCII(sysProp.securityClientDhAlgo());
// Send the client's DH public key to the server
// CacheableBytesPtr dhPubKey = DiffieHellman::getPublicKey();
CacheableBytesPtr dhPubKey = m_dh->getPublicKey();
LOGDEBUG("DH pubkey send len is %d", dhPubKey->length());
- dhPubKey->toData(handShakeMsg);
+ dhPubKey->toData(*handShakeMsg);
if (requireServerAuth) {
char serverChallengeBytes[64] = {0};
@@ -290,11 +274,11 @@ bool TcrConnection::InitTcrConnection(
}
serverChallenge = CacheableBytes::create(
reinterpret_cast<const uint8_t*>(serverChallengeBytes), 64);
- serverChallenge->toData(handShakeMsg);
+ serverChallenge->toData(*handShakeMsg);
}
} else { // if isDhOn
if (isClientNotification) { //:only for backward connection
- credentials->toData(handShakeMsg);
+ credentials->toData(*handShakeMsg);
}
} // else isDhOn
} catch (const AuthenticationRequiredException&) {
@@ -314,7 +298,7 @@ bool TcrConnection::InitTcrConnection(
}
uint32_t msgLengh;
- char* data = (char*)handShakeMsg.getBuffer(&msgLengh);
+ char* data = (char*)handShakeMsg->getBuffer(&msgLengh);
LOGFINE("Attempting handshake with endpoint %s for %s%s connection", endpoint,
isClientNotification ? (isSecondary ? "secondary " : "primary ") : "",
isClientNotification ? "subscription" : "client");
@@ -325,8 +309,7 @@ bool TcrConnection::InitTcrConnection(
LOGDEBUG(" Handshake: Got Accept Code %d", (*acceptanceCode)[0]);
/* adongre */
- if ((*acceptanceCode)[0] == REPLY_SSL_ENABLED &&
- !tmpSystemProperties->sslEnabled()) {
+ if ((*acceptanceCode)[0] == REPLY_SSL_ENABLED && !sysProp.sslEnabled()) {
LOGERROR("SSL is enabled on server, enable SSL in client as well");
AuthenticationRequiredException ex(
"SSL is enabled on server, enable SSL in client as well");
@@ -370,18 +353,18 @@ bool TcrConnection::InitTcrConnection(
LOGDEBUG("Handshake: Got challengeSize %d", challengeBytes->length());
// encrypt the credentials and challenge bytes
- DataOutput cleartext;
+ auto cleartext = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
if (isClientNotification) { //:only for backward connection
- credentials->toData(cleartext);
+ credentials->toData(*cleartext);
}
- challengeBytes->toData(cleartext);
+ challengeBytes->toData(*cleartext);
CacheableBytesPtr ciphertext =
- m_dh->encrypt(cleartext.getBuffer(), cleartext.getBufferLength());
+ m_dh->encrypt(cleartext->getBuffer(), cleartext->getBufferLength());
- DataOutput sendCreds;
- ciphertext->toData(sendCreds);
+ auto sendCreds = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
+ ciphertext->toData(*sendCreds);
uint32_t credLen;
- char* credData = (char*)sendCreds.getBuffer(&credLen);
+ char* credData = (char*)sendCreds->getBuffer(&credLen);
// send the encrypted bytes and check the response
error = sendData(credData, credLen, connectTimeout, false);
@@ -418,9 +401,9 @@ bool TcrConnection::InitTcrConnection(
m_hasServerQueue = NON_REDUNDANT_SERVER;
}
CacheableBytesPtr queueSizeMsg = readHandshakeData(4, connectTimeout);
- DataInput dI(queueSizeMsg->value(), queueSizeMsg->length());
+ auto dI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(queueSizeMsg->value(), queueSizeMsg->length());
int32_t queueSize = 0;
- dI.readInt(&queueSize);
+ dI->readInt(&queueSize);
m_queueSize = queueSize > 0 ? queueSize : 0;
m_endpointObj->setServerQueueStatus(m_hasServerQueue, m_queueSize);
@@ -449,43 +432,49 @@ bool TcrConnection::InitTcrConnection(
if (static_cast<int8_t>((*arrayLenHeader)[0]) == -2) {
CacheableBytesPtr recvMsgLenBytes =
readHandshakeData(2, connectTimeout);
- DataInput dI2(recvMsgLenBytes->value(), recvMsgLenBytes->length());
+ auto dI2 = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ recvMsgLenBytes->value(), recvMsgLenBytes->length());
int16_t recvMsgLenShort = 0;
- dI2.readInt(&recvMsgLenShort);
+ dI2->readInt(&recvMsgLenShort);
recvMsgLen = recvMsgLenShort;
} else if (static_cast<int8_t>((*arrayLenHeader)[0]) == -3) {
CacheableBytesPtr recvMsgLenBytes =
readHandshakeData(4, connectTimeout);
- DataInput dI2(recvMsgLenBytes->value(), recvMsgLenBytes->length());
- dI2.readInt(&recvMsgLen);
+ auto dI2 = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ recvMsgLenBytes->value(), recvMsgLenBytes->length());
+ dI2->readInt(&recvMsgLen);
}
- CacheableBytesPtr recvMessage =
- readHandshakeData(recvMsgLen, connectTimeout);
+ auto recvMessage = readHandshakeData(recvMsgLen, connectTimeout);
// If the distributed member has not been set yet, set it.
if (getEndpointObject()->getDistributedMemberID() == 0) {
LOGDEBUG("Deserializing distributed member Id");
- DataInput diForClient(recvMessage->value(), recvMessage->length());
+ auto diForClient = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ recvMessage->value(), recvMessage->length());
ClientProxyMembershipIDPtr member;
- diForClient.readObject(member);
- uint16_t memId = CacheImpl::getMemberListForVersionStamp()->add(
- (DSMemberForVersionStampPtr)member);
+ diForClient->readObject(member);
+ auto memId = m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getMemberListForVersionStamp()
+ ->add(member);
getEndpointObject()->setDistributedMemberID(memId);
LOGDEBUG("Deserialized distributed member Id %d", memId);
}
}
CacheableBytesPtr recvMsgLenBytes = readHandshakeData(2, connectTimeout);
- DataInput dI3(recvMsgLenBytes->value(), recvMsgLenBytes->length());
+ auto dI3 = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ recvMsgLenBytes->value(), recvMsgLenBytes->length());
uint16_t recvMsgLen2 = 0;
- dI3.readInt(&recvMsgLen2);
+ dI3->readInt(&recvMsgLen2);
CacheableBytesPtr recvMessage =
readHandshakeData(recvMsgLen2, connectTimeout);
if (!isClientNotification) {
CacheableBytesPtr deltaEnabledMsg = readHandshakeData(1, connectTimeout);
- DataInput di(deltaEnabledMsg->value(), 1);
+ auto di = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ deltaEnabledMsg->value(), 1);
bool isDeltaEnabledOnServer;
- di.readBoolean(&isDeltaEnabledOnServer);
+ di->readBoolean(&isDeltaEnabledOnServer);
ThinClientBaseDM::setDeltaEnabledOnServer(isDeltaEnabledOnServer);
}
@@ -579,8 +568,14 @@ Connector* TcrConnection::createConnection(const char* endpoint,
uint32_t connectTimeout,
int32_t maxBuffSizePool) {
Connector* socket = nullptr;
- if (DistributedSystem::getSystemProperties()->sslEnabled()) {
- socket = new TcpSslConn(endpoint, connectTimeout, maxBuffSizePool);
+ auto& systemProperties = m_connectionManager->getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties();
+ if (systemProperties.sslEnabled()) {
+ socket = new TcpSslConn(endpoint, connectTimeout, maxBuffSizePool,
+ systemProperties.sslTrustStore(),         // pubkeyfile
+ systemProperties.sslKeyStore(),           // privkeyfile
+ systemProperties.sslKeystorePassword());  // pemPassword
} else {
socket = new TcpConn(endpoint, connectTimeout, maxBuffSizePool);
}
@@ -611,7 +606,11 @@ inline ConnErrType TcrConnection::receiveData(char* buffer, int32_t length,
// if gfcpp property unit set then sendTimeoutSec will be in millisecond
// otherwise it will be in second
- if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) {
+ if (m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .readTimeoutUnitInMillis()) {
LOGFINER("recieveData %d %d ", receiveTimeoutSec, notPublicApiWithTimeout);
if (notPublicApiWithTimeout == TcrMessage::QUERY ||
notPublicApiWithTimeout == TcrMessage::QUERY_WITH_PARAMETERS ||
@@ -694,7 +693,11 @@ inline ConnErrType TcrConnection::sendData(uint32_t& timeSpent,
bool isPublicApiTimeout = false;
// if gfcpp property unit set then sendTimeoutSec will be in millisecond
// otherwise it will be in second
- if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) {
+ if (m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .readTimeoutUnitInMillis()) {
LOGFINER("sendData %d %d", sendTimeoutSec, notPublicApiWithTimeout);
if (notPublicApiWithTimeout == TcrMessage::QUERY ||
notPublicApiWithTimeout == TcrMessage::QUERY_WITH_PARAMETERS ||
@@ -950,9 +953,10 @@ char* TcrConnection::readMessage(size_t* recvLen, uint32_t receiveTimeoutSec,
m_endpoint,
Utils::convertBytesToString(msg_header, HEADER_LENGTH)->asChar());
- DataInput input(reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH);
- input.readInt(&msgType);
- input.readInt(&msgLen);
+ auto input = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH);
+ input->readInt(&msgType);
+ input->readInt(&msgLen);
// check that message length is valid.
if (!(msgLen > 0) && request == TcrMessage::GET_CLIENT_PR_METADATA) {
char* fullMessage;
@@ -1059,17 +1063,18 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply,
m_endpoint,
Utils::convertBytesToString(msg_header, HDR_LEN_12)->asChar());
- DataInput input(msg_header, HDR_LEN_12);
+ auto input = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ msg_header, HDR_LEN_12);
int32_t msgType;
- input.readInt(&msgType);
+ input->readInt(&msgType);
reply.setMessageType(msgType);
int32_t txId;
int32_t numOfParts;
- input.readInt(&numOfParts);
+ input->readInt(&numOfParts);
LOGDEBUG("TcrConnection::readMessageChunked numberof parts = %d ",
numOfParts);
- // input.advanceCursor(4);
- input.readInt(&txId);
+ // input->advanceCursor(4);
+ input->readInt(&txId);
reply.setTransId(txId);
// bool isLastChunk = false;
@@ -1125,13 +1130,14 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply,
Utils::convertBytesToString((msg_header + HDR_LEN_12), HDR_LEN)
->asChar());
- DataInput inp((msg_header + HDR_LEN_12), HDR_LEN);
+ auto input = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ msg_header + HDR_LEN_12, HDR_LEN);
int32_t chunkLen;
- inp.readInt(&chunkLen);
+ input->readInt(&chunkLen);
// check that chunk length is valid.
GF_DEV_ASSERT(chunkLen > 0);
- // inp.readBoolean(&isLastChunk);
- inp.read(&isLastChunk);
+ // input->readBoolean(&isLastChunk);
+ input->read(&isLastChunk);
uint8_t* chunk_body;
GF_NEW(chunk_body, uint8_t[chunkLen]);
@@ -1172,17 +1178,19 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply,
void TcrConnection::close() {
// If this is a short lived grid client, don't bother with this close ack
// message
- if (DistributedSystem::getSystemProperties()->isGridClient()) {
+ if (m_poolDM->getConnectionManager()
+ .getCacheImpl()
+ ->getDistributedSystem()
+ .getSystemProperties()
+ .isGridClient()) {
return;
}
- TcrMessage* closeMsg = TcrMessage::getCloseConnMessage();
+ TcrMessage* closeMsg = TcrMessage::getCloseConnMessage(
+ m_poolDM->getConnectionManager().getCacheImpl()->getCache());
try {
- // LOGINFO("TcrConnection::close DC = %d; netdown = %d endpoint %s",
- // TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH,
- // TcrConnectionManager::isNetDown, m_endpoint);
if (!TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH &&
- !TcrConnectionManager::isNetDown) {
+ !m_connectionManager->isNetDown()) {
send(closeMsg->getMsgData(), closeMsg->getMsgLength(), 2, false);
}
} catch (Exception& e) {
@@ -1285,9 +1293,10 @@ CacheableBytesPtr TcrConnection::readHandshakeByteArray(
// read a byte array
uint32_t TcrConnection::readHandshakeArraySize(uint32_t connectTimeout) {
CacheableBytesPtr codeBytes = readHandshakeData(1, connectTimeout);
- DataInput codeDI(codeBytes->value(), codeBytes->length());
+ auto codeDI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(
+ codeBytes->value(), codeBytes->length());
uint8_t code = 0;
- codeDI.read(&code);
+ codeDI->read(&code);
uint32_t arraySize = 0;
if (code == 0xFF) {
return 0;
@@ -1296,15 +1305,15 @@ uint32_t TcrConnection::readHandshakeArraySize(uint32_t connectTimeout) {
if (tempLen > 252) { // 252 is java's ((byte)-4 && 0xFF)
if (code == 0xFE) {
CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout);
- DataInput lenDI(lenBytes->value(), lenBytes->length());
+ auto lenDI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(lenBytes->value(), lenBytes->length());
uint16_t val;
- lenDI.readInt(&val);
+ lenDI->readInt(&val);
tempLen = val;
} else if (code == 0xFD) {
CacheableBytesPtr lenBytes = readHandshakeData(4, connectTimeout);
- DataInput lenDI(lenBytes->value(), lenBytes->length());
+ auto lenDI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(lenBytes->value(), lenBytes->length());
uint32_t val;
- lenDI.readInt(&val);
+ lenDI->readInt(&val);
tempLen = val;
} else {
GF_SAFE_DELETE_CON(m_conn);
@@ -1393,9 +1402,9 @@ int32_t TcrConnection::readHandShakeInt(uint32_t connectTimeout) {
}
}
- DataInput di(recvMessage, 4);
+ auto di = m_connectionManager->getCacheImpl()->getCache()->createDataInput(recvMessage, 4);
int32_t val;
- di.readInt(&val);
+ di->readInt(&val);
GF_SAFE_DELETE_ARRAY(recvMessage);
@@ -1431,8 +1440,8 @@ CacheableStringPtr TcrConnection::readHandshakeString(uint32_t connectTimeout) {
case GF_STRING: {
uint16_t shortLen = 0;
CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout);
- DataInput lenDI(lenBytes->value(), lenBytes->length());
- lenDI.readInt(&shortLen);
+ auto lenDI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(lenBytes->value(), lenBytes->length());
+ lenDI->readInt(&shortLen);
length = shortLen;
break;
}
@@ -1512,7 +1521,7 @@ void TcrConnection::touch() { m_lastAccessed = ACE_OS::gettimeofday(); }
ACE_Time_Value TcrConnection::getLastAccessed() { return m_lastAccessed; }
-uint8_t TcrConnection::getOverrides(SystemProperties* props) {
+uint8_t TcrConnection::getOverrides(const SystemProperties* props) {
const char* conflate = props->conflateEvents();
uint8_t conflateByte = 0;
if (conflate != nullptr) {
@@ -1522,27 +1531,7 @@ uint8_t TcrConnection::getOverrides(SystemProperties* props) {
conflateByte = 2;
}
}
- /*
- const char * removeUnresponsive = props->removeUnresponsiveClientOverride();
- uint8_t removeByte = 0;
- if (removeUnresponsive != nullptr ) {
- if ( ACE_OS::strcasecmp(removeUnresponsive, "true") == 0 ) {
- removeByte = 1;
- } else if ( ACE_OS::strcasecmp(removeUnresponsive, "false") == 0 ) {
- removeByte = 2;
- }
- }
- const char * notify = props->notifyBySubscriptionOverride();
- uint8_t notifyByte = 0;
- if (notify != nullptr ) {
- if ( ACE_OS::strcasecmp(notify, "true") == 0 ) {
- notifyByte = 1;
- } else if ( ACE_OS::strcasecmp(notify, "false") == 0 ) {
- notifyByte = 2;
- }
- }
- return (((notifyByte << 2) | removeByte) << 2) | conflateByte;
- */
+
return conflateByte;
}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrConnection.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrConnection.hpp b/src/cppcache/src/TcrConnection.hpp
index 9e8d873..fc8f54f 100644
--- a/src/cppcache/src/TcrConnection.hpp
+++ b/src/cppcache/src/TcrConnection.hpp
@@ -79,6 +79,7 @@ enum ServerQueueStatus {
class TcrEndpoint;
class SystemProperties;
class ThinClientPoolDM;
+class TcrConnectionManager;
class CPPCACHE_EXPORT TcrConnection {
public:
/** Create one connection, endpoint is in format of hostname:portno
@@ -111,8 +112,10 @@ class CPPCACHE_EXPORT TcrConnection {
bool isSecondary = false,
uint32_t connectTimeout = DEFAULT_CONNECT_TIMEOUT);
- TcrConnection(volatile const bool& isConnected)
+ TcrConnection(const TcrConnectionManager& connectionManager,
+ volatile const bool& isConnected)
: connectionId(0),
+ m_connectionManager(&connectionManager),
m_dh(nullptr),
m_endpoint(nullptr),
m_endpointObj(nullptr),
@@ -279,6 +282,10 @@ class CPPCACHE_EXPORT TcrConnection {
connectionId = id;
}
+ const TcrConnectionManager& getConnectionManager() {
+ return *m_connectionManager;
+ }
+
CacheableBytesPtr encryptBytes(CacheableBytesPtr data) {
if (m_dh != nullptr) {
return m_dh->encrypt(data);
@@ -297,6 +304,7 @@ class CPPCACHE_EXPORT TcrConnection {
private:
int64_t connectionId;
+ const TcrConnectionManager* m_connectionManager;
DiffieHellman* m_dh;
/**
* To read Intantiator message(which meant for java client), here we are
@@ -308,7 +316,7 @@ class CPPCACHE_EXPORT TcrConnection {
* Packs the override settings bits into bytes - currently a single byte for
* conflation, remove-unresponsive-client and notify-by-subscription.
*/
- uint8_t getOverrides(SystemProperties* props);
+ uint8_t getOverrides(const SystemProperties* props);
/**
* To read the from stream
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrConnectionManager.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrConnectionManager.cpp b/src/cppcache/src/TcrConnectionManager.cpp
index 71d2347..714f479 100644
--- a/src/cppcache/src/TcrConnectionManager.cpp
+++ b/src/cppcache/src/TcrConnectionManager.cpp
@@ -39,7 +39,6 @@
namespace apache {
namespace geode {
namespace client {
-volatile bool TcrConnectionManager::isNetDown = false;
volatile bool TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH = false;
const char *TcrConnectionManager::NC_Redundancy = "NC Redundancy";
@@ -59,7 +58,8 @@ TcrConnectionManager::TcrConnectionManager(CacheImpl *cache)
m_notifyCleanupSemaList(false),
m_redundancySema(0),
m_redundancyTask(nullptr),
- m_isDurable(false) {
+ m_isDurable(false),
+ m_isNetDown(false) {
m_redundancyManager = new ThinClientRedundancyManager(this);
}
@@ -70,14 +70,14 @@ void TcrConnectionManager::init(bool isPool) {
} else {
return;
}
- SystemProperties *props = DistributedSystem::getSystemProperties();
- m_isDurable = strlen(props->durableClientId()) > 0;
- int32_t pingInterval = (props->pingInterval() / 2);
- if (!props->isGridClient() && !isPool) {
+ auto &props = m_cache->getDistributedSystem().getSystemProperties();
+ m_isDurable = strlen(props.durableClientId()) > 0;
+ int32_t pingInterval = (props.pingInterval() / 2);
+ if (!props.isGridClient() && !isPool) {
ACE_Event_Handler *connectionChecker =
new ExpiryHandler_T<TcrConnectionManager>(
this, &TcrConnectionManager::checkConnection);
- m_pingTaskId = CacheImpl::expiryTaskManager->scheduleExpiryTask(
+ m_pingTaskId = m_cache->getExpiryTaskManager().scheduleExpiryTask(
connectionChecker, 10, pingInterval, false);
LOGFINE(
"TcrConnectionManager::TcrConnectionManager Registered ping "
@@ -105,9 +105,9 @@ void TcrConnectionManager::init(bool isPool) {
ACE_Event_Handler *redundancyChecker =
new ExpiryHandler_T<TcrConnectionManager>(
this, &TcrConnectionManager::checkRedundancy);
- int32_t redundancyMonitorInterval = props->redundancyMonitorInterval();
+ int32_t redundancyMonitorInterval = props.redundancyMonitorInterval();
- m_servermonitorTaskId = CacheImpl::expiryTaskManager->scheduleExpiryTask(
+ m_servermonitorTaskId = m_cache->getExpiryTaskManager().scheduleExpiryTask(
redundancyChecker, 1, redundancyMonitorInterval, false);
LOGFINE(
"TcrConnectionManager::TcrConnectionManager Registered server "
@@ -125,7 +125,7 @@ void TcrConnectionManager::init(bool isPool) {
m_redundancyManager->m_HAenabled = true;
}
- if (!props->isGridClient()) {
+ if (!props.isGridClient()) {
startFailoverAndCleanupThreads(isPool);
}
}
@@ -152,7 +152,7 @@ void TcrConnectionManager::startFailoverAndCleanupThreads(bool isPool) {
void TcrConnectionManager::close() {
LOGFINE("TcrConnectionManager is closing");
if (m_pingTaskId > 0) {
- CacheImpl::expiryTaskManager->cancelTask(m_pingTaskId);
+ m_cache->getExpiryTaskManager().cancelTask(m_pingTaskId);
}
if (m_failoverTask != nullptr) {
@@ -166,7 +166,7 @@ void TcrConnectionManager::close() {
if (cacheAttributes != nullptr &&
(cacheAttributes->getRedundancyLevel() > 0 || m_isDurable)) {
if (m_servermonitorTaskId > 0) {
- CacheImpl::expiryTaskManager->cancelTask(m_servermonitorTaskId);
+ m_cache->getExpiryTaskManager().cancelTask(m_servermonitorTaskId);
}
if (m_redundancyTask != nullptr) {
m_redundancyTask->stopNoblock();
@@ -344,7 +344,7 @@ int TcrConnectionManager::checkConnection(const ACE_Time_Value &,
ACE_Recursive_Thread_Mutex>::iterator currItr =
m_endpoints.begin();
while (currItr != m_endpoints.end()) {
- if ((*currItr).int_id_->connected() && !isNetDown) {
+ if ((*currItr).int_id_->connected() && !m_isNetDown) {
(*currItr).int_id_->pingServer();
}
currItr++;
@@ -362,7 +362,7 @@ int TcrConnectionManager::failover(volatile bool &isRunning) {
LOGFINE("TcrConnectionManager: starting failover thread");
while (isRunning) {
m_failoverSema.acquire();
- if (isRunning && !isNetDown) {
+ if (isRunning && !m_isNetDown) {
try {
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_distMngrsLock);
for (std::list<ThinClientBaseDM *>::iterator it = m_distMngrs.begin();
@@ -477,7 +477,7 @@ void TcrConnectionManager::removeHAEndpoints() {
}
void TcrConnectionManager::netDown() {
- isNetDown = true;
+ m_isNetDown = true;
// sleep for 15 seconds to allow ping and redundancy threads to pause.
std::this_thread::sleep_for(std::chrono::seconds(15));
@@ -499,7 +499,7 @@ void TcrConnectionManager::netDown() {
/* Need to do a get on unknown key after calling this Fn to restablish all
* connection */
void TcrConnectionManager::revive() {
- isNetDown = false;
+ m_isNetDown = false;
// sleep for 15 seconds to allow redundancy thread to reestablish
// connections.
@@ -510,7 +510,7 @@ int TcrConnectionManager::redundancy(volatile bool &isRunning) {
LOGFINE("Starting subscription maintain redundancy thread.");
while (isRunning) {
m_redundancySema.acquire();
- if (isRunning && !isNetDown) {
+ if (isRunning && !m_isNetDown) {
m_redundancyManager->maintainRedundancyLevel();
while (m_redundancySema.tryacquire() != -1) {
;
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrConnectionManager.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrConnectionManager.hpp b/src/cppcache/src/TcrConnectionManager.hpp
index c902286..bb9afbc 100644
--- a/src/cppcache/src/TcrConnectionManager.hpp
+++ b/src/cppcache/src/TcrConnectionManager.hpp
@@ -76,7 +76,6 @@ class CPPCACHE_EXPORT TcrConnectionManager {
void netDown();
void revive();
void setClientCrashTEST() { TEST_DURABLE_CLIENT_CRASH = true; }
- volatile static bool isNetDown;
volatile static bool TEST_DURABLE_CLIENT_CRASH;
inline ACE_Map_Manager<std::string, TcrEndpoint*, ACE_Recursive_Thread_Mutex>&
@@ -106,7 +105,7 @@ class CPPCACHE_EXPORT TcrConnectionManager {
bool isDurable() { return m_isDurable; };
bool haEnabled() { return m_redundancyManager->m_HAenabled; };
- CacheImpl* getCacheImpl() { return m_cache; };
+ CacheImpl* getCacheImpl() const { return m_cache; };
GfErrType sendSyncRequestCq(TcrMessage& request, TcrMessageReply& reply,
TcrHADistributionManager* theHADM);
@@ -140,6 +139,8 @@ class CPPCACHE_EXPORT TcrConnectionManager {
return m_redundancyManager->sendRequestToPrimary(request, reply);
}
+ bool isNetDown() const { return m_isNetDown; }
+
private:
CacheImpl* m_cache;
volatile bool m_initGuard;
@@ -175,6 +176,8 @@ class CPPCACHE_EXPORT TcrConnectionManager {
ACE_Recursive_Thread_Mutex m_notificationLock;
bool m_isDurable;
+ bool m_isNetDown;
+
ThinClientRedundancyManager* m_redundancyManager;
int failover(volatile bool& isRunning);