You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2021/05/12 08:58:44 UTC

[celix] branch feature/cxx_rsa_update updated: Adds invoke integration gtest for remote services

This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/cxx_rsa_update
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/cxx_rsa_update by this push:
     new dd844a1  Adds invoke integration gtest for remote services
dd844a1 is described below

commit dd844a1b3a4ce45b24c903d0064750e7b0ae15c5
Author: Pepijn Noltes <pe...@gmail.com>
AuthorDate: Wed May 12 10:58:27 2021 +0200

    Adds invoke integration gtest for remote services
---
 .../cxx_remote_services/integration/CMakeLists.txt | 16 +++---
 .../integration/gtest/CMakeLists.txt               |  2 +-
 .../src/RemoteServicesIntegrationTestSuite.cc      | 59 +++++++++++++++++++--
 .../resources/Calculator$add$Invoke.descriptor     |  2 +-
 .../src/TestExportImportRemoteServiceFactory.cc    | 60 +++++++++++++---------
 5 files changed, 99 insertions(+), 40 deletions(-)

diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt b/bundles/cxx_remote_services/integration/CMakeLists.txt
index 7f72d05..9034ee8 100644
--- a/bundles/cxx_remote_services/integration/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/CMakeLists.txt
@@ -52,21 +52,19 @@ add_celix_container(RemoteCalculatorProvider
         GROUP rsa
         PROPERTIES
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
-
-            #Configuration to let the pubsub zmq operate without dicscovery
-            #PSA_ZMQ_STATIC_BIND_URL_FOR_test_return=ipc:///tmp/pubsub-test-return
-            #PSA_ZMQ_STATIC_CONNECT_URL_FOR_test_invoke=ipc:///tmp/pubsub-test-invoke
         BUNDLES
             Celix::ShellCxx
             Celix::shell_tui
 
-            #Needed for remote services (full pubsub stack + remote services stack)
+            #Pubsub needed for remote services on pubsub
             Celix::pubsub_serializer_json
             Celix::pubsub_topology_manager
             #TODO replace with v1 when marker interfaces for a serializer type are introduced  Celix::pubsub_admin_zmq_v2
             Celix::pubsub_admin_zmq
             Celix::pubsub_protocol_wire_v2
             Celix::pubsub_discovery_etcd
+
+            #Remote Services
             Celix::RemoteServiceAdmin
             TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator
 
@@ -77,20 +75,18 @@ add_celix_container(RemoteCalculatorConsumer
         GROUP rsa
         PROPERTIES
             CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
-
-            #Configuration to let the pubsub zmq operate without dicscovery
-            #PSA_ZMQ_STATIC_BIND_URL_FOR_test_invoke=ipc:///tmp/pubsub-test-invoke
-            #PSA_ZMQ_STATIC_CONNECT_URL_FOR_test_return=ipc:///tmp/pubsub-test-return
         BUNDLES
             Celix::ShellCxx
             Celix::shell_tui
 
-            #Needed for remote services (full pubsub stack + remote services stack)
+            #Pubsub needed for remote services on pubsub
             Celix::pubsub_serializer_json
             Celix::pubsub_topology_manager
             Celix::pubsub_discovery_etcd
             Celix::pubsub_admin_zmq
             Celix::pubsub_protocol_wire_v2
+
+            #Remote Services
             Celix::RsaConfiguredDiscovery
             Celix::RemoteServiceAdmin
             TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator
diff --git a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
index 9b03a0f..824bd48 100644
--- a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
@@ -18,7 +18,7 @@
 add_executable(test_cxx_remote_services_integration
     src/RemoteServicesIntegrationTestSuite.cc
 )
-target_link_libraries(test_cxx_remote_services_integration PRIVATE Celix::framework GTest::gtest GTest::gtest_main)
+target_link_libraries(test_cxx_remote_services_integration PRIVATE Celix::framework Celix::Promises Celix::shell_api GTest::gtest GTest::gtest_main)
 target_compile_options(test_cxx_remote_services_integration PRIVATE -std=c++17)
 target_include_directories(test_cxx_remote_services_integration PRIVATE ../include) #Add ICalculator
 
diff --git a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
index 9e460ba..3012f7e 100644
--- a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
+++ b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
@@ -19,6 +19,8 @@
 
 #include <gtest/gtest.h>
 
+#include "ICalculator.h"
+#include "celix_shell_command.h"
 #include "celix/FrameworkFactory.h"
 
 class RemoteServicesIntegrationTestSuite : public ::testing::Test {
@@ -26,20 +28,26 @@ public:
     RemoteServicesIntegrationTestSuite() {
         celix::Properties clientConfig{
                 {"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "trace"},
-                {celix::FRAMEWORK_CACHE_DIR, ".clientCache"}
+                {celix::FRAMEWORK_CACHE_DIR, ".clientCache"},
+                //Static configuration to let the pubsub zmq operate without discovery
+                {"PSA_ZMQ_STATIC_BIND_URL_FOR_test_invoke_default", "ipc:///tmp/pubsub-test-return"},
+                {"PSA_ZMQ_STATIC_CONNECT_URL_FOR_test_return_default", "ipc:///tmp/pubsub-test-invoke" }
         };
         clientFw = celix::createFramework(clientConfig);
         clientCtx = clientFw->getFrameworkBundleContext();
 
         celix::Properties serverConfig{
                 {"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "trace"},
-                {celix::FRAMEWORK_CACHE_DIR, ".serverCache"}
+                {celix::FRAMEWORK_CACHE_DIR, ".serverCache"},
+                //Static configuration to let the pubsub zmq operate without discovery
+                {"PSA_ZMQ_STATIC_BIND_URL_FOR_test_return_default", "ipc:///tmp/pubsub-test-invoke"},
+                {"PSA_ZMQ_STATIC_CONNECT_URL_FOR_test_invoke_default", "ipc:///tmp/pubsub-test-return" }
         };
         serverFw = celix::createFramework(serverConfig);
         serverCtx = serverFw->getFrameworkBundleContext();
     }
 
-    void installSharedBundles(std::shared_ptr<celix::BundleContext>& ctx) {
+    static void installSharedBundles(std::shared_ptr<celix::BundleContext>& ctx) {
         auto sharedBundles = {
                 PS_SER_BUNDLE_LOC,
                 PS_PSTM_BUNDLE_LOC,
@@ -73,6 +81,51 @@ public:
 };
 
 TEST_F(RemoteServicesIntegrationTestSuite, StartStopFrameworks) {
+    installProviderBundles();
     installConsumerBundles();
+}
+
+TEST_F(RemoteServicesIntegrationTestSuite, InvokeRemoteCalcService) {
     installProviderBundles();
+    installConsumerBundles();
+
+    //If a calculator provider bundle is installed I expect an exported calculator interface
+    auto count = serverCtx->useService<ICalculator>()
+            .setFilter("(service.exported.interfaces=*)")
+            .build();
+    EXPECT_EQ(count, 1);
+
+    //If a calculator consumer bundle is installed and also the needed remote services bundle, I expect an imported calculator interface
+    count = clientCtx->useService<ICalculator>()
+            .setTimeout(std::chrono::seconds{1})
+            .setFilter("(service.imported=*)")
+            .build();
+    EXPECT_EQ(count, 1);
+
+    /*DEBUG INFO*/
+    clientCtx->useService<celix_shell_command>(CELIX_SHELL_COMMAND_SERVICE_NAME)
+            .setFilter((std::string{"("}.append(CELIX_SHELL_COMMAND_NAME).append("=celix::psa_zmq)")))
+            .addUseCallback([](auto& cmd) {
+                cmd.executeCommand(cmd.handle, "psa_zmq", stdout, stdout);
+            })
+            .build();
+    serverCtx->useService<celix_shell_command>(CELIX_SHELL_COMMAND_SERVICE_NAME)
+            .setFilter((std::string{"("}.append(CELIX_SHELL_COMMAND_NAME).append("=celix::psa_zmq)")))
+            .addUseCallback([](auto& cmd) {
+                cmd.executeCommand(cmd.handle, "psa_zmq", stdout, stdout);
+            })
+            .build();
+
+    //When I call the calculator service from the client, I expect an answer
+    count = clientCtx->useService<ICalculator>()
+            .addUseCallback([](auto& calc) {
+                auto promise = calc.add(2, 4);
+                promise.wait();
+                EXPECT_TRUE(promise.isSuccessfullyResolved());
+                if (promise.isSuccessfullyResolved()) {
+                    EXPECT_EQ(6, promise.getValue());
+                }
+            })
+            .build();
+    EXPECT_EQ(count, 1);
 }
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor b/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
index 84125ee..74777de 100644
--- a/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
+++ b/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
@@ -6,4 +6,4 @@ version=1.0.0
 classname=Calculator$add$Invoke
 :types
 :message
-{DD arg1 arg1}
+{DD arg1 arg2}
diff --git a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
index f4cd6ce..ce50cbd 100644
--- a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
+++ b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
@@ -32,17 +32,17 @@
 constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{5}; //TODO make configurable
 
 struct Calculator$add$Invoke {
-    double arg1;
-    double arg2;
+    double arg1{};
+    double arg2{};
 };
 
 struct Calculator$add$Return {
     struct {
-        uint32_t cap;
-        uint32_t len;
-        double* buf;
-    } optionalReturnValue;
-    char* optionalError;
+        uint32_t cap{};
+        uint32_t len{};
+        double* buf{};
+    } optionalReturnValue{};
+    char* optionalError{};
 };
 
 /**
@@ -81,7 +81,7 @@ public:
             logHelper.error(msg);
             deferred.fail(celix::rsa::RemoteServicesException{msg});
         }
-        return deferred.getPromise().timeout(INVOKE_TIMEOUT);
+        return deferred.getPromise().timeout(INVOKE_TIMEOUT); //TODO fixme timeout can leak
     }
 
     void receive(const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t* meta) {
@@ -259,24 +259,34 @@ public:
             std::lock_guard lock{mutex};
             auto promise = calculator->add(invoke->arg1, invoke->arg2);
             promise
-                .onFailure([svc = calculator, pub = publisher, msgId = returnMsgId, metaProps](const auto& exp) {
-                    //note this lambda makes copies of the std::shared_ptr<publisher> and svc, so a remove of publisher or service can only happen after the promise is done
-                    Calculator$add$Return ret;
-                    ret.optionalReturnValue.buf = nullptr;
-                    ret.optionalReturnValue.len = 0;
-                    ret.optionalReturnValue.cap = 0;
-                    ret.optionalError = celix_utils_strdup(exp.what());
-                    pub->send(pub->handle, msgId, &ret, metaProps);
+                .onFailure([weakPub = std::weak_ptr{publisher}, msgId = returnMsgId, metaProps](const auto& exp) {
+                    auto pub = weakPub.lock();
+                    if (pub) {
+                        Calculator$add$Return ret;
+                        ret.optionalReturnValue.buf = nullptr;
+                        ret.optionalReturnValue.len = 0;
+                        ret.optionalReturnValue.cap = 0;
+                        ret.optionalError = celix_utils_strdup(exp.what());
+                        pub->send(pub->handle, msgId, &ret, metaProps);
+                    } else {
+                        //TODO error handling
+                    }
                 })
-                .onSuccess([svc = calculator, pub = publisher, msgId = returnMsgId, metaProps](auto val) {
-                    //note this lambda makes copies of the std::shared_ptr<publisher> and svc, so a remove of publisher or service can only happen after the promise is done
-                    Calculator$add$Return ret;
-                    ret.optionalReturnValue.buf = (double*) malloc(sizeof(*ret.optionalReturnValue.buf));
-                    ret.optionalReturnValue.len = 1;
-                    ret.optionalReturnValue.cap = 1;
-                    ret.optionalReturnValue.buf[0] = val;
-                    ret.optionalError = nullptr;
-                    pub->send(pub->handle, msgId, &ret, metaProps);
+                .onSuccess([weakSvc = std::weak_ptr{calculator}, weakPub = std::weak_ptr{publisher}, msgId = returnMsgId, metaProps](auto val) {
+                    auto pub = weakPub.lock();
+                    auto svc = weakSvc.lock();
+                    if (pub && svc) {
+                        Calculator$add$Return ret;
+                        ret.optionalReturnValue.buf = (double *) malloc(sizeof(*ret.optionalReturnValue.buf));
+                        ret.optionalReturnValue.len = 1;
+                        ret.optionalReturnValue.cap = 1;
+                        ret.optionalReturnValue.buf[0] = val;
+                        ret.optionalError = nullptr;
+                        pub->send(pub->handle, msgId, &ret, metaProps);
+                        free(ret.optionalReturnValue.buf);
+                    } else {
+                        //TODO error handling
+                    }
                 });
         } else {
             logHelper.warning("Unexpected message type %s", msgType);