You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2019/07/06 18:01:06 UTC
[geode-native] branch develop updated: GEODE 6835 - Remove use of
defunct client request
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 037a9ff GEODE 6835 - Remove use of defunct client request
037a9ff is described below
commit 037a9ffdbeca4da065f8fd997cc97d6065d55a0a
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Sat Jul 6 11:01:01 2019 -0700
GEODE 6835 - Remove use of defunct client request
* Remove defunct UPDATE_CLIENT_NOTIFICATION request
- Geode server has no code to handle this message, and hasn't for *several years*
- Sending the message while a security manager is in use will yield an AuthorizationRequiredException and cause the server to close the connection
- Remove commented-out code
- Adapt test framework to add user/password to connect command when using securityManager
* Miscellaneous Refactoring
- Remove gratuitous use of this pointer
- Fix crash at cache closing, and fix casing of m_keepAlive member variable
- Fix format specifier in log statement
- Add a cq test that verifies create/update/destroy notifications
- Fix typo in GF error code name
- Check for null member vars in catch block, to avoid segfault
- Remove TODO comment
- clang-format changes
- Fix casing of method initTcrConnection
- Break out AuthInitialize tests into separate file
- Count calls to AuthInitialize::getCredentials, verify that it's being called
- Test bad credentials on client
- Test with/without subscription enabled on pool (different failure mode)
- Rename GFErrTypeToException: it doesn't actually translate to an exception, but rather throws the exception, and it doesn't always throw one; sometimes it just does nothing
- Use Cluster object in put + Cq + AuthInitialize test. Adapt Cluster, Server, Locator classes to take security parameters
- Move SimpleAuthInitialize class into a separate file. The class is now used in two different test suites, and possibly more going forward
- Remove unused test functions that were breaking Linux builds
Co-authored-by: Jacob Barrett <jb...@pivotal.io>
Co-authored-by: Barry Oglesby <bo...@pivotal.io>
Co-authored-by: Ivan Godwin <ig...@pivotal.io>
Co-authored-by: Steve Sienkowski <ss...@pivotal.io>
---
cppcache/integration/framework/Cluster.cpp | 20 +-
cppcache/integration/framework/Cluster.h | 52 ++++-
cppcache/integration/framework/Gfsh.h | 71 ++++++-
cppcache/integration/framework/GfshExecute.cpp | 35 +++-
cppcache/integration/framework/GfshExecute.h | 28 +--
cppcache/integration/test/AuthInitializeTest.cpp | 214 +++++++++++++++++++++
cppcache/integration/test/CMakeLists.txt | 25 ++-
.../integration/test/CqPlusAuthInitializeTest.cpp | 206 ++++++++++++++++++++
cppcache/integration/test/CqTest.cpp | 132 +++++++++++++
cppcache/integration/test/SimpleAuthInitialize.cpp | 63 ++++++
.../test/SimpleAuthInitialize.hpp} | 41 ++--
cppcache/integration/test/SimpleCqListener.cpp | 60 ++++++
.../test/SimpleCqListener.hpp} | 53 +++--
cppcache/src/AdminRegion.cpp | 2 +-
cppcache/src/CacheTransactionManagerImpl.cpp | 2 +-
cppcache/src/CqQueryImpl.cpp | 13 +-
cppcache/src/CqService.cpp | 4 +-
cppcache/src/ErrType.hpp | 2 +-
cppcache/src/ExceptionTypes.cpp | 2 +-
cppcache/src/ExecutionImpl.cpp | 10 +-
cppcache/src/LocalRegion.cpp | 42 ++--
cppcache/src/RemoteQuery.cpp | 2 +-
cppcache/src/TcrConnection.cpp | 11 +-
cppcache/src/TcrConnection.hpp | 2 +-
cppcache/src/TcrEndpoint.cpp | 56 ++----
cppcache/src/TcrEndpoint.hpp | 3 +-
cppcache/src/TcrMessage.cpp | 33 ++--
cppcache/src/TcrMessage.hpp | 9 +-
cppcache/src/ThinClientDistributionManager.cpp | 8 +-
cppcache/src/ThinClientPoolDM.cpp | 88 +++++----
cppcache/src/ThinClientPoolHADM.cpp | 2 +-
cppcache/src/ThinClientRedundancyManager.cpp | 9 +-
cppcache/src/ThinClientRegion.cpp | 68 +++----
cppcache/src/Utils.cpp | 6 +
cppcache/src/Utils.hpp | 7 +
cppcache/src/util/exception.hpp | 2 +-
36 files changed, 1088 insertions(+), 295 deletions(-)
diff --git a/cppcache/integration/framework/Cluster.cpp b/cppcache/integration/framework/Cluster.cpp
index 21c1bbd..b6f4765 100644
--- a/cppcache/integration/framework/Cluster.cpp
+++ b/cppcache/integration/framework/Cluster.cpp
@@ -43,18 +43,17 @@ void Locator::start() {
.withMaxHeap("256m")
.withJmxManagerPort(jmxManagerPort_)
.withHttpServicePort(0)
- .execute();
+ .withClasspath(cluster_.getClasspath())
+ .withSecurityManager(cluster_.getSecurityManager())
+ .execute(cluster_.getUser(), cluster_.getPassword());
- // std::cout << "locator: " << locatorAddress_.port << ": started"
- // << std::endl;
started_ = true;
}
void Locator::stop() {
cluster_.getGfsh().stop().locator().withDir(name_).execute();
- // std::cout << "locator: " << locatorAddress_.port << ": stopped"
- // << std::endl;
+// std::cout << "locator: " << locatorAddress_.port << ": stopped" << std::endl << std::flush;
started_ = false;
}
@@ -72,18 +71,21 @@ void Server::start() {
.withMaxHeap("1g")
.withLocators(locators_.front().getAddress().address + "[" +
std::to_string(locators_.front().getAddress().port) + "]")
+ .withClasspath(cluster_.getClasspath())
+ .withSecurityManager(cluster_.getSecurityManager())
+ .withUser(cluster_.getUser())
+ .withPassword(cluster_.getPassword())
.execute();
- // std::cout << "server: " << serverAddress_.port << ": started" <<
- // std::endl;
+// std::cout << "server: " << serverAddress_.port << ": started" << std::endl << std::flush;
+
started_ = true;
}
void Server::stop() {
cluster_.getGfsh().stop().server().withDir(name_).execute();
- // std::cout << "server: " << serverAddress_.port << ": stopped" <<
- // std::endl;
+// std::cout << "server: " << serverAddress_.port << ": stopped" << std::endl << std::flush;
started_ = false;
}
diff --git a/cppcache/integration/framework/Cluster.h b/cppcache/integration/framework/Cluster.h
index 99c2ccf..e39e5cd 100644
--- a/cppcache/integration/framework/Cluster.h
+++ b/cppcache/integration/framework/Cluster.h
@@ -163,6 +163,10 @@ class Server {
using LocatorCount = NamedType<size_t, struct LocatorCountParameter>;
using ServerCount = NamedType<size_t, struct ServerCountParameter>;
using Name = NamedType<std::string, struct NameParameter>;
+using Classpath = NamedType<std::string, struct ClasspathParameter>;
+using SecurityManager = NamedType<std::string, struct SecurityManagerParameter>;
+using User = NamedType<std::string, struct UserParameter>;
+using Password = NamedType<std::string, struct PasswordParameter>;
class Cluster {
public:
@@ -177,14 +181,27 @@ class Cluster {
initialLocators, initialServers){};
Cluster(Name name, LocatorCount initialLocators, ServerCount initialServers)
- : name_(name.get()),
- initialLocators_(initialLocators.get()),
- initialServers_(initialServers.get()) {
+ : Cluster(Name(name.get()),
+ Classpath(""),
+ SecurityManager(""),
+ User(""),
+ Password(""),
+ initialLocators, initialServers){};
+
+ Cluster(Name name,
+ Classpath classpath,
+ SecurityManager securityManager,
+ User user,
+ Password password,
+ LocatorCount initialLocators,
+ ServerCount initialServers) :
+ name_(name.get()), classpath_(classpath.get()), securityManager_(securityManager.get()), user_(user.get()), password_(password.get()), initialLocators_(initialLocators.get()), initialServers_(initialServers.get()) {
+
jmxManagerPort_ = Framework::getAvailablePort();
removeServerDirectory();
start();
- }
+ };
~Cluster() noexcept {
try {
@@ -218,6 +235,11 @@ class Cluster {
apache::geode::client::Cache createCache(
const std::unordered_map<std::string, std::string> &properties) {
+ return createCache(properties, false);
+ }
+
+ apache::geode::client::Cache createCache(
+ const std::unordered_map<std::string, std::string> &properties, bool subscriptionEnabled) {
using apache::geode::client::CacheFactory;
CacheFactory cacheFactory;
@@ -230,7 +252,7 @@ class Cluster {
.set("statistic-sampling-enabled", "false")
.create();
- auto poolFactory = cache.getPoolManager().createFactory();
+ auto poolFactory = cache.getPoolManager().createFactory().setSubscriptionEnabled(subscriptionEnabled);
applyLocators(poolFactory);
poolFactory.create("default");
@@ -250,8 +272,28 @@ class Cluster {
return servers_;
}
+ std::string& getClasspath() {
+ return classpath_;
+ }
+
+ std::string& getSecurityManager() {
+ return securityManager_;
+ }
+
+ std::string& getUser() {
+ return user_;
+ }
+
+ std::string& getPassword() {
+ return password_;
+ }
+
private:
std::string name_;
+ std::string classpath_;
+ std::string securityManager_;
+ std::string user_;
+ std::string password_;
size_t initialLocators_;
std::vector<Locator> locators_;
diff --git a/cppcache/integration/framework/Gfsh.h b/cppcache/integration/framework/Gfsh.h
index c03831f..182434e 100644
--- a/cppcache/integration/framework/Gfsh.h
+++ b/cppcache/integration/framework/Gfsh.h
@@ -58,7 +58,8 @@ class Gfsh {
template <class Result>
class Command {
public:
- virtual Result execute() { Result{gfsh_}.parse(gfsh_.execute(command_)); }
+ virtual Result execute(const std::string &user, const std::string &password) { Result{gfsh_}.parse(gfsh_.execute(command_, user, password)); }
+ virtual Result execute() { Result{gfsh_}.parse(gfsh_.execute(command_, "", "")); }
protected:
Command(Gfsh &gfsh, std::string command)
@@ -121,6 +122,25 @@ class Gfsh {
command_ += " --max-heap=" + maxHeap;
return *this;
};
+
+ Locator &withClasspath(const std::string classpath) {
+ if (!classpath.empty()) {
+ command_ += " --classpath=" + classpath;
+ }
+ return *this;
+ };
+
+ Locator &withSecurityManager(const std::string securityManager) {
+ if (!securityManager.empty()) {
+ command_ += " --J=-Dgemfire.security-manager=" + securityManager;
+ }
+ return *this;
+ };
+
+ Locator &withConnect(const std::string connect) {
+ command_ += " --connect=" + connect;
+ return *this;
+ };
};
class Server : public Command<void> {
@@ -161,6 +181,35 @@ class Gfsh {
command_ += " --max-heap=" + maxHeap;
return *this;
};
+
+ Server &withClasspath(const std::string classpath) {
+ if (!classpath.empty()) {
+ command_ += " --classpath=" + classpath;
+ }
+ return *this;
+ };
+
+ Server &withSecurityManager(const std::string securityManager) {
+ if (!securityManager.empty()) {
+ command_ += " --J=-Dgemfire.security-manager=" + securityManager;
+ }
+ return *this;
+ };
+
+ Server &withUser(const std::string user) {
+ if (!user.empty()) {
+ command_ += " --user=" + user;
+ }
+
+ return *this;
+ };
+
+ Server &withPassword(const std::string password) {
+ if (!password.empty()) {
+ command_ += " --password=" + password;
+ }
+ return *this;
+ };
};
private:
@@ -242,6 +291,16 @@ class Gfsh {
command_ += " --jmx-manager=" + jmxManager;
return *this;
};
+
+ Connect &withUser(const std::string &user) {
+ command_ += " --user=" + user;
+ return *this;
+ };
+
+ Connect &withPassword(const std::string &password) {
+ command_ += " --password=" + password;
+ return *this;
+ };
};
class Shutdown : public Command<void> {
@@ -267,12 +326,18 @@ class Gfsh {
};
protected:
- virtual void execute(const std::string &command) = 0;
+ virtual void execute(const std::string &command, const std::string &user, const std::string &password) = 0;
};
template <>
+inline void Gfsh::Command<void>::execute(const std::string &user, const std::string &password) {
+ gfsh_.execute(command_, user, password);
+}
+
+template <>
inline void Gfsh::Command<void>::execute() {
- gfsh_.execute(command_);
+ gfsh_.execute(command_, "", "");
}
+
#endif // INTEGRATION_TEST_FRAMEWORK_GFSH_H
diff --git a/cppcache/integration/framework/GfshExecute.cpp b/cppcache/integration/framework/GfshExecute.cpp
index 4ea1b55..c5e752b 100644
--- a/cppcache/integration/framework/GfshExecute.cpp
+++ b/cppcache/integration/framework/GfshExecute.cpp
@@ -32,7 +32,7 @@ using boost::process::ipstream;
using boost::process::std_err;
using boost::process::std_out;
-void GfshExecute::execute(const std::string &command) {
+void GfshExecute::execute(const std::string &command, const std::string &user, const std::string &password) {
BOOST_LOG_TRIVIAL(info) << "Gfsh::execute: " << command;
std::vector<std::string> commands;
@@ -70,7 +70,7 @@ void GfshExecute::execute(const std::string &command) {
if (exit_code) {
throw new GfshExecuteException("gfsh error", exit_code);
}
- extractConnectionCommand(command);
+ extractConnectionCommand(command, user, password);
}
child GfshExecute::executeChild(std::vector<std::string> &commands,
@@ -83,3 +83,34 @@ child GfshExecute::executeChild(std::vector<std::string> &commands,
return child(getFrameworkString(FrameworkVariable::GfShExecutable), args = commands, env, std_out > outStream,
std_err > errStream);
}
+
+void GfshExecute::extractConnectionCommand(const std::string &command, const std::string &user, const std::string &password) {
+ if (starts_with(command, std::string("connect"))) {
+ connection_ = command;
+ } else if (starts_with(command, std::string("start locator"))) {
+ auto jmxManagerHost = std::string("localhost");
+ auto jmxManagerPort = std::string("1099");
+
+ std::regex jmxManagerHostRegex("bind-address=([^\\s]+)");
+ std::smatch jmxManagerHostMatch;
+ if (std::regex_search(command, jmxManagerHostMatch,
+ jmxManagerHostRegex)) {
+ jmxManagerHost = jmxManagerHostMatch[1];
+ }
+
+ std::regex jmxManagerPortRegex("jmx-manager-port=(\\d+)");
+ std::smatch jmxManagerPortMatch;
+ if (std::regex_search(command, jmxManagerPortMatch,
+ jmxManagerPortRegex)) {
+ jmxManagerPort = jmxManagerPortMatch[1];
+ }
+
+ connection_ = "connect --jmx-manager=" + jmxManagerHost + "[" +
+ jmxManagerPort + "]";
+
+ if (!(user.empty() || password.empty())) {
+ connection_ += " --user=" + user + " --password=" + password;
+ }
+ }
+}
+
diff --git a/cppcache/integration/framework/GfshExecute.h b/cppcache/integration/framework/GfshExecute.h
index 11922fe..b174828 100644
--- a/cppcache/integration/framework/GfshExecute.h
+++ b/cppcache/integration/framework/GfshExecute.h
@@ -70,38 +70,14 @@ class GfshExecute : public Gfsh {
};
protected:
- void execute(const std::string &command) override;
+ void execute(const std::string &command, const std::string &user, const std::string &password) override;
boost::process::child executeChild(std::vector<std::string> &commands,
boost::process::environment &env,
boost::process::ipstream &outStream,
boost::process::ipstream &errStream);
- void extractConnectionCommand(const std::string &command) {
- if (starts_with(command, std::string("connect"))) {
- connection_ = command;
- } else if (starts_with(command, std::string("start locator"))) {
- auto jmxManagerHost = std::string("localhost");
- auto jmxManagerPort = std::string("1099");
-
- std::regex jmxManagerHostRegex("bind-address=([^\\s]+)");
- std::smatch jmxManagerHostMatch;
- if (std::regex_search(command, jmxManagerHostMatch,
- jmxManagerHostRegex)) {
- jmxManagerHost = jmxManagerHostMatch[1];
- }
-
- std::regex jmxManagerPortRegex("jmx-manager-port=(\\d+)");
- std::smatch jmxManagerPortMatch;
- if (std::regex_search(command, jmxManagerPortMatch,
- jmxManagerPortRegex)) {
- jmxManagerPort = jmxManagerPortMatch[1];
- }
-
- connection_ = "connect --jmx-manager=" + jmxManagerHost + "[" +
- jmxManagerPort + "]";
- }
- }
+ void extractConnectionCommand(const std::string &command, const std::string &user = "", const std::string &password = "");
private:
std::string connection_;
diff --git a/cppcache/integration/test/AuthInitializeTest.cpp b/cppcache/integration/test/AuthInitializeTest.cpp
new file mode 100644
index 0000000..ff6dc36
--- /dev/null
+++ b/cppcache/integration/test/AuthInitializeTest.cpp
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <future>
+#include <iostream>
+#include <random>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include <geode/AuthInitialize.hpp>
+#include <geode/Cache.hpp>
+#include <geode/CqAttributes.hpp>
+#include <geode/CqAttributesFactory.hpp>
+#include <geode/CqEvent.hpp>
+#include <geode/CqListener.hpp>
+#include <geode/PoolManager.hpp>
+#include <geode/QueryService.hpp>
+#include <geode/RegionFactory.hpp>
+#include <geode/RegionShortcut.hpp>
+
+#include "CacheRegionHelper.hpp"
+#include "SimpleAuthInitialize.hpp"
+#include "SimpleCqListener.hpp"
+#include "framework/Cluster.h"
+#include "framework/Framework.h"
+#include "framework/Gfsh.h"
+
+using apache::geode::client::AuthenticationFailedException;
+using apache::geode::client::AuthInitialize;
+using apache::geode::client::Cache;
+using apache::geode::client::Cacheable;
+using apache::geode::client::CacheableKey;
+using apache::geode::client::CacheableString;
+using apache::geode::client::CacheFactory;
+using apache::geode::client::CqAttributes;
+using apache::geode::client::CqAttributesFactory;
+using apache::geode::client::CqEvent;
+using apache::geode::client::CqListener;
+using apache::geode::client::CqOperation;
+using apache::geode::client::Exception;
+using apache::geode::client::HashMapOfCacheable;
+using apache::geode::client::NotConnectedException;
+using apache::geode::client::Pool;
+using apache::geode::client::Properties;
+using apache::geode::client::QueryService;
+using apache::geode::client::Region;
+using apache::geode::client::RegionShortcut;
+
+using std::chrono::minutes;
+
+const int32_t CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT = 100000;
+
+Cache createCache(std::shared_ptr<SimpleAuthInitialize> auth) {
+ auto cache = CacheFactory()
+ .set("log-level", "debug")
+ .set("log-file", "geode_native.log")
+ .set("statistic-sampling-enabled", "false")
+ .setAuthInitialize(auth)
+ .create();
+
+ return cache;
+}
+
+std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache,
+ bool subscriptionEnabled) {
+ auto poolFactory = cache.getPoolManager().createFactory();
+
+ cluster.applyLocators(poolFactory);
+ poolFactory.setPRSingleHopEnabled(true).setSubscriptionEnabled(
+ subscriptionEnabled);
+
+ return poolFactory.create("default");
+}
+
+std::shared_ptr<Region> setupRegion(Cache& cache,
+ const std::shared_ptr<Pool>& pool) {
+ auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+ .setPoolName(pool->getName())
+ .create("region");
+
+ return region;
+}
+
+TEST(AuthInitializeTest, putGetWithBasicAuth) {
+ Cluster cluster(
+ Name(std::string(::testing::UnitTest::GetInstance()
+ ->current_test_info()
+ ->test_case_name()) +
+ "/" +
+ ::testing::UnitTest::GetInstance()->current_test_info()->name()),
+ Classpath{getFrameworkString(FrameworkVariable::JavaObjectJarPath)},
+ SecurityManager{"javaobject.SimpleSecurityManager"}, User{"root"},
+ Password{"root-password"}, LocatorCount{1}, ServerCount{1});
+ cluster.getGfsh()
+ .create()
+ .region()
+ .withName("region")
+ .withType("PARTITION")
+ .execute();
+
+ auto authInitialize = std::make_shared<SimpleAuthInitialize>();
+ auto cache = createCache(authInitialize);
+ auto pool = createPool(cluster, cache, false);
+ auto region = setupRegion(cache, pool);
+
+ region->put("foo", "bar");
+ auto value = region->get("foo");
+ auto stringValue = std::dynamic_pointer_cast<CacheableString>(value)->value();
+ ASSERT_EQ(stringValue, std::string("bar"));
+ ASSERT_GT(authInitialize->getGetCredentialsCallCount(), 0);
+}
+
+TEST(AuthInitializeTest, putWithBadUsername) {
+ Cluster cluster(
+ Name(std::string(::testing::UnitTest::GetInstance()
+ ->current_test_info()
+ ->test_case_name()) +
+ "/" +
+ ::testing::UnitTest::GetInstance()->current_test_info()->name()),
+ Classpath{getFrameworkString(FrameworkVariable::JavaObjectJarPath)},
+ SecurityManager{"javaobject.SimpleSecurityManager"}, User{"root"},
+ Password{"root-password"}, LocatorCount{1}, ServerCount{1});
+ cluster.getGfsh()
+ .create()
+ .region()
+ .withName("region")
+ .withType("PARTITION")
+ .execute();
+ auto authInitialize = std::make_shared<SimpleAuthInitialize>(
+ "unauthorized-user", "root-password");
+ auto cache = createCache(authInitialize);
+ auto pool = createPool(cluster, cache, false);
+ auto region = setupRegion(cache, pool);
+
+ try {
+ region->put("foo", "bar");
+ } catch (const NotConnectedException&) {
+ } catch (const Exception& ex) {
+ std::cerr << "Caught unexpected exception: " << ex.what() << std::endl;
+ FAIL();
+ }
+
+ ASSERT_GT(authInitialize->getGetCredentialsCallCount(), 0);
+}
+
+TEST(AuthInitializeTest, putWithBadPassword) {
+ Cluster cluster(
+ Name(std::string(::testing::UnitTest::GetInstance()
+ ->current_test_info()
+ ->test_case_name()) +
+ "/" +
+ ::testing::UnitTest::GetInstance()->current_test_info()->name()),
+ Classpath{getFrameworkString(FrameworkVariable::JavaObjectJarPath)},
+ SecurityManager{"javaobject.SimpleSecurityManager"}, User{"root"},
+ Password{"root-password"}, LocatorCount{1}, ServerCount{1});
+
+ auto authInitialize =
+ std::make_shared<SimpleAuthInitialize>("root", "bad-password");
+ auto cache = createCache(authInitialize);
+ auto pool = createPool(cluster, cache, false);
+ auto region = setupRegion(cache, pool);
+
+ try {
+ region->put("foo", "bar");
+ } catch (const NotConnectedException&) {
+ } catch (const Exception& ex) {
+ std::cerr << "Caught unexpected exception: " << ex.what() << std::endl;
+ FAIL();
+ }
+
+ ASSERT_GT(authInitialize->getGetCredentialsCallCount(), 0);
+}
+
+TEST(AuthInitializeTest, badCredentialsWithSubscriptionEnabled) {
+ Cluster cluster(
+ Name(std::string(::testing::UnitTest::GetInstance()
+ ->current_test_info()
+ ->test_case_name()) +
+ "/" +
+ ::testing::UnitTest::GetInstance()->current_test_info()->name()),
+ Classpath{getFrameworkString(FrameworkVariable::JavaObjectJarPath)},
+ SecurityManager{"javaobject.SimpleSecurityManager"}, User{"root"},
+ Password{"root-password"}, LocatorCount{1}, ServerCount{1});
+
+ auto authInitialize =
+ std::make_shared<SimpleAuthInitialize>("root", "bad-password");
+ auto cache = createCache(authInitialize);
+
+ try {
+ createPool(cluster, cache, true);
+ } catch (const AuthenticationFailedException&) {
+ } catch (const Exception& ex) {
+ std::cerr << "Caught unexpected exception: " << ex.what() << std::endl;
+ FAIL();
+ }
+
+ ASSERT_GT(authInitialize->getGetCredentialsCallCount(), 0);
+}
diff --git a/cppcache/integration/test/CMakeLists.txt b/cppcache/integration/test/CMakeLists.txt
index bca47a1..e9f1f71 100644
--- a/cppcache/integration/test/CMakeLists.txt
+++ b/cppcache/integration/test/CMakeLists.txt
@@ -13,9 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-file(GLOB_RECURSE SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/*.cpp")
-
-add_executable(cpp-integration-test ${SOURCES})
+add_executable(cpp-integration-test
+ AuthInitializeTest.cpp
+ CommitConflictExceptionTest.cpp
+ CqPlusAuthInitializeTest.cpp
+ CqTest.cpp
+ DataSerializableTest.cpp
+ EnableChunkHandlerThreadTest.cpp
+ ExampleTest.cpp
+ ExpirationTest.cpp
+ FunctionExecutionTest.cpp
+ PdxInstanceTest.cpp
+ RegionGetAllTest.cpp
+ RegionPutAllTest.cpp
+ RegionPutGetAllTest.cpp
+ RegisterKeysTest.cpp
+ SimpleAuthInitialize.cpp
+ SimpleAuthInitialize.hpp
+ SimpleCqListener.cpp
+ SimpleCqListener.hpp
+ StructTest.cpp
+ TransactionCleaningTest.cpp
+)
target_compile_definitions(cpp-integration-test
PUBLIC
diff --git a/cppcache/integration/test/CqPlusAuthInitializeTest.cpp b/cppcache/integration/test/CqPlusAuthInitializeTest.cpp
new file mode 100644
index 0000000..e591b3d
--- /dev/null
+++ b/cppcache/integration/test/CqPlusAuthInitializeTest.cpp
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <future>
+#include <iostream>
+#include <random>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include <geode/AuthInitialize.hpp>
+#include <geode/Cache.hpp>
+#include <geode/CqAttributes.hpp>
+#include <geode/CqAttributesFactory.hpp>
+#include <geode/CqEvent.hpp>
+#include <geode/CqListener.hpp>
+#include <geode/PoolManager.hpp>
+#include <geode/QueryService.hpp>
+#include <geode/RegionFactory.hpp>
+#include <geode/RegionShortcut.hpp>
+
+#include "CacheRegionHelper.hpp"
+#include "SimpleAuthInitialize.hpp"
+#include "SimpleCqListener.hpp"
+#include "framework/Cluster.h"
+#include "framework/Framework.h"
+#include "framework/Gfsh.h"
+
+namespace {
+
+using apache::geode::client::AuthInitialize;
+using apache::geode::client::Cache;
+using apache::geode::client::Cacheable;
+using apache::geode::client::CacheableKey;
+using apache::geode::client::CacheableString;
+using apache::geode::client::CacheFactory;
+using apache::geode::client::CqAttributes;
+using apache::geode::client::CqAttributesFactory;
+using apache::geode::client::CqEvent;
+using apache::geode::client::CqListener;
+using apache::geode::client::CqOperation;
+using apache::geode::client::Exception;
+using apache::geode::client::HashMapOfCacheable;
+using apache::geode::client::Pool;
+using apache::geode::client::Properties;
+using apache::geode::client::QueryService;
+using apache::geode::client::Region;
+using apache::geode::client::RegionShortcut;
+
+using std::chrono::minutes;
+
+const int32_t CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT = 50000;
+
+Cache createCache(std::shared_ptr<SimpleAuthInitialize> auth) {
+ auto cache = CacheFactory()
+ .set("log-level", "debug")
+ .set("log-file", "geode_native.log")
+ .set("statistic-sampling-enabled", "false")
+ .setAuthInitialize(auth)
+ .create();
+
+ return cache;
+}
+
+std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache,
+ bool subscriptionEnabled) {
+ auto poolFactory = cache.getPoolManager().createFactory().setIdleTimeout(
+ std::chrono::milliseconds(0));
+
+ cluster.applyLocators(poolFactory);
+ poolFactory.setPRSingleHopEnabled(true).setSubscriptionEnabled(
+ subscriptionEnabled);
+
+ return poolFactory.create("default");
+}
+
+std::shared_ptr<Region> setupRegion(Cache& cache,
+ const std::shared_ptr<Pool>& pool) {
+ auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+ .setPoolName(pool->getName())
+ .create("region");
+
+ return region;
+}
+
+TEST(CqPlusAuthInitializeTest, putInALoopWhileSubscribedAndAuthenticated) {
+ Cluster cluster(
+ Name(std::string(::testing::UnitTest::GetInstance()
+ ->current_test_info()
+ ->test_case_name()) +
+ "/" +
+ ::testing::UnitTest::GetInstance()->current_test_info()->name()),
+ Classpath{getFrameworkString(FrameworkVariable::JavaObjectJarPath)},
+ SecurityManager{"javaobject.SimpleSecurityManager"}, User{"root"},
+ Password{"root-password"}, LocatorCount{1}, ServerCount{1});
+ cluster.getGfsh()
+ .create()
+ .region()
+ .withName("region")
+ .withType("PARTITION")
+ .execute();
+
+ auto authInitialize = std::make_shared<SimpleAuthInitialize>();
+ auto cache = createCache(authInitialize);
+ auto pool = createPool(cluster, cache, true);
+ auto region = setupRegion(cache, pool);
+
+ try {
+ region->put("foo", "bar");
+ } catch (const Exception& ex) {
+ std::cerr << "Caught exception: " << ex.what() << std::endl;
+ std::cerr << "In initial region put" << std::endl;
+ std::cerr << "Callstack" << ex.getStackTrace() << std::endl;
+ FAIL();
+ }
+
+ auto queryService = cache.getQueryService();
+
+ CqAttributesFactory attributesFactory;
+ auto testListener = std::make_shared<SimpleCqListener>();
+ attributesFactory.addCqListener(testListener);
+ auto cqAttributes = attributesFactory.create();
+
+ auto query =
+ queryService->newCq("SimpleCQ", "SELECT * FROM /region", cqAttributes);
+
+ try {
+ query->execute();
+ } catch (const Exception& ex) {
+ std::cerr << "Caught exception: " << ex.what() << std::endl;
+ std::cerr << "While executing Cq" << std::endl;
+ std::cerr << "Callstack" << ex.getStackTrace() << std::endl;
+ FAIL();
+ }
+
+ int32_t i = 0;
+
+ try {
+ for (i = 0; i < CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT; i++) {
+ region->put("key" + std::to_string(i), "value" + std::to_string(i));
+ std::this_thread::yield();
+ }
+ } catch (const Exception& ex) {
+ std::cerr << "Caught exception: " << ex.what() << std::endl;
+ std::cerr << "In value create loop, i=" << i << std::endl;
+ std::cerr << "Callstack" << ex.getStackTrace() << std::endl;
+ FAIL();
+ }
+
+ try {
+ for (i = 0; i < CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT; i++) {
+ region->put("key" + std::to_string(i), "value" + std::to_string(i + 1));
+ std::this_thread::yield();
+ }
+ } catch (const Exception& ex) {
+ std::cerr << "Caught exception: " << ex.what() << std::endl;
+ std::cerr << "In value update loop, i=" << i << std::endl;
+ std::cerr << "Callstack" << ex.getStackTrace() << std::endl;
+ FAIL();
+ }
+
+ try {
+ for (i = 0; i < CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT; i++) {
+ region->destroy("key" + std::to_string(i));
+ std::this_thread::yield();
+ }
+ } catch (const Exception& ex) {
+ std::cerr << "Caught exception: " << ex.what() << std::endl;
+ std::cerr << "In value destroy loop, i=" << i << std::endl;
+ std::cerr << "Callstack" << ex.getStackTrace() << std::endl;
+ FAIL();
+ }
+
+ for (i = 0; i < 1000; i++) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ if (testListener->getDestructionCount() ==
+ CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT) {
+ break;
+ }
+ }
+
+ ASSERT_EQ(testListener->getCreationCount(),
+ CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT);
+ ASSERT_EQ(testListener->getUpdateCount(),
+ CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT);
+ ASSERT_EQ(testListener->getDestructionCount(),
+ CQ_PLUS_AUTH_TEST_REGION_ENTRY_COUNT);
+ ASSERT_GT(authInitialize->getGetCredentialsCallCount(), 0);
+}
+
+} // namespace
diff --git a/cppcache/integration/test/CqTest.cpp b/cppcache/integration/test/CqTest.cpp
new file mode 100644
index 0000000..1d83b2e
--- /dev/null
+++ b/cppcache/integration/test/CqTest.cpp
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <future>
+#include <iostream>
+#include <random>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include <geode/CqAttributesFactory.hpp>
+#include <geode/QueryService.hpp>
+#include <geode/RegionFactory.hpp>
+#include <geode/RegionShortcut.hpp>
+
+#include "CacheRegionHelper.hpp"
+#include "SimpleCqListener.hpp"
+#include "framework/Cluster.h"
+#include "framework/Framework.h"
+#include "framework/Gfsh.h"
+
+namespace {
+
+using apache::geode::client::Cache;
+using apache::geode::client::Cacheable;
+using apache::geode::client::CacheableKey;
+using apache::geode::client::CqAttributesFactory;
+using apache::geode::client::HashMapOfCacheable;
+using apache::geode::client::Pool;
+using apache::geode::client::Region;
+using apache::geode::client::RegionShortcut;
+
+using std::chrono::minutes;
+
+//
+// TODO: Use a random number of entries. Need to investigate how to log this
+// from/import it to a test first.
+//
+const int32_t CQ_TEST_REGION_ENTRY_COUNT = 100;
+
+// Builds a client cache for this test with the "log-level" property set to
+// "none" and "statistic-sampling-enabled" set to "false".
+Cache createCache() {
+  using apache::geode::client::CacheFactory;
+
+  return CacheFactory()
+      .set("log-level", "none")
+      .set("statistic-sampling-enabled", "false")
+      .create();
+}
+
+// Creates the pool named "default" on the given cache. The cluster supplies
+// the locators; PR single-hop and subscriptions are both enabled, the latter
+// being what allows the client to receive CQ events.
+std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache) {
+  auto factory = cache.getPoolManager().createFactory();
+
+  cluster.applyLocators(factory);
+  factory.setPRSingleHopEnabled(true);
+  factory.setSubscriptionEnabled(true);
+
+  return factory.create("default");
+}
+
+// Creates a PROXY region named "region" on the client, bound to the given
+// pool by name.
+std::shared_ptr<Region> setupRegion(Cache& cache,
+                                    const std::shared_ptr<Pool>& pool) {
+  auto regionFactory = cache.createRegionFactory(RegionShortcut::PROXY);
+  regionFactory.setPoolName(pool->getName());
+  return regionFactory.create("region");
+}
+
+//
+// Registers a continuous query over /region, then creates, updates and
+// destroys CQ_TEST_REGION_ENTRY_COUNT entries and verifies that the listener
+// saw exactly one create, one update and one destroy event per entry.
+//
+TEST(CqTest, testCqCreateUpdateDestroy) {
+  Cluster cluster{LocatorCount{1}, ServerCount{2}};
+  cluster.getGfsh()
+      .create()
+      .region()
+      .withName("region")
+      .withType("PARTITION")
+      .execute();
+
+  auto cache = createCache();
+  auto pool = createPool(cluster, cache);
+  auto region = setupRegion(cache, pool);
+  auto queryService = cache.getQueryService();
+
+  CqAttributesFactory attributesFactory;
+  auto testListener = std::make_shared<SimpleCqListener>();
+  attributesFactory.addCqListener(testListener);
+  auto cqAttributes = attributesFactory.create();
+
+  auto query =
+      queryService->newCq("SimpleCQ", "SELECT * FROM /region", cqAttributes);
+
+  query->execute();
+
+  for (int i = 0; i < CQ_TEST_REGION_ENTRY_COUNT; i++) {
+    region->put("key" + std::to_string(i), "value" + std::to_string(i));
+  }
+
+  for (int i = 0; i < CQ_TEST_REGION_ENTRY_COUNT; i++) {
+    region->put("key" + std::to_string(i), "value" + std::to_string(i + 1));
+  }
+
+  for (int i = 0; i < CQ_TEST_REGION_ENTRY_COUNT; i++) {
+    region->destroy("key" + std::to_string(i));
+  }
+
+  // CQ events reach the listener asynchronously. Poll (100 x 100ms) until
+  // every kind of event has been fully delivered; waiting on the creation
+  // count alone would let the assertions below race with update and destroy
+  // events that are still in flight.
+  for (int i = 0; i < 100; i++) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    if (testListener->getCreationCount() == CQ_TEST_REGION_ENTRY_COUNT &&
+        testListener->getUpdateCount() == CQ_TEST_REGION_ENTRY_COUNT &&
+        testListener->getDestructionCount() == CQ_TEST_REGION_ENTRY_COUNT) {
+      break;
+    }
+  }
+
+  ASSERT_EQ(testListener->getCreationCount(), CQ_TEST_REGION_ENTRY_COUNT);
+  ASSERT_EQ(testListener->getUpdateCount(), CQ_TEST_REGION_ENTRY_COUNT);
+  ASSERT_EQ(testListener->getDestructionCount(), CQ_TEST_REGION_ENTRY_COUNT);
+}
+
+} // namespace
diff --git a/cppcache/integration/test/SimpleAuthInitialize.cpp b/cppcache/integration/test/SimpleAuthInitialize.cpp
new file mode 100644
index 0000000..945279a
--- /dev/null
+++ b/cppcache/integration/test/SimpleAuthInitialize.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SimpleAuthInitialize.hpp"
+
+#include <chrono>
+#include <future>
+#include <iostream>
+#include <random>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+using apache::geode::client::AuthInitialize;
+using apache::geode::client::Properties;
+
+// Writes this instance's username and password into the supplied security
+// properties under "security-username" / "security-password", counts the
+// call, and returns the same properties object. The server argument is
+// ignored.
+std::shared_ptr<Properties> SimpleAuthInitialize::getCredentials(
+    const std::shared_ptr<Properties>& securityprops,
+    const std::string& /*server*/) {
+  std::cout << "SimpleAuthInitialize::GetCredentials called\n";
+
+  auto& credentials = *securityprops;
+  credentials.insert("security-username", username_);
+  credentials.insert("security-password", password_);
+
+  ++countOfGetCredentialsCalls_;
+  return securityprops;
+}
+
+// Only reports on stdout that close() was invoked; there is nothing to
+// release in this implementation.
+void SimpleAuthInitialize::close() {
+ std::cout << "SimpleAuthInitialize::close called\n";
+}
+
+// Default construction uses the credentials "root" / "root-password" with a
+// zeroed call counter (via the two-argument constructor) and reports the
+// construction on stdout.
+SimpleAuthInitialize::SimpleAuthInitialize()
+    : SimpleAuthInitialize("root", "root-password") {
+  std::cout << "SimpleAuthInitialize::SimpleAuthInitialize called\n";
+}
+
+// Constructs an instance that will hand out the given username and password
+// from getCredentials(); the call counter starts at zero. Both strings are
+// taken by value and moved into the members.
+SimpleAuthInitialize::SimpleAuthInitialize(std::string username,
+ std::string password)
+ : username_(std::move(username)),
+ password_(std::move(password)),
+ countOfGetCredentialsCalls_(0) {}
+
+// Returns how many times getCredentials() has been called on this instance.
+int32_t SimpleAuthInitialize::getGetCredentialsCallCount() {
+ return countOfGetCredentialsCalls_;
+}
diff --git a/cppcache/src/util/exception.hpp b/cppcache/integration/test/SimpleAuthInitialize.hpp
similarity index 52%
copy from cppcache/src/util/exception.hpp
copy to cppcache/integration/test/SimpleAuthInitialize.hpp
index fdebdb4..cd72177 100644
--- a/cppcache/src/util/exception.hpp
+++ b/cppcache/integration/test/SimpleAuthInitialize.hpp
@@ -17,31 +17,34 @@
#pragma once
-#ifndef GEODE_UTIL_EXCEPTION_H_
-#define GEODE_UTIL_EXCEPTION_H_
+#ifndef SIMPLEAUTHINITIALIZE_H_
+#define SIMPLEAUTHINITIALIZE_H_
#include <string>
-#include <geode/internal/geode_base.hpp>
+#include <geode/AuthInitialize.hpp>
+#include <geode/Properties.hpp>
-#include "../ErrType.hpp"
+class SimpleAuthInitialize : public apache::geode::client::AuthInitialize {
+ public:
+ std::shared_ptr<apache::geode::client::Properties> getCredentials(
+ const std::shared_ptr<apache::geode::client::Properties>& securityprops,
+ const std::string& /*server*/) override;
-namespace apache {
-namespace geode {
-namespace client {
+ void close() override;
-extern void APACHE_GEODE_EXPORT GfErrTypeThrowException(const char* str,
- GfErrType err);
+ SimpleAuthInitialize();
-#define GfErrTypeToException(str, err) \
- { \
- if (err != GF_NOERR) { \
- GfErrTypeThrowException(str, err); \
- } \
- }
+ SimpleAuthInitialize(std::string username, std::string password);
-} // namespace client
-} // namespace geode
-} // namespace apache
+ ~SimpleAuthInitialize() override = default;
-#endif // GEODE_UTIL_EXCEPTION_H_
+ int32_t getGetCredentialsCallCount();
+
+ private:
+ std::string username_;
+ std::string password_;
+ int32_t countOfGetCredentialsCalls_;
+};
+
+#endif // SIMPLEAUTHINITIALIZE_H_
diff --git a/cppcache/integration/test/SimpleCqListener.cpp b/cppcache/integration/test/SimpleCqListener.cpp
new file mode 100644
index 0000000..77e9778
--- /dev/null
+++ b/cppcache/integration/test/SimpleCqListener.cpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SimpleCqListener.hpp"
+
+#include <iostream>
+
+#include <geode/CqListener.hpp>
+#include <geode/CqOperation.hpp>
+
+// Starts with the create, update and destroy event counters all at zero.
+SimpleCqListener::SimpleCqListener()
+ : creationCount_(0), updateCount_(0), destructionCount_(0) {}
+
+// Tallies the incoming CQ event by its query operation: create, update and
+// destroy each bump their own counter; any other operation is ignored.
+void SimpleCqListener::onEvent(const apache::geode::client::CqEvent& cqEvent) {
+  using apache::geode::client::CqOperation;
+
+  const auto operation = cqEvent.getQueryOperation();
+  if (operation == CqOperation::OP_TYPE_CREATE) {
+    ++creationCount_;
+  } else if (operation == CqOperation::OP_TYPE_UPDATE) {
+    ++updateCount_;
+  } else if (operation == CqOperation::OP_TYPE_DESTROY) {
+    ++destructionCount_;
+  }
+}
+
+void SimpleCqListener::onError(const apache::geode::client::CqEvent& cqEvent) {
+ std::cout << __FUNCTION__ << " called"
+ << dynamic_cast<apache::geode::client::CacheableString*>(
+ cqEvent.getKey().get())
+ ->value()
+ << std::endl;
+}
+
+// Only reports on stdout that close() was invoked.
+void SimpleCqListener::close() {
+ std::cout << __FUNCTION__ << " called" << std::endl;
+}
+
+// Number of OP_TYPE_CREATE events seen by onEvent() so far.
+int32_t SimpleCqListener::getCreationCount() { return creationCount_; }
+
+// Number of OP_TYPE_UPDATE events seen by onEvent() so far.
+int32_t SimpleCqListener::getUpdateCount() { return updateCount_; }
+
+// Number of OP_TYPE_DESTROY events seen by onEvent() so far.
+int32_t SimpleCqListener::getDestructionCount() { return destructionCount_; }
diff --git a/cppcache/src/util/exception.hpp b/cppcache/integration/test/SimpleCqListener.hpp
similarity index 54%
copy from cppcache/src/util/exception.hpp
copy to cppcache/integration/test/SimpleCqListener.hpp
index fdebdb4..e42d381 100644
--- a/cppcache/src/util/exception.hpp
+++ b/cppcache/integration/test/SimpleCqListener.hpp
@@ -17,31 +17,28 @@
#pragma once
-#ifndef GEODE_UTIL_EXCEPTION_H_
-#define GEODE_UTIL_EXCEPTION_H_
-
-#include <string>
-
-#include <geode/internal/geode_base.hpp>
-
-#include "../ErrType.hpp"
-
-namespace apache {
-namespace geode {
-namespace client {
-
-extern void APACHE_GEODE_EXPORT GfErrTypeThrowException(const char* str,
- GfErrType err);
-
-#define GfErrTypeToException(str, err) \
- { \
- if (err != GF_NOERR) { \
- GfErrTypeThrowException(str, err); \
- } \
- }
-
-} // namespace client
-} // namespace geode
-} // namespace apache
-
-#endif // GEODE_UTIL_EXCEPTION_H_
+#ifndef SIMPLE_CQ_LISTENER_H
+#define SIMPLE_CQ_LISTENER_H
+
+#include <atomic>
+#include <cstdint>
+
+#include <geode/CacheableString.hpp>
+#include <geode/CqListener.hpp>
+#include <geode/CqOperation.hpp>
+
+/**
+ * CqListener for integration tests that counts the create, update and
+ * destroy events it receives so a test can assert on them.
+ *
+ * The counters are atomic because onEvent() increments them while the test
+ * thread polls the getters; plain integers would make that a data race.
+ */
+class SimpleCqListener : public apache::geode::client::CqListener {
+ public:
+  SimpleCqListener();
+
+  /** Increments the counter matching the event's query operation. */
+  void onEvent(const apache::geode::client::CqEvent& cqEvent) override;
+
+  /** Reports the error event on stdout. */
+  void onError(const apache::geode::client::CqEvent& cqEvent) override;
+
+  /** Reports the close call on stdout. */
+  void close() override;
+
+  /** Number of create events received so far. */
+  int32_t getCreationCount();
+
+  /** Number of update events received so far. */
+  int32_t getUpdateCount();
+
+  /** Number of destroy events received so far. */
+  int32_t getDestructionCount();
+
+ private:
+  std::atomic<int32_t> creationCount_;
+  std::atomic<int32_t> updateCount_;
+  std::atomic<int32_t> destructionCount_;
+};
+
+#endif // SIMPLE_CQ_LISTENER_H
diff --git a/cppcache/src/AdminRegion.cpp b/cppcache/src/AdminRegion.cpp
index e08c734..ccc6139 100644
--- a/cppcache/src/AdminRegion.cpp
+++ b/cppcache/src/AdminRegion.cpp
@@ -70,7 +70,7 @@ TcrConnectionManager* AdminRegion::getConnectionManager() {
void AdminRegion::put(const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Cacheable>& valuePtr) {
GfErrType err = putNoThrow(keyPtr, valuePtr);
- GfErrTypeToException("AdminRegion::put", err);
+ throwExceptionIfError("AdminRegion::put", err);
}
GfErrType AdminRegion::putNoThrow(const std::shared_ptr<CacheableKey>& keyPtr,
diff --git a/cppcache/src/CacheTransactionManagerImpl.cpp b/cppcache/src/CacheTransactionManagerImpl.cpp
index 8462736..d4a5a83 100644
--- a/cppcache/src/CacheTransactionManagerImpl.cpp
+++ b/cppcache/src/CacheTransactionManagerImpl.cpp
@@ -125,7 +125,7 @@ void CacheTransactionManagerImpl::rollback() {
try {
GfErrType err = rollback(txState, true);
if (err != GF_NOERR) {
- GfErrTypeToException("Error while committing", err);
+ throwExceptionIfError("Error while committing", err);
}
} catch (const Exception& ex) {
// TODO: put a log message
diff --git a/cppcache/src/CqQueryImpl.cpp b/cppcache/src/CqQueryImpl.cpp
index a7eadf8..53aa8cd 100644
--- a/cppcache/src/CqQueryImpl.cpp
+++ b/cppcache/src/CqQueryImpl.cpp
@@ -273,7 +273,6 @@ GfErrType CqQueryImpl::execute(TcrEndpoint* endpoint) {
err = m_tccdm->sendRequestToEP(request, reply, endpoint);
if (err != GF_NOERR) {
- // GfErrTypeToException("CqQuery::execute(endpoint)", err);
return err;
}
@@ -330,7 +329,7 @@ bool CqQueryImpl::executeCq(TcrMessage::MsgType) {
GfErrType err = GF_NOERR;
err = m_tccdm->sendSyncRequest(msg, reply);
if (err != GF_NOERR) {
- GfErrTypeToException("CqQuery::executeCq:", err);
+ throwExceptionIfError("CqQuery::executeCq:", err);
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
@@ -342,7 +341,7 @@ bool CqQueryImpl::executeCq(TcrMessage::MsgType) {
std::string("CqQuery::executeCq: exception at the server side: ") +
reply.getException());
} else {
- GfErrTypeToException("CqQuery::executeCq", err);
+ throwExceptionIfError("CqQuery::executeCq", err);
}
}
std::lock_guard<decltype(m_mutex)> _guard(m_mutex);
@@ -387,7 +386,7 @@ std::shared_ptr<CqResults> CqQueryImpl::executeWithInitialResults(
err = m_tccdm->sendSyncRequest(msg, reply);
if (err != GF_NOERR) {
LOGDEBUG("CqQueryImpl::executeCqWithInitialResults errorred!!!!");
- GfErrTypeToException("CqQuery::executeCqWithInitialResults:", err);
+ throwExceptionIfError("CqQuery::executeCqWithInitialResults:", err);
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
@@ -399,7 +398,7 @@ std::shared_ptr<CqResults> CqQueryImpl::executeWithInitialResults(
std::string("CqQuery::executeWithInitialResults: exception ") +
"at the server side: " + reply.getException());
} else {
- GfErrTypeToException("CqQuery::executeWithInitialResults", err);
+ throwExceptionIfError("CqQuery::executeWithInitialResults", err);
}
}
m_cqState = CqState::RUNNING;
@@ -478,7 +477,7 @@ void CqQueryImpl::sendStopOrClose(TcrMessage::MsgType requestType) {
}
if (err != GF_NOERR) {
- GfErrTypeToException("CqQuery::stop/close:", err);
+ throwExceptionIfError("CqQuery::stop/close:", err);
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
@@ -490,7 +489,7 @@ void CqQueryImpl::sendStopOrClose(TcrMessage::MsgType requestType) {
std::string("CqQuery::stop/close: exception at the server side: ") +
reply.getException());
} else {
- GfErrTypeToException("CqQuery::stop/close", err);
+ throwExceptionIfError("CqQuery::stop/close", err);
}
}
}
diff --git a/cppcache/src/CqService.cpp b/cppcache/src/CqService.cpp
index e872554..167ead0 100644
--- a/cppcache/src/CqService.cpp
+++ b/cppcache/src/CqService.cpp
@@ -559,7 +559,7 @@ std::shared_ptr<CacheableArrayList> CqService::getAllDurableCqsFromServer() {
err = m_tccdm->sendSyncRequest(msg, reply);
if (err != GF_NOERR) {
LOGDEBUG("CqService::getAllDurableCqsFromServer!!!!");
- GfErrTypeToException("CqService::getAllDurableCqsFromServer:", err);
+ throwExceptionIfError("CqService::getAllDurableCqsFromServer:", err);
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::GET_DURABLE_CQS_DATA_ERROR) {
@@ -571,7 +571,7 @@ std::shared_ptr<CacheableArrayList> CqService::getAllDurableCqsFromServer() {
<< "at the server side: " << reply.getException();
throw CqQueryException(message.str());
} else {
- GfErrTypeToException("CqService::getAllDurableCqsFromServer", err);
+ throwExceptionIfError("CqService::getAllDurableCqsFromServer", err);
}
}
diff --git a/cppcache/src/ErrType.hpp b/cppcache/src/ErrType.hpp
index bf8520d..6865a95 100644
--- a/cppcache/src/ErrType.hpp
+++ b/cppcache/src/ErrType.hpp
@@ -40,7 +40,7 @@ typedef enum {
GF_NOTSUP = 12, /**< operation not supported */
GF_SCPGBL = 13, /**< attempt to exit global scope */
GF_SCPEXC = 14, /**< maximum scopes exceeded */
- GF_TIMOUT = 15, /**< operation timed out */
+ GF_TIMEOUT = 15, /**< operation timed out */
GF_OVRFLW = 16, /**< arithmetic overflow */
GF_IOERR = 17, /**< paging file I/O error */
GF_EINTR = 18, /**< interrupted Geode call */
diff --git a/cppcache/src/ExceptionTypes.cpp b/cppcache/src/ExceptionTypes.cpp
index 366d189..2f31d17 100644
--- a/cppcache/src/ExceptionTypes.cpp
+++ b/cppcache/src/ExceptionTypes.cpp
@@ -165,7 +165,7 @@ const std::string& getThreadLocalExceptionMessage();
setThreadLocalExceptionMessage(nullptr);
throw ex;
}
- case GF_TIMOUT: {
+ case GF_TIMEOUT: {
message.append(!exMsg.empty() ? exMsg : ": timed out");
TimeoutException ex(message);
setThreadLocalExceptionMessage(nullptr);
diff --git a/cppcache/src/ExecutionImpl.cpp b/cppcache/src/ExecutionImpl.cpp
index bf5dc11..4dcb20e 100644
--- a/cppcache/src/ExecutionImpl.cpp
+++ b/cppcache/src/ExecutionImpl.cpp
@@ -119,7 +119,7 @@ std::shared_ptr<ResultCollector> ExecutionImpl::execute(
err = getFuncAttributes(func, &attr);
}
if (err != GF_NOERR) {
- GfErrTypeToException("Execute::GET_FUNCTION_ATTRIBUTES", err);
+ throwExceptionIfError("Execute::GET_FUNCTION_ATTRIBUTES", err);
}
if (!attr->empty() && err == GF_NOERR) {
m_func_attrs[func] = attr;
@@ -437,14 +437,14 @@ void ExecutionImpl::executeOnAllServers(const std::string& func,
throw FunctionExecutionException(
"Execute: failed to execute function with server.");
} else {
- GfErrTypeToException("Execute", err);
+ throwExceptionIfError("Execute", err);
}
}
if (err == GF_AUTHENTICATION_FAILED_EXCEPTION ||
err == GF_NOT_AUTHORIZED_EXCEPTION ||
err == GF_AUTHENTICATION_REQUIRED_EXCEPTION) {
- GfErrTypeToException("Execute", err);
+ throwExceptionIfError("Execute", err);
}
if (err != GF_NOERR) {
@@ -515,7 +515,7 @@ std::shared_ptr<CacheableVector> ExecutionImpl::executeOnPool(
reply.getException());
}
if (ThinClientBaseDM::isFatalClientError(err)) {
- GfErrTypeToException("ExecuteOnPool:", err);
+ throwExceptionIfError("ExecuteOnPool:", err);
} else if (err != GF_NOERR) {
if (getResult & 1) {
resultCollector->reset();
@@ -527,7 +527,7 @@ std::shared_ptr<CacheableVector> ExecutionImpl::executeOnPool(
}
continue;
} else {
- GfErrTypeToException("ExecuteOnPool:", err);
+ throwExceptionIfError("ExecuteOnPool:", err);
}
}
// auto values =
diff --git a/cppcache/src/LocalRegion.cpp b/cppcache/src/LocalRegion.cpp
index 7cc9ca4..07d45db 100644
--- a/cppcache/src/LocalRegion.cpp
+++ b/cppcache/src/LocalRegion.cpp
@@ -142,28 +142,28 @@ void LocalRegion::invalidateRegion(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err =
invalidateRegionNoThrow(aCallbackArgument, CacheEventFlags::NORMAL);
- GfErrTypeToException("Region::invalidateRegion", err);
+ throwExceptionIfError("Region::invalidateRegion", err);
}
void LocalRegion::localInvalidateRegion(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err =
invalidateRegionNoThrow(aCallbackArgument, CacheEventFlags::LOCAL);
- GfErrTypeToException("Region::localInvalidateRegion", err);
+ throwExceptionIfError("Region::localInvalidateRegion", err);
}
void LocalRegion::destroyRegion(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err =
destroyRegionNoThrow(aCallbackArgument, true, CacheEventFlags::NORMAL);
- GfErrTypeToException("Region::destroyRegion", err);
+ throwExceptionIfError("Region::destroyRegion", err);
}
void LocalRegion::localDestroyRegion(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err =
destroyRegionNoThrow(aCallbackArgument, true, CacheEventFlags::LOCAL);
- GfErrTypeToException("Region::localDestroyRegion", err);
+ throwExceptionIfError("Region::localDestroyRegion", err);
}
void LocalRegion::tombstoneOperationNoThrow(
@@ -334,7 +334,7 @@ std::shared_ptr<Cacheable> LocalRegion::get(
// rptr = handleReplay(err, rptr);
- GfErrTypeToException("Region::get", err);
+ throwExceptionIfError("Region::get", err);
return rptr;
}
@@ -350,7 +350,7 @@ void LocalRegion::put(const std::shared_ptr<CacheableKey>& key,
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(),
sampleStartNanos);
// handleReplay(err, nullptr);
- GfErrTypeToException("Region::put", err);
+ throwExceptionIfError("Region::put", err);
}
void LocalRegion::localPut(
@@ -361,7 +361,7 @@ void LocalRegion::localPut(
std::shared_ptr<VersionTag> versionTag;
GfErrType err = putNoThrow(key, value, aCallbackArgument, oldValue, -1,
CacheEventFlags::LOCAL, versionTag);
- GfErrTypeToException("Region::localPut", err);
+ throwExceptionIfError("Region::localPut", err);
}
void LocalRegion::putAll(
@@ -374,7 +374,7 @@ void LocalRegion::putAll(
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutAllTimeId(),
sampleStartNanos);
// handleReplay(err, nullptr);
- GfErrTypeToException("Region::putAll", err);
+ throwExceptionIfError("Region::putAll", err);
}
void LocalRegion::removeAll(
@@ -387,7 +387,7 @@ void LocalRegion::removeAll(
GfErrType err = removeAllNoThrow(keys, aCallbackArgument);
updateStatOpTime(m_regionStats->getStat(),
m_regionStats->getRemoveAllTimeId(), sampleStartNanos);
- GfErrTypeToException("Region::removeAll", err);
+ throwExceptionIfError("Region::removeAll", err);
}
void LocalRegion::create(
@@ -398,7 +398,7 @@ void LocalRegion::create(
GfErrType err = createNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, nullptr);
- GfErrTypeToException("Region::create", err);
+ throwExceptionIfError("Region::create", err);
}
void LocalRegion::localCreate(
@@ -408,7 +408,7 @@ void LocalRegion::localCreate(
std::shared_ptr<VersionTag> versionTag;
GfErrType err = createNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
- GfErrTypeToException("Region::localCreate", err);
+ throwExceptionIfError("Region::localCreate", err);
}
void LocalRegion::invalidate(
@@ -418,7 +418,7 @@ void LocalRegion::invalidate(
GfErrType err = invalidateNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, nullptr);
- GfErrTypeToException("Region::invalidate", err);
+ throwExceptionIfError("Region::invalidate", err);
}
void LocalRegion::localInvalidate(
@@ -427,7 +427,7 @@ void LocalRegion::localInvalidate(
std::shared_ptr<VersionTag> versionTag;
GfErrType err = invalidateNoThrow(keyPtr, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
- GfErrTypeToException("Region::localInvalidate", err);
+ throwExceptionIfError("Region::localInvalidate", err);
}
void LocalRegion::destroy(
@@ -438,7 +438,7 @@ void LocalRegion::destroy(
GfErrType err = destroyNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, nullptr);
- GfErrTypeToException("Region::destroy", err);
+ throwExceptionIfError("Region::destroy", err);
}
void LocalRegion::localDestroy(
@@ -447,7 +447,7 @@ void LocalRegion::localDestroy(
std::shared_ptr<VersionTag> versionTag;
GfErrType err = destroyNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
- GfErrTypeToException("Region::localDestroy", err);
+ throwExceptionIfError("Region::localDestroy", err);
}
bool LocalRegion::remove(
@@ -463,7 +463,7 @@ bool LocalRegion::remove(
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
- GfErrTypeToException("Region::remove", err);
+ throwExceptionIfError("Region::remove", err);
}
return result;
@@ -480,7 +480,7 @@ bool LocalRegion::removeEx(
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
- GfErrTypeToException("Region::removeEx", err);
+ throwExceptionIfError("Region::removeEx", err);
}
return result;
@@ -499,7 +499,7 @@ bool LocalRegion::localRemove(
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
- GfErrTypeToException("Region::localRemove", err);
+ throwExceptionIfError("Region::localRemove", err);
}
return result;
@@ -517,7 +517,7 @@ bool LocalRegion::localRemoveEx(
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
- GfErrTypeToException("Region::localRemoveEx", err);
+ throwExceptionIfError("Region::localRemoveEx", err);
}
return result;
@@ -582,7 +582,7 @@ HashMapOfCacheable LocalRegion::getAll_internal(
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getGetAllTimeId(),
sampleStartNanos);
- GfErrTypeToException("Region::getAll", err);
+ throwExceptionIfError("Region::getAll", err);
return *values;
}
@@ -2169,7 +2169,7 @@ void LocalRegion::clear(
void LocalRegion::localClear(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err = localClearNoThrow(aCallbackArgument, CacheEventFlags::LOCAL);
- if (err != GF_NOERR) GfErrTypeToException("LocalRegion::localClear", err);
+ if (err != GF_NOERR) throwExceptionIfError("LocalRegion::localClear", err);
}
GfErrType LocalRegion::localClearNoThrow(
const std::shared_ptr<Serializable>& aCallbackArgument,
diff --git a/cppcache/src/RemoteQuery.cpp b/cppcache/src/RemoteQuery.cpp
index fce5803..3a9abfe 100644
--- a/cppcache/src/RemoteQuery.cpp
+++ b/cppcache/src/RemoteQuery.cpp
@@ -82,7 +82,7 @@ std::shared_ptr<SelectResults> RemoteQuery::execute(
reply.setChunkedResultHandler(
static_cast<TcrChunkedResult*>(resultCollector));
GfErrType err = executeNoThrow(timeout, reply, func, tcdm, paramList);
- GfErrTypeToException(func, err);
+ throwExceptionIfError(func, err);
std::shared_ptr<SelectResults> sr;
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index d7a0f57..b2882f7 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -66,7 +66,7 @@ struct FinalizeProcessChunk {
}
};
-bool TcrConnection::InitTcrConnection(
+bool TcrConnection::initTcrConnection(
TcrEndpoint* endpointObj, const char* endpoint,
synchronized_set<std::unordered_set<uint16_t>>& ports,
bool isClientNotification, bool isSecondary,
@@ -943,9 +943,12 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply,
header = readChunkHeader(headerTimeout);
}
} catch (const Exception&) {
- auto ex = reply.getChunkedResultHandler()->getException();
- LOGDEBUG("Found existing exception ", ex->what());
- reply.getChunkedResultHandler()->clearException();
+ if (auto handler = reply.getChunkedResultHandler()) {
+ if (auto ex = handler->getException()) {
+ LOGDEBUG("Found existing exception ", ex->what());
+ handler->clearException();
+ }
+ }
throw;
}
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index 4374a96..fe18794 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -125,7 +125,7 @@ class APACHE_GEODE_EXPORT TcrConnection {
* @param ports List of local ports for connections to endpoint
* @param numPorts Size of ports list
*/
- bool InitTcrConnection(
+ bool initTcrConnection(
TcrEndpoint* endpointObj, const char* endpoint,
synchronized_set<std::unordered_set<uint16_t>>& ports,
bool isClientNotification = false, bool isSecondary = false,
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index 3418137..5ef3e0e 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -148,7 +148,7 @@ GfErrType TcrEndpoint::createNewConnectionWL(
LOGFINE("TcrEndpoint::createNewConnectionWL got lock");
newConn =
new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected);
- newConn->InitTcrConnection(this, m_name.c_str(), m_ports,
+ newConn->initTcrConnection(this, m_name.c_str(), m_ports,
isClientNotification, isSecondary,
connectTimeout);
@@ -183,7 +183,7 @@ GfErrType TcrEndpoint::createNewConnectionWL(
GfErrType TcrEndpoint::createNewConnection(
TcrConnection*& newConn, bool isClientNotification, bool isSecondary,
std::chrono::microseconds connectTimeout, int32_t timeoutRetries,
- bool sendUpdateNotification, bool appThreadRequest) {
+ bool appThreadRequest) {
LOGFINE(
"TcrEndpoint::createNewConnection: connectTimeout =%d "
"m_needToConnectInLock=%d appThreadRequest =%d",
@@ -196,7 +196,7 @@ GfErrType TcrEndpoint::createNewConnection(
if (!needtoTakeConnectLock() || !appThreadRequest) {
newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(),
m_connected);
- bool authenticate = newConn->InitTcrConnection(
+ bool authenticate = newConn->initTcrConnection(
this, m_name.c_str(), m_ports, isClientNotification, isSecondary,
connectTimeout);
if (authenticate) {
@@ -212,30 +212,11 @@ GfErrType TcrEndpoint::createNewConnection(
}
// m_connected = true;
}
- if (!isClientNotification && sendUpdateNotification) {
- bool notificationStarted;
- {
- std::lock_guard<decltype(m_notifyReceiverLock)> guard(
- m_notifyReceiverLock);
- notificationStarted = (m_numRegionListener > 0) || m_isQueueHosted;
- }
- if (notificationStarted) {
- LOGFINE("Sending update notification message to endpoint %s",
- m_name.c_str());
- TcrMessageUpdateClientNotification updateNotificationMsg(
- new DataOutput(newConn->getConnectionManager()
- .getCacheImpl()
- ->createDataOutput()),
- static_cast<int32_t>(newConn->getPort()));
- newConn->send(updateNotificationMsg.getMsgData(),
- updateNotificationMsg.getMsgLength());
- }
- }
err = GF_NOERR;
break;
} catch (const TimeoutException&) {
LOGINFO("Timeout in handshake with endpoint[%s]", m_name.c_str());
- err = GF_TIMOUT;
+ err = GF_TIMEOUT;
m_needToConnectInLock = true; // while creating the connection
std::this_thread::sleep_for(std::chrono::milliseconds(50));
} catch (const GeodeIOException& ex) {
@@ -284,10 +265,10 @@ GfErrType TcrEndpoint::createNewConnection(
void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) {
LOGDEBUG(
"TcrEndpoint::authenticateEndpoint m_isAuthenticated = %d "
- "this->m_baseDM = %d",
- m_isAuthenticated, m_baseDM);
+ "m_baseDM = %d, connection = %p",
+ m_isAuthenticated, m_baseDM, conn);
if (!m_isAuthenticated && m_baseDM) {
- this->setConnected();
+ setConnected();
std::lock_guard<decltype(m_endpointAuthenticationLock)> guard(
m_endpointAuthenticationLock);
GfErrType err = GF_NOERR;
@@ -304,11 +285,14 @@ void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) {
new DataOutput(m_cacheImpl->createDataOutput()), creds, m_baseDM);
LOGDEBUG("request is created");
- TcrMessageReply reply(true, this->m_baseDM);
- // err = this->sendRequestToEP(request, reply, ( *it ).int_id_);
- err = this->sendRequestConnWithRetry(request, reply, conn);
- LOGDEBUG("authenticateEndpoint error = %d", err);
+ TcrMessageReply reply(true, m_baseDM);
+ err = sendRequestConnWithRetry(request, reply, conn);
+ LOGDEBUG("TcrEndpoint::authenticateEndpoint - ERROR: %d", err);
if (err == GF_NOERR) {
+ LOGDEBUG(
+ "TcrEndpoint::authenticateEndpoint - successfully authenticated on "
+ "conn %p",
+ conn);
// put the object into local region
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
@@ -329,7 +313,7 @@ void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) {
}
}
// throw exception if it is not authenticated
- GfErrTypeToException("TcrEndpoint::authenticateEndpoint", err);
+ throwExceptionIfError("TcrEndpoint::authenticateEndpoint", err);
m_isAuthenticated = true;
}
@@ -535,7 +519,7 @@ void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) {
if (error == GF_NOERR) {
m_pingSent = true;
}
- if (error == GF_TIMOUT && m_pingTimeouts < 2) {
+ if (error == GF_TIMEOUT && m_pingTimeouts < 2) {
++m_pingTimeouts;
} else {
m_pingTimeouts = 0;
@@ -595,7 +579,7 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
msg = new TcrMessageReply(true, m_baseDM);
msg->initCqMap();
msg->setData(data, static_cast<int32_t>(dataLen),
- this->getDistributedMemberID(),
+ getDistributedMemberID(),
*(m_cacheImpl->getSerializationRegistry()),
*(m_cacheImpl->getMemberListForVersionStamp()));
handleNotificationStats(static_cast<int64_t>(dataLen));
@@ -828,7 +812,7 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
reply.getTimeout(), request.getMessageType());
reply.setMessageTypeRequest(type);
reply.setData(
- data, static_cast<int32_t>(dataLen), this->getDistributedMemberID(),
+ data, static_cast<int32_t>(dataLen), getDistributedMemberID(),
*(m_cacheImpl->getSerializationRegistry()),
*(m_cacheImpl
->getMemberListForVersionStamp())); // memory is released by
@@ -972,7 +956,7 @@ GfErrType TcrEndpoint::sendRequestWithRetry(
return GF_NOERR;
}
} catch (const TimeoutException&) {
- error = GF_TIMOUT;
+ error = GF_TIMEOUT;
LOGFINE(
"Send timed out for endpoint %s. "
"Message txid = %d",
@@ -1059,7 +1043,7 @@ GfErrType TcrEndpoint::sendRequestWithRetry(
epFailure = true;
failReason = "server connection could not be obtained";
if (timeout <= std::chrono::microseconds::zero()) {
- error = GF_TIMOUT;
+ error = GF_TIMEOUT;
LOGWARN(
"No connection available for %ld seconds "
"for endpoint %s.",
diff --git a/cppcache/src/TcrEndpoint.hpp b/cppcache/src/TcrEndpoint.hpp
index f1cea69..e80ca33 100644
--- a/cppcache/src/TcrEndpoint.hpp
+++ b/cppcache/src/TcrEndpoint.hpp
@@ -157,8 +157,7 @@ class TcrEndpoint {
TcrConnection*& newConn, bool isClientNotification = false,
bool isSecondary = false,
std::chrono::microseconds connectTimeout = DEFAULT_CONNECT_TIMEOUT,
- int32_t timeoutRetries = 1, bool sendUpdateNotification = true,
- bool appThreadRequest = false);
+ int32_t timeoutRetries = 1, bool appThreadRequest = false);
bool needtoTakeConnectLock();
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index 174c801..ac6d478 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -81,11 +81,11 @@ inline void writeInt(uint8_t* buffer, uint32_t value) {
extern void setThreadLocalExceptionMessage(const char*);
// AtomicInc TcrMessage::m_transactionId = 0;
-uint8_t* TcrMessage::m_keepalive = nullptr;
+uint8_t* TcrMessage::m_keepAlive = nullptr;
const int TcrMessage::m_flag_empty = 0x01;
const int TcrMessage::m_flag_concurrency_checks = 0x02;
-bool TcrMessage::isKeepAlive() { return *m_keepalive > 0; }
+bool TcrMessage::isKeepAlive() { return (m_keepAlive && (*m_keepAlive > 0)); }
bool TcrMessage::isUserInitiativeOps(const TcrMessage& msg) {
int32_t msgType = msg.getMessageType();
@@ -358,8 +358,8 @@ TcrMessage* TcrMessage::getCloseConnMessage(CacheImpl* cacheImpl) {
void TcrMessage::setKeepAlive(bool keepalive) {
// TODO global
- if (TcrMessage::m_keepalive != nullptr) {
- *TcrMessage::m_keepalive = keepalive ? 1 : 0;
+ if (TcrMessage::m_keepAlive != nullptr) {
+ *TcrMessage::m_keepAlive = keepalive ? 1 : 0;
}
}
@@ -2084,7 +2084,7 @@ TcrMessageCloseConnection::TcrMessageCloseConnection(DataOutput* dataOutput,
m_request->writeInt(static_cast<int32_t>(1)); // len is 1
m_request->write(static_cast<int8_t>(0)); // is obj is '0'.
// cast away constness here since we want to modify this
- TcrMessage::m_keepalive = const_cast<uint8_t*>(m_request->getCursor());
+ TcrMessage::m_keepAlive = const_cast<uint8_t*>(m_request->getCursor());
m_request->write(static_cast<int8_t>(0)); // keepalive is '0'.
}
@@ -2469,16 +2469,6 @@ TcrMessageRemoveAll::TcrMessageRemoveAll(
writeMessageLength();
}
-TcrMessageUpdateClientNotification::TcrMessageUpdateClientNotification(
- DataOutput* dataOutput, int32_t port) {
- m_msgType = TcrMessage::UPDATE_CLIENT_NOTIFICATION;
- m_request.reset(dataOutput);
-
- writeHeader(m_msgType, 1);
- writeIntPart(port);
- writeMessageLength();
-}
-
TcrMessageGetAll::TcrMessageGetAll(
DataOutput* dataOutput, const Region* region,
const std::vector<std::shared_ptr<CacheableKey>>* keys,
@@ -2805,7 +2795,7 @@ void TcrMessage::createUserCredentialMessage(TcrConnection* conn) {
writeObjectPart(encryptBytes);
writeMessageLength();
- LOGDEBUG("TcrMessage CUCM() = %s ",
+ LOGDEBUG("TcrMessage::createUserCredentialMessage msg = %s ",
Utils::convertBytesToString(m_request->getBuffer(),
m_request->getBufferLength())
.c_str());
@@ -2835,13 +2825,16 @@ void TcrMessage::addSecurityPart(int64_t connectionId, int64_t unique_id,
auto encryptBytes = conn->encryptBytes(bytes);
+ LOGDEBUG("TcrMessage::addSecurityPart [%p] length = %" PRId32
+ ", encrypted ID = %s ",
+ conn, encryptBytes->length(),
+ Utils::convertBytesToString(encryptBytes->value().data(),
+ encryptBytes->length())
+ .c_str());
+
writeObjectPart(encryptBytes);
writeMessageLength();
m_securityHeaderLength = 4 + 1 + encryptBytes->length();
- LOGDEBUG("TcrMessage addsp = %s ",
- Utils::convertBytesToString(m_request->getBuffer(),
- m_request->getBufferLength())
- .c_str());
}
void TcrMessage::addSecurityPart(int64_t connectionId, TcrConnection* conn) {
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index e1ac840..f2d2cd7 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -471,7 +471,7 @@ class TcrMessage {
uint8_t m_hasResult;
static std::atomic<int32_t> m_transactionId;
- static uint8_t* m_keepalive;
+ static uint8_t* m_keepAlive;
const static int m_flag_empty;
const static int m_flag_concurrency_checks;
@@ -888,13 +888,6 @@ class TcrMessagePeriodicAck : public TcrMessage {
~TcrMessagePeriodicAck() override = default;
};
-class TcrMessageUpdateClientNotification : public TcrMessage {
- public:
- TcrMessageUpdateClientNotification(DataOutput* dataOutput, int32_t port);
-
- ~TcrMessageUpdateClientNotification() override = default;
-};
-
class TcrMessageGetAll : public TcrMessage {
public:
TcrMessageGetAll(
diff --git a/cppcache/src/ThinClientDistributionManager.cpp b/cppcache/src/ThinClientDistributionManager.cpp
index 5365ffc..a9a30e2 100644
--- a/cppcache/src/ThinClientDistributionManager.cpp
+++ b/cppcache/src/ThinClientDistributionManager.cpp
@@ -58,7 +58,7 @@ void ThinClientDistributionManager::init() {
m_endpoints[m_activeEndpoint]->name().c_str());
} else if (isFatalError(err)) {
m_connManager.disconnect(this, m_endpoints);
- GfErrTypeToException("ThinClientDistributionManager::init", err);
+ throwExceptionIfError("ThinClientDistributionManager::init", err);
}
}
ThinClientBaseDM::init();
@@ -157,7 +157,7 @@ GfErrType ThinClientDistributionManager::sendSyncRequest(TcrMessage& request,
type == TcrMessage::EXECUTE_REGION_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) &&
- error == GF_TIMOUT) {
+ error == GF_TIMEOUT) {
forceSelect = true;
}
@@ -178,7 +178,7 @@ GfErrType ThinClientDistributionManager::sendSyncRequest(TcrMessage& request,
type == TcrMessage::EXECUTE_REGION_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) &&
- error == GF_TIMOUT) {
+ error == GF_TIMEOUT) {
return error;
}
currentEndpoint = m_activeEndpoint;
@@ -370,7 +370,7 @@ GfErrType ThinClientDistributionManager::sendUserCredentials(
}
}
// throw exception if it is not authenticated
- // GfErrTypeToException("ThinClientDistributionManager::sendUserCredentials",
+ // throwExceptionIfError("ThinClientDistributionManager::sendUserCredentials",
// err);
}
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index f2d4385..0560393 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -171,7 +171,7 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
auto& sysProp = distributedSystem.getSystemProperties();
// to set security flag at pool level
- this->m_isSecurityOn = cacheImpl->getAuthInitialize() != nullptr;
+ m_isSecurityOn = cacheImpl->getAuthInitialize() != nullptr;
ACE_TCHAR hostName[256];
ACE_OS::hostname(hostName, sizeof(hostName) - 1);
@@ -219,17 +219,16 @@ void ThinClientPoolDM::init() {
LOGDEBUG("ThinClientPoolDM::init: Starting pool initialization");
auto cacheImpl = m_connManager.getCacheImpl();
auto& sysProp = cacheImpl->getDistributedSystem().getSystemProperties();
- m_isMultiUserMode = this->getMultiuserAuthentication();
+ m_isMultiUserMode = getMultiuserAuthentication();
if (m_isMultiUserMode) {
LOGINFO("Multiuser authentication is enabled for pool %s",
m_poolName.c_str());
}
// to set security flag at pool level
- this->m_isSecurityOn = cacheImpl->getAuthInitialize() != nullptr;
+ m_isSecurityOn = cacheImpl->getAuthInitialize() != nullptr;
- LOGDEBUG("ThinClientPoolDM::init: security in on/off = %d ",
- this->m_isSecurityOn);
+ LOGDEBUG("ThinClientPoolDM::init: security in on/off = %d ", m_isSecurityOn);
m_connManager.init(true);
@@ -673,7 +672,7 @@ GfErrType ThinClientPoolDM::sendRequestToAllServers(
err = funcExe->getResult();
if (err != GF_NOERR) {
if (funcExe->getException() == nullptr) {
- if (err == GF_TIMOUT) {
+ if (err == GF_TIMEOUT) {
getStats().incTimeoutClientOps();
} else {
getStats().incFailedClientOps();
@@ -939,7 +938,7 @@ int32_t ThinClientPoolDM::GetPDXIdForType(
err = sendSyncRequest(request, reply);
if (err != GF_NOERR) {
- GfErrTypeToException("Operation Failed", err);
+ throwExceptionIfError("Operation Failed", err);
} else if (reply.getMessageType() == TcrMessage::EXCEPTION) {
LOGDEBUG("ThinClientPoolDM::GetPDXTypeById: Exception = %s ",
reply.getException());
@@ -979,7 +978,7 @@ void ThinClientPoolDM::AddPdxType(std::shared_ptr<Serializable> pdxType,
err = sendSyncRequest(request, reply);
if (err != GF_NOERR) {
- GfErrTypeToException("Operation Failed", err);
+ throwExceptionIfError("Operation Failed", err);
} else if (reply.getMessageType() == TcrMessage::EXCEPTION) {
LOGDEBUG("ThinClientPoolDM::GetPDXTypeById: Exception = %s ",
reply.getException());
@@ -1000,7 +999,7 @@ std::shared_ptr<Serializable> ThinClientPoolDM::GetPDXTypeById(int32_t typeId) {
err = sendSyncRequest(request, reply);
if (err != GF_NOERR) {
- GfErrTypeToException("Operation Failed", err);
+ throwExceptionIfError("Operation Failed", err);
} else if (reply.getMessageType() == TcrMessage::EXCEPTION) {
LOGDEBUG("ThinClientPoolDM::GetPDXTypeById: Exception = %s ",
reply.getException());
@@ -1024,7 +1023,7 @@ int32_t ThinClientPoolDM::GetEnumValue(std::shared_ptr<Serializable> enumInfo) {
err = sendSyncRequest(request, reply);
if (err != GF_NOERR) {
- GfErrTypeToException("Operation Failed", err);
+ throwExceptionIfError("Operation Failed", err);
} else if (reply.getMessageType() == TcrMessage::EXCEPTION) {
LOGDEBUG("ThinClientPoolDM::GetEnumValue: Exception = %s ",
reply.getException());
@@ -1063,7 +1062,7 @@ std::shared_ptr<Serializable> ThinClientPoolDM::GetEnum(int32_t val) {
err = sendSyncRequest(request, reply);
if (err != GF_NOERR) {
- GfErrTypeToException("Operation Failed", err);
+ throwExceptionIfError("Operation Failed", err);
} else if (reply.getMessageType() == TcrMessage::EXCEPTION) {
LOGDEBUG("ThinClientPoolDM::GetEnum: Exception = %s ",
reply.getException());
@@ -1088,7 +1087,7 @@ void ThinClientPoolDM::AddEnum(std::shared_ptr<Serializable> enumInfo,
err = sendSyncRequest(request, reply);
if (err != GF_NOERR) {
- GfErrTypeToException("Operation Failed", err);
+ throwExceptionIfError("Operation Failed", err);
} else if (reply.getMessageType() == TcrMessage::EXCEPTION) {
LOGDEBUG("ThinClientPoolDM::AddEnum: Exception = %s ",
reply.getException());
@@ -1214,7 +1213,7 @@ TcrEndpoint* ThinClientPoolDM::getEndPoint(
// if servergroup is there, then verify otherwise you may reach to another
// group
if (m_attrs->m_initLocList.size()) {
- auto&& servGrp = this->getServerGroup();
+ auto&& servGrp = getServerGroup();
if (servGrp.length() > 0) {
auto groups = serverLocation->getServerGroups();
if ((groups != nullptr) && (groups->length() > 0)) {
@@ -1340,8 +1339,8 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE)) {
// set only when message is not query, putall and executeCQ
- reply.setTimeout(this->getReadTimeout());
- request.setTimeout(this->getReadTimeout());
+ reply.setTimeout(getReadTimeout());
+ request.setTimeout(getReadTimeout());
}
bool retryAllEPsOnce = false;
@@ -1370,7 +1369,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
type == TcrMessage::EXECUTE_REGION_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) &&
- error == GF_TIMOUT) {
+ error == GF_TIMEOUT) {
return error;
}
@@ -1381,8 +1380,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
bool isUserNeedToReAuthenticate = false;
bool singleHopConnFound = false;
bool connFound = false;
- if (!this->m_isMultiUserMode ||
- (!TcrMessage::isUserInitiativeOps(request))) {
+ if (!m_isMultiUserMode || (!TcrMessage::isUserInitiativeOps(request))) {
conn = getConnectionFromQueueW(&queueErr, excludeServers, isBGThread,
request, version, singleHopConnFound,
connFound, serverLocation);
@@ -1434,7 +1432,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
"ThinClientPoolDM::sendSyncRequest: isUserNeedToReAuthenticate = %d ",
isUserNeedToReAuthenticate);
LOGDEBUG(
- "ThinClientPoolDM::sendSyncRequest: m_isMultiUserMode = %d conn = %d "
+ "ThinClientPoolDM::sendSyncRequest: m_isMultiUserMode = %d conn = %p "
"type = %d",
m_isMultiUserMode, conn, type);
@@ -1460,14 +1458,14 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
GfErrType userCredMsgErr = GF_NOERR;
bool isServerException = false;
if (TcrMessage::isUserInitiativeOps(request) &&
- (this->m_isSecurityOn || this->m_isMultiUserMode)) {
- if (!this->m_isMultiUserMode && !ep->isAuthenticated()) {
+ (m_isSecurityOn || m_isMultiUserMode)) {
+ if (!m_isMultiUserMode && !ep->isAuthenticated()) {
// first authenticate him on this endpoint
- userCredMsgErr = this->sendUserCredentials(
- this->getCredentials(ep), conn, isBGThread, isServerException);
+ userCredMsgErr = sendUserCredentials(getCredentials(ep), conn,
+ isBGThread, isServerException);
} else if (isUserNeedToReAuthenticate) {
- userCredMsgErr = this->sendUserCredentials(
- userAttr->getCredentials(), conn, isBGThread, isServerException);
+ userCredMsgErr = sendUserCredentials(userAttr->getCredentials(), conn,
+ isBGThread, isServerException);
}
}
@@ -1492,7 +1490,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
// for Sticky conn.
LOGDEBUG("putting connection back in queue DONE");
} else {
- if (error != GF_TIMOUT) removeEPConnections(ep);
+ if (error != GF_TIMEOUT) removeEPConnections(ep);
// Update stats for the connection that failed.
removeEPConnections(1, false);
setStickyNull(isBGThread ||
@@ -1513,11 +1511,11 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
}
if (error == GF_NOERR) {
- if ((this->m_isSecurityOn || this->m_isMultiUserMode)) {
+ if ((m_isSecurityOn || m_isMultiUserMode)) {
if (reply.getMessageType() == TcrMessage::EXCEPTION) {
if (isAuthRequireException(reply.getException())) {
TcrEndpoint* ep = conn->getEndpointObject();
- if (!this->m_isMultiUserMode) {
+ if (!m_isMultiUserMode) {
ep->setAuthenticated(false);
} else if (userAttr != nullptr) {
userAttr->unAuthenticateEP(ep);
@@ -1572,7 +1570,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
getStats().setCurClientOps(--m_clientOps);
if (error == GF_NOERR) {
getStats().incSucceedClientOps(); /*inc Id for clientOs stat*/
- } else if (error == GF_TIMOUT) {
+ } else if (error == GF_TIMEOUT) {
getStats().incTimeoutClientOps();
} else {
getStats().incFailedClientOps();
@@ -1592,7 +1590,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(
if (error == GF_NOERR) {
getStats().incSucceedClientOps();
- } else if (error == GF_TIMOUT) {
+ } else if (error == GF_TIMEOUT) {
getStats().incTimeoutClientOps();
} else {
getStats().incFailedClientOps();
@@ -1736,7 +1734,7 @@ GfErrType ThinClientPoolDM::createPoolConnectionToAEndPoint(
->getDistributedSystem()
.getSystemProperties()
.connectTimeout(),
- false, true, appThreadrequest);
+ false, appThreadrequest);
if (conn == nullptr || error != GF_NOERR) {
LOGFINE("2Failed to connect to %s", theEP->name().c_str());
if (conn != nullptr) _GEODE_SAFE_DELETE(conn);
@@ -1948,7 +1946,7 @@ GfErrType ThinClientPoolDM::sendRequestToEP(const TcrMessage& request,
type == TcrMessage::EXECUTE_REGION_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE)) {
- reply.setTimeout(this->getReadTimeout());
+ reply.setTimeout(getReadTimeout());
}
reply.setDM(this);
@@ -1956,24 +1954,24 @@ GfErrType ThinClientPoolDM::sendRequestToEP(const TcrMessage& request,
// in multi user mode need to chk whether user is authenticated or not
// and then follow usual process which we did in send syncrequest.
// need to user initiative ops
- LOGDEBUG("ThinClientPoolDM::sendRequestToEP: this->m_isMultiUserMode = %d",
- this->m_isMultiUserMode);
+ LOGDEBUG("ThinClientPoolDM::sendRequestToEP: m_isMultiUserMode = %d",
+ m_isMultiUserMode);
bool isServerException = false;
if (TcrMessage::isUserInitiativeOps((request)) &&
- (this->m_isSecurityOn || this->m_isMultiUserMode)) {
- if (!this->m_isMultiUserMode && !currentEndpoint->isAuthenticated()) {
+ (m_isSecurityOn || m_isMultiUserMode)) {
+ if (!m_isMultiUserMode && !currentEndpoint->isAuthenticated()) {
// first authenticate him on this endpoint
- error = this->sendUserCredentials(this->getCredentials(currentEndpoint),
- conn, false, isServerException);
- } else if (this->m_isMultiUserMode) {
+ error = sendUserCredentials(getCredentials(currentEndpoint), conn,
+ false, isServerException);
+ } else if (m_isMultiUserMode) {
ua = UserAttributes::threadLocalUserAttributes;
if (ua) {
UserConnectionAttributes* uca =
ua->getConnectionAttribute(currentEndpoint);
if (uca == nullptr) {
- error = this->sendUserCredentials(ua->getCredentials(), conn, false,
- isServerException);
+ error = sendUserCredentials(ua->getCredentials(), conn, false,
+ isServerException);
}
} else {
LOGWARN("Attempted operation type %d without credentials",
@@ -2020,10 +2018,10 @@ GfErrType ThinClientPoolDM::sendRequestToEP(const TcrMessage& request,
if (error == GF_NOERR || error == GF_CACHESERVER_EXCEPTION ||
error == GF_AUTHENTICATION_REQUIRED_EXCEPTION) {
- if ((this->m_isSecurityOn || this->m_isMultiUserMode)) {
+ if ((m_isSecurityOn || m_isMultiUserMode)) {
if (reply.getMessageType() == TcrMessage::EXCEPTION) {
if (isAuthRequireException(reply.getException())) {
- if (!this->m_isMultiUserMode) {
+ if (!m_isMultiUserMode) {
currentEndpoint->setAuthenticated(false);
} else if (ua != nullptr) {
ua->unAuthenticateEP(currentEndpoint);
@@ -2096,7 +2094,7 @@ void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) {
while (isRunning) {
m_updateLocatorListSema.acquire();
if (isRunning && !m_connManager.isNetDown()) {
- (m_locHelper)->updateLocators(this->getServerGroup());
+ (m_locHelper)->updateLocators(getServerGroup());
}
}
LOGFINE("Ending updateLocatorList thread for pool %s", m_poolName.c_str());
@@ -2328,7 +2326,7 @@ GfErrType ThinClientPoolDM::doFailover(TcrConnection* conn) {
new DataOutput(m_connManager.getCacheImpl()->createDataOutput()));
TcrMessageReply reply(true, nullptr);
- GfErrType err = this->sendSyncRequest(request, reply);
+ GfErrType err = sendSyncRequest(request, reply);
if (err == GF_NOERR) {
switch (reply.getMessageType()) {
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index 29b15ed..4b3685f 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -76,7 +76,7 @@ void ThinClientPoolHADM::startBackgroundThreads() {
"No locators were available during pool initialization with "
"subscription redundancy.");
} else {
- GfErrTypeToException("ThinClientPoolHADM::init", err);
+ throwExceptionIfError("ThinClientPoolHADM::init", err);
}
}
diff --git a/cppcache/src/ThinClientRedundancyManager.cpp b/cppcache/src/ThinClientRedundancyManager.cpp
index 598e1aa..fbea7c5 100644
--- a/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/cppcache/src/ThinClientRedundancyManager.cpp
@@ -44,6 +44,8 @@ namespace apache {
namespace geode {
namespace client {
+const int MIN_RETRY_ATTEMPTS = 5;
+
const char* ThinClientRedundancyManager::NC_PerodicACK = "NC PerodicACK";
ThinClientRedundancyManager::ThinClientRedundancyManager(
@@ -831,9 +833,8 @@ GfErrType ThinClientRedundancyManager::sendSyncRequestCq(
int32_t attempts = static_cast<int32_t>(m_redundantEndpoints.size()) +
static_cast<int32_t>(m_nonredundantEndpoints.size());
- // TODO: FIXME: avoid magic number 5 for retry attempts
- attempts = attempts < 5
- ? 5
+ attempts = attempts < MIN_RETRY_ATTEMPTS
+ ? MIN_RETRY_ATTEMPTS
: attempts; // at least 5 attempts if ep lists are small.
AuthenticatedView* authenticatedView = nullptr;
@@ -865,7 +866,7 @@ GfErrType ThinClientRedundancyManager::sendSyncRequestCq(
gua.setAuthenticatedView(authenticatedView);
}
err = theHADM->sendRequestToEP(request, reply, primaryEndpoint);
- if (err == GF_NOERR || err == GF_TIMOUT ||
+ if (err == GF_NOERR || err == GF_TIMEOUT ||
ThinClientBaseDM::isFatalClientError(err)) {
break;
}
diff --git a/cppcache/src/ThinClientRegion.cpp b/cppcache/src/ThinClientRegion.cpp
index d812d32..11e2654 100644
--- a/cppcache/src/ThinClientRegion.cpp
+++ b/cppcache/src/ThinClientRegion.cpp
@@ -416,10 +416,10 @@ void ThinClientRegion::registerKeys(
interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
- GfErrTypeToException("Region::registerKeys", err);
+ throwExceptionIfError("Region::registerKeys", err);
}
- GfErrTypeToException("Region::registerKeys", err);
+ throwExceptionIfError("Region::registerKeys", err);
}
void ThinClientRegion::unregisterKeys(
@@ -452,7 +452,7 @@ void ThinClientRegion::unregisterKeys(
"keys vector is empty");
}
GfErrType err = unregisterKeysNoThrow(keys);
- GfErrTypeToException("Region::unregisterKeys", err);
+ throwExceptionIfError("Region::unregisterKeys", err);
}
void ThinClientRegion::registerAllKeys(bool isDurable, bool getInitialValues,
@@ -504,11 +504,11 @@ void ThinClientRegion::registerAllKeys(bool isDurable, bool getInitialValues,
interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
- GfErrTypeToException("Region::registerAllKeys", err);
+ throwExceptionIfError("Region::registerAllKeys", err);
}
// Get the entries from the server using a special GET_ALL message
- GfErrTypeToException("Region::registerAllKeys", err);
+ throwExceptionIfError("Region::registerAllKeys", err);
}
void ThinClientRegion::registerRegex(const std::string& regex, bool isDurable,
@@ -558,10 +558,10 @@ void ThinClientRegion::registerRegex(const std::string& regex, bool isDurable,
interestPolicy, receiveValues);
if (m_tcrdm->isFatalError(err)) {
- GfErrTypeToException("Region::registerRegex", err);
+ throwExceptionIfError("Region::registerRegex", err);
}
- GfErrTypeToException("Region::registerRegex", err);
+ throwExceptionIfError("Region::registerRegex", err);
}
void ThinClientRegion::unregisterRegex(const std::string& regex) {
@@ -584,7 +584,7 @@ void ThinClientRegion::unregisterRegex(const std::string& regex) {
}
GfErrType err = unregisterRegexNoThrow(regex);
- GfErrTypeToException("Region::unregisterRegex", err);
+ throwExceptionIfError("Region::unregisterRegex", err);
}
void ThinClientRegion::unregisterAllKeys() {
@@ -601,7 +601,7 @@ void ThinClientRegion::unregisterAllKeys() {
}
}
GfErrType err = unregisterRegexNoThrow(".*");
- GfErrTypeToException("Region::unregisterAllKeys", err);
+ throwExceptionIfError("Region::unregisterAllKeys", err);
}
std::shared_ptr<SelectResults> ThinClientRegion::query(
@@ -731,7 +731,7 @@ std::vector<std::shared_ptr<CacheableKey>> ThinClientRegion::serverKeys() {
err = m_tcrdm->sendSyncRequest(request, reply);
- GfErrTypeToException("Region::serverKeys", err);
+ throwExceptionIfError("Region::serverKeys", err);
switch (reply.getMessageType()) {
case TcrMessage::RESPONSE: {
@@ -754,7 +754,7 @@ std::vector<std::shared_ptr<CacheableKey>> ThinClientRegion::serverKeys() {
break;
}
}
- GfErrTypeToException("Region::serverKeys", err);
+ throwExceptionIfError("Region::serverKeys", err);
return serverKeys;
}
@@ -800,7 +800,7 @@ bool ThinClientRegion::containsKeyOnServer(
auto rptr = CacheableBoolean::create(ret);
rptr = std::dynamic_pointer_cast<CacheableBoolean>(handleReplay(err, rptr));
- GfErrTypeToException("Region::containsKeyOnServer ", err);
+ throwExceptionIfError("Region::containsKeyOnServer ", err);
return rptr->value();
}
@@ -848,7 +848,7 @@ bool ThinClientRegion::containsValueForKey_remote(
rptr = std::dynamic_pointer_cast<CacheableBoolean>(handleReplay(err, rptr));
- GfErrTypeToException("Region::containsValueForKey ", err);
+ throwExceptionIfError("Region::containsValueForKey ", err);
return rptr->value();
}
@@ -856,7 +856,7 @@ void ThinClientRegion::clear(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err = GF_NOERR;
err = localClearNoThrow(aCallbackArgument, CacheEventFlags::NORMAL);
- if (err != GF_NOERR) GfErrTypeToException("Region::clear", err);
+ if (err != GF_NOERR) throwExceptionIfError("Region::clear", err);
/** @brief Create message and send to bridge server */
@@ -865,7 +865,7 @@ void ThinClientRegion::clear(
std::chrono::milliseconds(-1), m_tcrdm.get());
TcrMessageReply reply(true, m_tcrdm.get());
err = m_tcrdm->sendSyncRequest(request, reply);
- if (err != GF_NOERR) GfErrTypeToException("Region::clear", err);
+ if (err != GF_NOERR) throwExceptionIfError("Region::clear", err);
switch (reply.getMessageType()) {
case TcrMessage::REPLY:
@@ -892,7 +892,7 @@ void ThinClientRegion::clear(
err = invokeCacheListenerForRegionEvent(
aCallbackArgument, CacheEventFlags::NORMAL, AFTER_REGION_CLEAR);
}
- GfErrTypeToException("Region::clear", err);
+ throwExceptionIfError("Region::clear", err);
}
GfErrType ThinClientRegion::getNoThrow_remote(
@@ -1983,7 +1983,7 @@ uint32_t ThinClientRegion::size_remote() {
err = m_tcrdm->sendSyncRequest(request, reply);
if (err != GF_NOERR) {
- GfErrTypeToException("Region::size", err);
+ throwExceptionIfError("Region::size", err);
}
switch (reply.getMessageType()) {
@@ -2004,7 +2004,7 @@ uint32_t ThinClientRegion::size_remote() {
err = GF_NOTOBJ;
}
- GfErrTypeToException("Region::size", err);
+ throwExceptionIfError("Region::size", err);
return 0;
}
@@ -2881,7 +2881,7 @@ void ThinClientRegion::registerInterestGetValues(
auto exceptions = std::make_shared<HashMapOfException>();
auto err = getAllNoThrow_remote(keys, nullptr, exceptions, resultKeys, true,
nullptr);
- GfErrTypeToException(method, err);
+ throwExceptionIfError(method, err);
// log any exceptions here
for (const auto& iter : *exceptions) {
LOGWARN("%s Exception for key %s:: %s: %s", method,
@@ -2988,7 +2988,7 @@ void ThinClientRegion::executeFunction(
}
if (ThinClientBaseDM::isFatalClientError(err)) {
- GfErrTypeToException("ExecuteOnRegion:", err);
+ throwExceptionIfError("ExecuteOnRegion:", err);
} else if (err != GF_NOERR) {
if (err == GF_FUNCTION_EXCEPTION) {
reExecute = true;
@@ -3009,17 +3009,17 @@ void ThinClientRegion::executeFunction(
"%d ",
attempt);
if (attempt > retryAttempts) {
- GfErrTypeToException("ExecuteOnRegion:", err);
+ throwExceptionIfError("ExecuteOnRegion:", err);
}
reExecuteForServ = true;
rc->clearResults();
failedNodes->clear();
- } else if (err == GF_TIMOUT) {
+ } else if (err == GF_TIMEOUT) {
LOGINFO(
"function timeout. Name: %s, timeout: %d, params: %d, "
"retryAttempts: %d ",
func.c_str(), timeout.count(), getResult, retryAttempts);
- GfErrTypeToException("ExecuteOnRegion", GF_TIMOUT);
+ throwExceptionIfError("ExecuteOnRegion", GF_TIMEOUT);
} else if (err == GF_CLIENT_WAIT_TIMEOUT ||
err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) {
LOGINFO(
@@ -3027,10 +3027,10 @@ void ThinClientRegion::executeFunction(
"blacklisted. Name: %s, timeout: %d, params: %d, retryAttempts: "
"%d ",
func.c_str(), timeout.count(), getResult, retryAttempts);
- GfErrTypeToException("ExecuteOnRegion", GF_CLIENT_WAIT_TIMEOUT);
+ throwExceptionIfError("ExecuteOnRegion", GF_CLIENT_WAIT_TIMEOUT);
} else {
LOGDEBUG("executeFunction err = %d ", err);
- GfErrTypeToException("ExecuteOnRegion:", err);
+ throwExceptionIfError("ExecuteOnRegion:", err);
}
} else {
reExecute = false;
@@ -3081,7 +3081,7 @@ std::shared_ptr<CacheableVector> ThinClientRegion::reExecuteFunction(
}
if (ThinClientBaseDM::isFatalClientError(err)) {
- GfErrTypeToException("ExecuteOnRegion:", err);
+ throwExceptionIfError("ExecuteOnRegion:", err);
} else if (err != GF_NOERR) {
if (err == GF_FUNCTION_EXCEPTION) {
reExecute = true;
@@ -3104,17 +3104,17 @@ std::shared_ptr<CacheableVector> ThinClientRegion::reExecuteFunction(
"= %d ",
attempt);
if (attempt > retryAttempts) {
- GfErrTypeToException("ExecuteOnRegion:", err);
+ throwExceptionIfError("ExecuteOnRegion:", err);
}
reExecute = true;
rc->clearResults();
failedNodes->clear();
- } else if (err == GF_TIMOUT) {
+ } else if (err == GF_TIMEOUT) {
LOGINFO("function timeout");
- GfErrTypeToException("ExecuteOnRegion", GF_CACHE_TIMEOUT_EXCEPTION);
+ throwExceptionIfError("ExecuteOnRegion", GF_CACHE_TIMEOUT_EXCEPTION);
} else {
LOGDEBUG("reExecuteFunction err = %d ", err);
- GfErrTypeToException("ExecuteOnRegion:", err);
+ throwExceptionIfError("ExecuteOnRegion:", err);
}
}
} while (reExecute);
@@ -3218,7 +3218,7 @@ bool ThinClientRegion::executeFunctionSH(
}
if (abortError != GF_NOERR) {
- GfErrTypeToException("ExecuteOnRegion:", abortError);
+ throwExceptionIfError("ExecuteOnRegion:", abortError);
}
return reExecute;
}
@@ -3282,7 +3282,7 @@ void ThinClientRegion::txDestroy(
std::shared_ptr<VersionTag> versionTag) {
GfErrType err = destroyNoThrowTX(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
- GfErrTypeToException("Region::destroyTX", err);
+ throwExceptionIfError("Region::destroyTX", err);
}
void ThinClientRegion::txInvalidate(
@@ -3291,7 +3291,7 @@ void ThinClientRegion::txInvalidate(
std::shared_ptr<VersionTag> versionTag) {
GfErrType err = invalidateNoThrowTX(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
- GfErrTypeToException("Region::invalidateTX", err);
+ throwExceptionIfError("Region::invalidateTX", err);
}
void ThinClientRegion::txPut(
@@ -3306,7 +3306,7 @@ void ThinClientRegion::txPut(
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(),
sampleStartNanos);
- GfErrTypeToException("Region::putTX", err);
+ throwExceptionIfError("Region::putTX", err);
}
void ThinClientRegion::setProcessedMarker(bool) {}
diff --git a/cppcache/src/Utils.cpp b/cppcache/src/Utils.cpp
index a832446..48004c8 100644
--- a/cppcache/src/Utils.cpp
+++ b/cppcache/src/Utils.cpp
@@ -164,6 +164,12 @@ std::string Utils::convertBytesToString(const uint8_t* bytes, size_t length,
return "";
}
+std::string Utils::convertBytesToString(const int8_t* bytes, size_t length,
+ size_t maxLength) {
+ return Utils::convertBytesToString(reinterpret_cast<const uint8_t*>(bytes),
+ length, maxLength);
+}
+
int64_t Utils::startStatOpTime() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch())
diff --git a/cppcache/src/Utils.hpp b/cppcache/src/Utils.hpp
index eb048fb..39de1c7 100644
--- a/cppcache/src/Utils.hpp
+++ b/cppcache/src/Utils.hpp
@@ -168,6 +168,13 @@ class APACHE_GEODE_EXPORT Utils {
* Convert the byte array to a string as "%d %d ...".
* <code>maxLength</code> as zero implies no limit.
*/
+ static std::string convertBytesToString(const int8_t* bytes, size_t length,
+ size_t maxLength = _GF_MSG_LIMIT);
+
+ /**
+ * Convert the byte array to a string as "%d %d ...".
+ * <code>maxLength</code> as zero implies no limit.
+ */
inline static std::string convertBytesToString(
const char* bytes, size_t length, size_t maxLength = _GF_MSG_LIMIT) {
return convertBytesToString(reinterpret_cast<const uint8_t*>(bytes), length,
diff --git a/cppcache/src/util/exception.hpp b/cppcache/src/util/exception.hpp
index fdebdb4..c148390 100644
--- a/cppcache/src/util/exception.hpp
+++ b/cppcache/src/util/exception.hpp
@@ -33,7 +33,7 @@ namespace client {
extern void APACHE_GEODE_EXPORT GfErrTypeThrowException(const char* str,
GfErrType err);
-#define GfErrTypeToException(str, err) \
+#define throwExceptionIfError(str, err) \
{ \
if (err != GF_NOERR) { \
GfErrTypeThrowException(str, err); \