You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@celix.apache.org by GitBox <gi...@apache.org> on 2021/10/10 12:29:14 UTC

[GitHub] [celix] pnoltes commented on a change in pull request #372: Initial version of test showing remote pushstreams

pnoltes commented on a change in pull request #372:
URL: https://github.com/apache/celix/pull/372#discussion_r725630660



##########
File path: bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
##########
@@ -30,14 +31,57 @@ class CalculatorImpl final : public ICalculator {
     celix::Promise<double> add(double a, double b) override {
         auto deferred = factory->deferred<double>();
         deferred.resolve(a+b);
+
         return deferred.getPromise();
     }
 
+    void setPushStreamProvider(const std::shared_ptr<celix::PushStreamProvider>& provider) {
+        psp = provider;
+    }
+
     void setFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
         factory = fac;
     }
+
+    std::shared_ptr<celix::PushStream<double>> result() override {
+        return psp->createUnbufferedStream<double>(ses);
+    }
+
+    int init() {
+        return CELIX_SUCCESS;
+    }
+
+    int start() {
+        ses = psp->template createSynchronousEventSource<double>();
+
+        t = std::make_unique<std::thread>([&]() {
+            int counter = 0;
+            stopThread = false;
+            while(!stopThread) {
+                ses->publish((double)counter);
+                counter++;
+                std::this_thread::sleep_for(std::chrono::milliseconds{100});
+            }
+        });
+        return CELIX_SUCCESS;
+    }
+
+    int stop() {
+        stopThread = true;
+        t->join();
+        return CELIX_SUCCESS;
+    }
+
+    int deinit() {
+        return CELIX_SUCCESS;
+    }
+
 private:
+    std::unique_ptr<std::thread> t{};
     std::shared_ptr<celix::PromiseFactory> factory{};
+    std::shared_ptr<celix::PushStreamProvider> psp {};
+    std::shared_ptr<celix::SynchronousPushEventSource<double>> ses {};
+    volatile bool stopThread{false};

Review comment:
       volatile does not make the flag thread safe; use std::atomic instead.

##########
File path: bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
##########
@@ -115,16 +115,27 @@ TEST_F(RemoteServicesIntegrationTestSuite, InvokeRemoteCalcService) {
                 cmd.executeCommand(cmd.handle, "psa_zmq", stdout, stdout);
             })
             .build();
-
+    std::shared_ptr<celix::PushStream<double>> stream;
     //When I call the calculator service from the client, I expect a answer
+    int streamCount = 0;
+    double lastValue = 0.0;
     count = clientCtx->useService<ICalculator>()
-            .addUseCallback([](auto& calc) {
+            .addUseCallback([&](auto& calc) {
+                stream = calc.result();

Review comment:
       Very nice to see that the use of the PushStream is relatively easy.
   
   IMO reversing the data stream from provider -> user is very unique, useful and, when using the PushStream solution, surprisingly clean :)
   
   👍 

##########
File path: bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
##########
@@ -29,7 +31,7 @@
 #include "pubsub/publisher.h"
 #include "pubsub/subscriber.h"
 
-constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{10};
+constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{5}; //TODO make configurable

Review comment:
       IMO the timeout for these examples/tests does not need to be configurable.

##########
File path: bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
##########
@@ -115,16 +115,27 @@ TEST_F(RemoteServicesIntegrationTestSuite, InvokeRemoteCalcService) {
                 cmd.executeCommand(cmd.handle, "psa_zmq", stdout, stdout);
             })
             .build();
-
+    std::shared_ptr<celix::PushStream<double>> stream;
     //When I call the calculator service from the client, I expect a answer
+    int streamCount = 0;
+    double lastValue = 0.0;

Review comment:
       streamCount and lastValue should be made std::atomic, because they are accessed by both the main thread and the Celix event thread.

##########
File path: bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
##########
@@ -323,6 +389,47 @@ class ExportedCalculator final {
         factory = fac;
     }
 
+    int init() {
+        return CELIX_SUCCESS;
+    }
+
+    int start() {
+        resultStream = calculator->result();
+        resultStream->forEach([weakSvc = std::weak_ptr<ICalculator>{calculator}, weakPub = std::weak_ptr<pubsub_publisher>{publisher}](const double& event){
+            auto pub = weakPub.lock();
+            auto svc = weakSvc.lock();
+            if (pub && svc) {
+                thread_local unsigned int eventMsgId = 0;
+                if (eventMsgId == 0) {
+                    pub->localMsgTypeIdForMsgType(pub->handle, "Calculator$result$Event", &eventMsgId);
+                }
+
+                auto* metaProps = celix_properties_create();
+                celix_properties_set(metaProps, "invoke.id", std::to_string(0).c_str());
+                Calculator$result$Event wireEvent;
+                wireEvent.optionalReturnValue.buf = (double *) malloc(sizeof(*wireEvent.optionalReturnValue.buf));
+                wireEvent.optionalReturnValue.len = 1;
+                wireEvent.optionalReturnValue.cap = 1;
+                wireEvent.optionalReturnValue.buf[0] = event;
+                wireEvent.optionalError = nullptr;
+                pub->send(pub->handle, eventMsgId, &wireEvent, metaProps);
+                free(wireEvent.optionalReturnValue.buf);
+            } else {
+                //TODO error handling

Review comment:
       IMO for example/testing code a simple log message is enough here (the publisher and/or Calculator service is already gone).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org