You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by ab...@apache.org on 2020/02/24 10:16:37 UTC
[celix] 01/01: Added protocol service API and wire protocol
implementation to be used by admin (sender/receiver). Actual used service
is matched, similar to serializer.
This is an automated email from the ASF dual-hosted git repository.
abroekhuis pushed a commit to branch feature/pubsubadmin_protocol
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 1a1ffe3112567d008a939089ce30f4eded6e33f0
Author: Alexander Broekhuis <al...@luminis.eu>
AuthorDate: Mon Feb 24 11:16:17 2020 +0100
Added protocol service API and wire protocol implementation to be used by admin (sender/receiver). Actual used service is matched, similar to serializer.
---
bundles/pubsub/CMakeLists.txt | 1 +
bundles/pubsub/examples/CMakeLists.txt | 2 +
.../publisher/private/src/pubsub_publisher.c | 5 +-
.../private/include/pubsub_websocket_private.h | 2 +-
.../private/src/pubsub_websocket_example.c | 4 +-
.../private/include/pubsub_subscriber_private.h | 2 +-
.../subscriber/private/src/pubsub_subscriber.c | 14 +-
bundles/pubsub/mock/src/publisher_mock.cc | 4 +-
bundles/pubsub/mock/tst/pubsubmock_test.cc | 5 +-
.../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 18 +-
.../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h | 8 +-
.../src/pubsub_tcp_topic_receiver.c | 2 +-
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 4 +-
.../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c | 18 +-
.../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h | 8 +-
.../src/pubsub_udpmc_topic_receiver.c | 4 +-
.../src/pubsub_udpmc_topic_sender.c | 4 +-
.../src/pubsub_websocket_admin.c | 18 +-
.../src/pubsub_websocket_admin.h | 8 +-
.../src/pubsub_websocket_topic_receiver.c | 2 +-
.../src/pubsub_websocket_topic_sender.c | 4 +-
bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt | 1 -
.../pubsub/pubsub_admin_zmq/src/psa_activator.c | 16 ++
.../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 164 ++++++++++--
.../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h | 11 +-
.../pubsub_admin_zmq/src/pubsub_zmq_common.c | 132 ----------
.../pubsub_admin_zmq/src/pubsub_zmq_common.h | 59 -----
.../src/pubsub_zmq_topic_receiver.c | 191 ++++++++------
.../src/pubsub_zmq_topic_receiver.h | 7 +-
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 118 ++++++---
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h | 3 +
.../pubsub/pubsub_api/include/pubsub/publisher.h | 4 +-
.../pubsub/pubsub_api/include/pubsub/subscriber.h | 4 +-
.../pubsub_discovery/src/pubsub_discovery_impl.c | 14 +-
.../pubsub/pubsub_protocol_wire_v1/CMakeLists.txt | 36 +++
.../src/ps_wire_protocol_activator.c | 64 +++++
.../src/pubsub_wire_protocol_common.c | 59 +++++
.../src/pubsub_wire_protocol_common.h} | 29 +--
.../src/pubsub_wire_protocol_impl.c | 287 +++++++++++++++++++++
.../src/pubsub_wire_protocol_impl.h | 43 +++
bundles/pubsub/pubsub_spi/include/pubsub_admin.h | 8 +-
.../pubsub/pubsub_spi/include/pubsub_constants.h | 5 +-
.../pubsub/pubsub_spi/include/pubsub_endpoint.h | 3 +-
.../pubsub/pubsub_spi/include/pubsub_protocol.h | 77 ++++++
bundles/pubsub/pubsub_spi/include/pubsub_utils.h | 12 +-
bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 15 +-
bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c | 103 +++++++-
.../src/pubsub_topology_manager.c | 33 ++-
.../src/pubsub_topology_manager.h | 1 +
bundles/pubsub/test/test/loopback_activator.c | 6 +-
bundles/pubsub/test/test/sut_activator.c | 4 +-
bundles/pubsub/test/test/sut_endpoint_activator.c | 3 +-
bundles/pubsub/test/test/tst_activator.c | 4 +-
bundles/pubsub/test/test/tst_endpoint_activator.c | 4 +-
libs/utils/include/celix_properties.h | 2 +
libs/utils/src/properties.c | 15 ++
56 files changed, 1227 insertions(+), 447 deletions(-)
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 1941e49..ca026fa 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -30,6 +30,7 @@ if (PUBSUB)
add_subdirectory(pubsub_discovery)
add_subdirectory(pubsub_serializer_json)
add_subdirectory(pubsub_serializer_avrobin)
+ add_subdirectory(pubsub_protocol_wire_v1)
add_subdirectory(pubsub_admin_zmq)
add_subdirectory(pubsub_admin_tcp)
add_subdirectory(pubsub_admin_udp_mc)
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index b19e327..74a4b57 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -227,6 +227,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v1
Celix::pubsub_discovery_etcd
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
@@ -246,6 +247,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v1
Celix::pubsub_discovery_etcd
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 42b3197..e326f01 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -33,6 +33,7 @@
#include "celix_threads.h"
#include "poi.h"
+#include "hash_map.h"
#include "pubsub_publisher_private.h"
@@ -84,7 +85,9 @@ static void* send_thread(void* arg) {
}
place->data[nr_char - 1] = '\0';
if (publish_svc->send) {
- if (publish_svc->send(publish_svc->handle, msgId, place) == 0) {
+ celix_properties_t *metadata = celix_properties_create();
+ celix_properties_set(metadata, "Key", "Value");
+ if (publish_svc->send(publish_svc->handle, msgId, place, metadata) == 0) {
printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
place->position.lat, place->position.lon, place->name, place->description, nr_char);
}
diff --git a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
index 938b6f4..5a26a68 100644
--- a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
@@ -91,7 +91,7 @@ void subscriber_start(pubsub_receiver_t* client);
void subscriber_stop(pubsub_receiver_t* client);
void subscriber_destroy(pubsub_receiver_t* client);
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t *metadata, bool* release);
#endif /* PUBSUB_WEBSOCKET_PRIVATE_H_ */
diff --git a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
index 7b820f0..ad8c0d0 100644
--- a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
@@ -87,7 +87,7 @@ static void* send_thread(void* arg) {
}
place->data[nr_char - 1] = '\0';
if (publish_svc->send) {
- if (publish_svc->send(publish_svc->handle, msgId, place) == 0) {
+ if (publish_svc->send(publish_svc->handle, msgId, place, NULL) == 0) {
printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
place->position.lat, place->position.lon, place->name, place->description, nr_char);
}
@@ -193,7 +193,7 @@ void subscriber_destroy(pubsub_receiver_t *subscriber) {
free(subscriber);
}
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release) {
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t *metadata, bool* release) {
poi_cmd_t *cmd = (poi_cmd_t *) msg;
pubsub_info_t *pubsub = (pubsub_info_t *) handle;
printf("Received command %s\n", cmd->command);
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 77da5d2..feb1f55 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -45,7 +45,7 @@ void subscriber_start(pubsub_receiver_t* client);
void subscriber_stop(pubsub_receiver_t* client);
void subscriber_destroy(pubsub_receiver_t* client);
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t *metadata, bool* release);
#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index 9f3ca5e..272ff74 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -26,6 +26,8 @@
#include <stdlib.h>
#include <stdio.h>
+#include <hash_map.h>
+#include <celix_properties.h>
#include "poi.h"
#include "pubsub_subscriber_private.h"
@@ -53,9 +55,19 @@ void subscriber_destroy(pubsub_receiver_t *subscriber) {
free(subscriber);
}
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release) {
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t *metadata, bool* release) {
location_t place = (location_t)msg;
printf("Recv (%s): [%f, %f] (%s, %s, %s, len data %li)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, (long)(strlen(place->data) + 1));
+ if (metadata == NULL || celix_properties_size(metadata) == 0) {
+ printf("No metadata\n");
+ } else {
+ const char *key;
+ CELIX_PROPERTIES_FOR_EACH(metadata, key) {
+ const char *val = celix_properties_get(metadata, key, "!Error!");
+ printf("%s=%s\n", key, val);
+ }
+ }
+
return 0;
}
diff --git a/bundles/pubsub/mock/src/publisher_mock.cc b/bundles/pubsub/mock/src/publisher_mock.cc
index 0cf2e1f..8d6f7e5 100644
--- a/bundles/pubsub/mock/src/publisher_mock.cc
+++ b/bundles/pubsub/mock/src/publisher_mock.cc
@@ -17,6 +17,7 @@
* under the License.
*/
+#include <celix_properties.h>
#include "pubsub/publisher_mock.h"
#include "CppUTest/TestHarness.h"
#include "CppUTestExt/MockSupport.h"
@@ -36,12 +37,13 @@ static int pubsub__publisherMock_localMsgTypeIdForMsgType(void *handle, const ch
/*============================================================================
MOCK - mock function for pubsub_publisher->send
============================================================================*/
-static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, const void *msg) {
+static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata) {
return mock(PUBSUB_PUBLISHERMOCK_SCOPE)
.actualCall(PUBSUB_PUBLISHERMOCK_SEND_METHOD)
.withPointerParameter("handle", handle)
.withParameter("msgTypeId", msgTypeId)
.withPointerParameter("msg", (void*)msg)
+ .withPointerParameter("metadata", (void*)metadata)
.returnIntValue();
}
diff --git a/bundles/pubsub/mock/tst/pubsubmock_test.cc b/bundles/pubsub/mock/tst/pubsubmock_test.cc
index 462bb2c..64a8afb 100644
--- a/bundles/pubsub/mock/tst/pubsubmock_test.cc
+++ b/bundles/pubsub/mock/tst/pubsubmock_test.cc
@@ -28,6 +28,7 @@
#include "pubsub/publisher_mock.h"
+#include "celix_utils_api.h"
static pubsub_publisher_t mockSrv;
static void* mockHandle = (void*)0x42;
@@ -69,8 +70,8 @@ TEST(pubsubmock, publishermock) {
CHECK(msgId != 0);
//set msg
- void *dummyMsg = (void*)0x43;
- srv->send(srv->handle, msgId, dummyMsg); //should satisfy the expectOneCalls
+ void *dummyMsg = (void*)0x43;
+ srv->send(srv->handle, msgId, dummyMsg, NULL); //should satisfy the expectOneCalls
//srv->send(srv->handle, msgId, dummyMsg); //enabling this should fail the test
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 5c871de..3df67c2 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -298,23 +298,23 @@ void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
}
}
-celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_tcp_admin_t *psa = handle;
L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_TCP_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, false, topicProperties, outSerializerSvcId, outProtocolSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_tcp_admin_t *psa = handle;
L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_TCP_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, false, topicProperties, outSerializerSvcId, outProtocolSvcId);
if (outScore != NULL) {
*outScore = score;
}
@@ -325,14 +325,14 @@ celix_status_t pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix
pubsub_tcp_admin_t *psa = handle;
L_DEBUG("[PSA_TCP] pubsub_tcpAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
- bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_TCP_ADMIN_TYPE, NULL);
+ bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_TCP_ADMIN_TYPE, false, NULL, NULL);
if (outMatch != NULL) {
*outMatch = match;
}
return status;
}
-celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_tcp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
@@ -368,7 +368,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
if (sender != NULL) {
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *serType = serEntry->serType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
//if configured use a static discover url
@@ -432,7 +432,7 @@ celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *sco
return status;
}
-celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_tcp_admin_t *psa = handle;
celix_properties_t *newEndpoint = NULL;
@@ -453,7 +453,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h
index 7ed1702..87a1fe7 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.h
@@ -30,14 +30,14 @@ typedef struct pubsub_tcp_admin pubsub_tcp_admin_t;
pubsub_tcp_admin_t* pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa);
-celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
+celix_status_t pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
celix_status_t pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_tcpAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 23af6c3..8e61291 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -498,7 +498,7 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
}
if (status == CELIX_SUCCESS) {
bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
if (release) {
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 6795af1..fe4b8fa 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -112,7 +112,7 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
static unsigned int rand_range(unsigned int min, unsigned int max);
static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender);
static void *psa_tcp_sendThread(void *data);
-static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
+static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
celix_bundle_context_t *ctx,
@@ -477,7 +477,7 @@ pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_se
return result;
}
-static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) {
+static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
int status = CELIX_SUCCESS;
psa_tcp_bounded_service_entry_t *bound = handle;
pubsub_tcp_topic_sender_t *sender = bound->parent;
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 7dfd00f..87e4e63 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -254,23 +254,23 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
free(psa);
}
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_udpmc_admin_t *psa = handle;
L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, false, topicProperties, outSerializerSvcId, outProtocolSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_udpmc_admin_t *psa = handle;
L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, false, topicProperties, outSerializerSvcId, outProtocolSvcId);
if (outScore != NULL) {
*outScore = score;
}
@@ -281,14 +281,14 @@ celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_propert
pubsub_udpmc_admin_t *psa = handle;
L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
- bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, NULL);
+ bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_UDPMC_ADMIN_TYPE, false, NULL, NULL);
if (outMatch != NULL) {
*outMatch = match;
}
return status;
}
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_udpmc_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
@@ -311,7 +311,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scop
if (sender != NULL) {
const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
const char *serType = serEntry->serType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
celix_properties_set(newEndpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, pubsub_udpmcTopicSender_socketAddress(sender));
celix_properties_setLong(newEndpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, pubsub_udpmcTopicSender_socketPort(sender));
//if available also set container name
@@ -369,7 +369,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
return status;
}
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_udpmc_admin_t *psa = handle;
celix_properties_t *newEndpoint = NULL;
@@ -387,7 +387,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc
const char *psaType = PSA_UDPMC_PUBSUB_ADMIN_TYPE;
const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 6a42c5e..ac28c63 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -29,14 +29,14 @@ typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index 0cf9b7e..faf40eb 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -32,6 +32,8 @@
#include "large_udp.h"
#include "pubsub_udpmc_common.h"
+#include "hash_map.h"
+
#define MAX_EPOLL_EVENTS 10
#define RECV_THREAD_TIMEOUT 5
#define UDP_BUFFER_SIZE 65535
@@ -447,7 +449,7 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
if (status == CELIX_SUCCESS) {
bool release = true;
pubsub_subscriber_t *svc = entry->svc;
- svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
+ svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, NULL, &release);
if (release) {
msgSer->freeMsg(msgSer->handle, msgInst);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 97c62eb..876700e 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -81,7 +81,7 @@ typedef struct pubsub_msg {
static int psa_udpmc_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
-static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
+static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata);
static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_udp_msg_t* msg);
static unsigned int rand_range(unsigned int min, unsigned int max);
@@ -267,7 +267,7 @@ static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *
celixThreadMutex_unlock(&sender->boundedServices.mutex);
}
-static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
psa_udpmc_bounded_service_entry_t *entry = handle;
int status = 0;
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
index 6fa32b7..5d61e82 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -231,25 +231,25 @@ void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const ce
}
}
-celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_websocket_admin_t *psa = handle;
L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
- topicProperties, outSerializerSvcId);
+ false, topicProperties, outSerializerSvcId, outProtocolSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_websocket_admin_t *psa = handle;
L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
psa->qosSampleScore, psa->qosControlScore, psa->defaultScore,
- topicProperties, outSerializerSvcId);
+ false, topicProperties, outSerializerSvcId, outProtocolSvcId);
if (outScore != NULL) {
*outScore = score;
}
@@ -260,14 +260,14 @@ celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const
pubsub_websocket_admin_t *psa = handle;
L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
- bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_WEBSOCKET_ADMIN_TYPE, NULL);
+ bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_WEBSOCKET_ADMIN_TYPE, false, NULL, NULL);
if (outMatch != NULL) {
*outMatch = match;
}
return status;
}
-celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_websocket_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
@@ -292,7 +292,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *
const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, NULL);
+ serType, NULL, NULL);
//Set endpoint visibility to local because the http server handles discovery
celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
@@ -345,7 +345,7 @@ celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const cha
return status;
}
-celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_websocket_admin_t *psa = handle;
celix_properties_t *newEndpoint = NULL;
@@ -365,7 +365,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
//Set endpoint visibility to local because the http server handles discovery
celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
index c718a3b..36495dc 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
@@ -30,14 +30,14 @@ typedef struct pubsub_websocket_admin pubsub_websocket_admin_t;
pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa);
-celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *protocolSvcId);
celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index b6dfa71..c45ce57 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -477,7 +477,7 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_
if (status == CELIX_SUCCESS) {
bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
if (release) {
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
index b533567..af10a1b 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -92,7 +92,7 @@ static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_
static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender);
-static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
static void psa_websocketTopicSender_ready(struct mg_connection *connection, void *handle);
static void psa_websocketTopicSender_close(const struct mg_connection *connection, void *handle);
@@ -295,7 +295,7 @@ static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle
celixThreadMutex_unlock(&sender->boundedServices.mutex);
}
-static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
int status = CELIX_SERVICE_EXCEPTION;
psa_websocket_bounded_service_entry_t *bound = handle;
pubsub_websocket_topic_sender_t *sender = bound->parent;
diff --git a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 3aeba26..ecd94d6 100644
--- a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -42,7 +42,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
src/pubsub_zmq_admin.c
src/pubsub_zmq_topic_sender.c
src/pubsub_zmq_topic_receiver.c
- src/pubsub_zmq_common.c
${ZMQ_CRYPTO_C}
)
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index 3e66f42..a70763c 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -21,6 +21,7 @@
#include "celix_api.h"
#include "pubsub_serializer.h"
+#include "pubsub_protocol.h"
#include "log_helper.h"
#include "pubsub_admin.h"
@@ -35,6 +36,8 @@ typedef struct psa_zmq_activator {
long serializersTrackerId;
+ long protocolsTrackerId;
+
pubsub_admin_service_t adminService;
long adminSvcId;
@@ -49,6 +52,7 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = -1L;
act->cmdSvcId = -1L;
act->serializersTrackerId = -1L;
+ act->protocolsTrackerId = -1L;
logHelper_create(ctx, &act->logHelper);
logHelper_start(act->logHelper);
@@ -67,6 +71,17 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
+ //track protocols
+ if (status == CELIX_SUCCESS) {
+ celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+ opts.filter.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = act->admin;
+ opts.addWithProperties = pubsub_zmqAdmin_addProtocolSvc;
+ opts.removeWithProperties = pubsub_zmqAdmin_removeProtocolSvc;
+ act->protocolsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+ }
+
//register pubsub admin service
if (status == CELIX_SUCCESS) {
pubsub_admin_service_t *psaSvc = &act->adminService;
@@ -116,6 +131,7 @@ int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+ celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
pubsub_zmqAdmin_destroy(act->admin);
logHelper_stop(act->logHelper);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index e7241e6..b251cae 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -26,6 +26,7 @@
#include <pubsub_endpoint.h>
#include <czmq.h>
#include <pubsub_serializer.h>
+#include <pubsub_protocol.h>
#include <ip_utils.h>
#include "pubsub_utils.h"
@@ -67,6 +68,11 @@ struct pubsub_zmq_admin {
struct {
celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = svcId, value = psa_zmq_protocol_entry_t*
+ } protocols;
+
+ struct {
+ celix_thread_mutex_t mutex;
hash_map_t *map; //key = scope:topic key, value = pubsub_zmq_topic_sender_t*
} topicSenders;
@@ -88,6 +94,12 @@ typedef struct psa_zmq_serializer_entry {
pubsub_serializer_service_t *svc;
} psa_zmq_serializer_entry_t;
+typedef struct psa_zmq_protocol_entry {
+ const char *protType;
+ long svcId;
+ pubsub_protocol_service_t *svc;
+} psa_zmq_protocol_entry_t;
+
static celix_status_t zmq_getIpAddress(const char* interface, char** ip);
static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
@@ -177,6 +189,9 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_help
celixThreadMutex_create(&psa->serializers.mutex, NULL);
psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ celixThreadMutex_create(&psa->protocols.mutex, NULL);
+ psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
+
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -228,6 +243,14 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->protocols.mutex);
+ iter = hashMapIterator_construct(psa->protocols.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_zmq_protocol_entry_t *entry = hashMapIterator_nextValue(&iter);
+ free(entry);
+ }
+ celixThreadMutex_unlock(&psa->protocols.mutex);
+
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -240,6 +263,9 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) {
celixThreadMutex_destroy(&psa->serializers.mutex);
hashMap_destroy(psa->serializers.map, false, false);
+ celixThreadMutex_destroy(&psa->protocols.mutex);
+ hashMap_destroy(psa->protocols.map, false, false);
+
if (psa->zmq_auth != NULL) {
zactor_destroy(&psa->zmq_auth);
}
@@ -319,23 +345,93 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
}
}
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_zmq_admin_t *psa = handle;
+
+ const char *protType = celix_properties_get(props, PUBSUB_PROTOCOL_TYPE_KEY, NULL);
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+ if (protType == NULL) {
+ L_INFO("[PSA_ZMQ] Ignoring protocol service without %s property", PUBSUB_PROTOCOL_TYPE_KEY);
+ return;
+ }
+
+ celixThreadMutex_lock(&psa->protocols.mutex);
+ psa_zmq_protocol_entry_t *entry = hashMap_get(psa->protocols.map, (void*)svcId);
+ if (entry == NULL) {
+ entry = calloc(1, sizeof(*entry));
+ entry->protType = protType;
+ entry->svcId = svcId;
+ entry->svc = svc;
+ hashMap_put(psa->protocols.map, (void*)svcId, entry);
+ }
+ celixThreadMutex_unlock(&psa->protocols.mutex);
+}
+
+void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_zmq_admin_t *psa = handle;
+ long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+
+ //remove protocol
+ // 1) First find entry and
+ // 2) loop and destroy all topic sender using the protocol and
+ // 3) loop and destroy all topic receivers using the protocol
+ // Note that it is the responsibility of the topology manager to create new topic senders/receivers
+
+ celixThreadMutex_lock(&psa->protocols.mutex);
+ psa_zmq_protocol_entry_t *entry = hashMap_remove(psa->protocols.map, (void*)svcId);
+ celixThreadMutex_unlock(&psa->protocols.mutex);
+
+ if (entry != NULL) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_zmq_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+ if (sender != NULL && entry->svcId == pubsub_zmqTopicSender_protocolSvcId(sender)) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_zmqTopicSender_destroy(sender);
+ free(key);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+ pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+ if (receiver != NULL && entry->svcId == pubsub_zmqTopicReceiver_protocolSvcId(receiver)) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_zmqTopicReceiver_destroy(receiver);
+ free(key);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+ free(entry);
+ }
+}
+
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_ZMQ_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, false, outSerializerSvcId, outProtocolSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_ZMQ_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, false, outSerializerSvcId, outProtocolSvcId);
if (outScore != NULL) {
*outScore = score;
}
@@ -346,14 +442,14 @@ celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
- bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_ZMQ_ADMIN_TYPE, NULL);
+ bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_ZMQ_ADMIN_TYPE, true, NULL, NULL);
if (outMatch != NULL) {
*outMatch = match;
}
return status;
}
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
@@ -369,19 +465,22 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_zmq_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
if (sender == NULL) {
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
- if (serEntry != NULL) {
+ psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
+ if (serEntry != NULL && protEntry != NULL) {
sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
- psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
+ protocolSvcId, protEntry->svc, psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
}
if (sender != NULL) {
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
const char *serType = serEntry->serType;
+ const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, NULL);
+ serType, protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
//if configured use a static discover url
@@ -414,6 +513,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
L_ERROR("[PSA_ZMQ] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->protocols.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
if (sender != NULL && newEndpoint != NULL) {
@@ -452,27 +552,30 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
return status;
}
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
pubsub_zmq_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
if (receiver == NULL) {
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
- if (serEntry != NULL) {
- receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
+ psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
+ if (serEntry != NULL && protEntry != NULL) {
+ receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc, protocolSvcId, protEntry->svc);
} else {
- L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
+ L_ERROR("[PSA_ZMQ] Cannot find serializer or protocol for TopicSender %s/%s", scope, topic);
}
if (receiver != NULL) {
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
const char *serType = serEntry->serType;
+ const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -488,6 +591,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
L_ERROR("[PSA_ZMQ] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&psa->protocols.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
if (receiver != NULL && newEndpoint != NULL) {
@@ -551,16 +655,24 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
if (serializerEntry != NULL) {
serializer = serializerEntry->serType;
}
+ const char *protocol = NULL;
+ long protocolSvcId = pubsub_zmqTopicReceiver_protocolSvcId(receiver);
+ psa_zmq_protocol_entry_t *protocolEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
+ if (protocolEntry != NULL) {
+ protocol = protocolEntry->protType;
+ }
const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+ const char *eProtocol = celix_properties_get(endpoint, PUBSUB_ENDPOINT_PROTOCOL, NULL);
- if (scope != NULL && topic != NULL && serializer != NULL
- && eScope != NULL && eTopic != NULL && eSerializer != NULL
+ if (scope != NULL && topic != NULL && serializer != NULL && protocol != NULL
+ && eScope != NULL && eTopic != NULL && eSerializer != NULL && eProtocol != NULL
&& strncmp(eScope, scope, 1024*1024) == 0
&& strncmp(eTopic, topic, 1024*1024) == 0
- && strncmp(eSerializer, serializer, 1024*1024) == 0) {
+ && strncmp(eSerializer, serializer, 1024*1024) == 0
+ && strncmp(eProtocol, protocol, 1024*1024) == 0) {
pubsub_zmqTopicReceiver_connectTo(receiver, url);
}
}
@@ -641,34 +753,50 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attr
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_zmq_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+
long serSvcId = pubsub_zmqTopicSender_serializerSvcId(sender);
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+
+ long protSvcId = pubsub_zmqTopicSender_protocolSvcId(sender);
+ psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protSvcId);
+
const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+ const char *protType = protEntry == NULL ? "!Error!" : protEntry->protType;
const char *scope = pubsub_zmqTopicSender_scope(sender);
const char *topic = pubsub_zmqTopicSender_topic(sender);
const char *url = pubsub_zmqTopicSender_url(sender);
const char *postUrl = pubsub_zmqTopicSender_isStatic(sender) ? " (static)" : "";
fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
+ fprintf(out, " |- protocol type = %s\n", protType);
fprintf(out, " |- url = %s%s\n", url, postUrl);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ celixThreadMutex_unlock(&psa->protocols.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
fprintf(out, "\nTopic Receivers:\n");
celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
long serSvcId = pubsub_zmqTopicReceiver_serializerSvcId(receiver);
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId);
+
+ long protSvcId = pubsub_zmqTopicReceiver_protocolSvcId(receiver);
+ psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protSvcId);
+
+
const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+ const char *protType = protEntry == NULL ? "!Error!" : protEntry->protType;
const char *scope = pubsub_zmqTopicReceiver_scope(receiver);
const char *topic = pubsub_zmqTopicReceiver_topic(receiver);
@@ -678,6 +806,7 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attr
fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
+ fprintf(out, " |- protocol type = %s\n", protType);
for (int i = 0; i < celix_arrayList_size(connected); ++i) {
char *url = celix_arrayList_get(connected, i);
fprintf(out, " |- connected url = %s\n", url);
@@ -692,6 +821,7 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attr
celix_arrayList_destroy(unconnected);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ celixThreadMutex_unlock(&psa->protocols.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index 8d8c7c0..dd28eca 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -30,14 +30,14 @@ typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *ProtocolSvcId);
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId, long *ProtocolSvcId);
celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
@@ -46,6 +46,9 @@ celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celi
void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
+
bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
deleted file mode 100644
index 6e7c4d8..0000000
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <memory.h>
-#include <assert.h>
-#include "pubsub_zmq_common.h"
-
-bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *hdr) {
- bool check=false;
- int major=0,minor=0;
-
- if (hdr->major == 0 && hdr->minor == 0) {
- //no check
- return true;
- }
-
- if (msgVersion!=NULL) {
- version_getMajor(msgVersion,&major);
- version_getMinor(msgVersion,&minor);
- if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
- check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
-
-void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter) {
- for (int i = 0; i < 5; ++i) {
- filter[i] = '\0';
- }
- if (scope != NULL && strnlen(scope, 3) >= 2) {
- filter[0] = scope[0];
- filter[1] = scope[1];
- }
- if (topic != NULL && strnlen(topic, 3) >= 2) {
- filter[2] = topic[0];
- filter[3] = topic[1];
- }
-}
-
-static int readInt(const unsigned char *data, int offset, uint32_t *val) {
- *val = ((data[offset+0] << 24) | (data[offset+1] << 16) | (data[offset+2] << 8) | (data[offset+3] << 0));
- return offset + 4;
-}
-
-static int readLong(const unsigned char *data, int offset, uint64_t *val) {
- *val = (
- ((int64_t)data[offset+0] << 56) |
- ((int64_t)data[offset+1] << 48) |
- ((int64_t)data[offset+2] << 40) |
- ((int64_t)data[offset+3] << 32) |
- ((int64_t)data[offset+4] << 24) |
- ((int64_t)data[offset+5] << 16) |
- ((int64_t)data[offset+6] << 8 ) |
- ((int64_t)data[offset+7] << 0 )
- );
- return offset + 8;
-}
-
-celix_status_t psa_zmq_decodeHeader(const unsigned char *data, size_t dataLen, pubsub_zmq_msg_header_t *header) {
- int status = CELIX_ILLEGAL_ARGUMENT;
- if (dataLen == sizeof(pubsub_zmq_msg_header_t)) {
- int index = 0;
- index = readInt(data, index, &header->type);
- header->major = (unsigned char) data[index++];
- header->minor = (unsigned char) data[index++];
-
- index = readInt(data, index, &header->seqNr);
- for (int i = 0; i < 16; ++i) {
- header->originUUID[i] = data[index+i];
- }
- index += 16;
- index = readLong(data, index, &header->sendtimeSeconds);
- readLong(data, index, &header->sendTimeNanoseconds);
-
- status = CELIX_SUCCESS;
- }
- return status;
-}
-
-
-static int writeInt(unsigned char *data, int offset, int32_t val) {
- data[offset+0] = (unsigned char)((val >> 24) & 0xFF);
- data[offset+1] = (unsigned char)((val >> 16) & 0xFF);
- data[offset+2] = (unsigned char)((val >> 8 ) & 0xFF);
- data[offset+3] = (unsigned char)((val >> 0 ) & 0xFF);
- return offset + 4;
-}
-
-static int writeLong(unsigned char *data, int offset, int64_t val) {
- data[offset+0] = (unsigned char)((val >> 56) & 0xFF);
- data[offset+1] = (unsigned char)((val >> 48) & 0xFF);
- data[offset+2] = (unsigned char)((val >> 40) & 0xFF);
- data[offset+3] = (unsigned char)((val >> 32) & 0xFF);
- data[offset+4] = (unsigned char)((val >> 24) & 0xFF);
- data[offset+5] = (unsigned char)((val >> 16) & 0xFF);
- data[offset+6] = (unsigned char)((val >> 8 ) & 0xFF);
- data[offset+7] = (unsigned char)((val >> 0 ) & 0xFF);
- return offset + 8;
-}
-
-void psa_zmq_encodeHeader(const pubsub_zmq_msg_header_t *msgHeader, unsigned char *data, size_t dataLen) {
- assert(dataLen == sizeof(*msgHeader));
- int index = 0;
- index = writeInt(data, index, msgHeader->type);
- data[index++] = (unsigned char)msgHeader->major;
- data[index++] = (unsigned char)msgHeader->minor;
- index = writeInt(data, index, msgHeader->seqNr);
- for (int i = 0; i < 16; ++i) {
- data[index+i] = msgHeader->originUUID[i];
- }
- index += 16;
- index = writeLong(data, index, msgHeader->sendtimeSeconds);
- writeLong(data, index, msgHeader->sendTimeNanoseconds);
-}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
deleted file mode 100644
index 044520c..0000000
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef CELIX_PUBSUB_ZMQ_COMMON_H
-#define CELIX_PUBSUB_ZMQ_COMMON_H
-
-#include <utils.h>
-#include <stdint.h>
-
-#include "version.h"
-
-
-/*
- * NOTE zmq is used by first sending three frames:
- * 1) A subscription filter.
- * This is a 5 char string of the first two chars of scope and topic combined and terminated with a '\0'.
- *
- * 2) The pubsub_zmq_msg_header_t is send containg the type id and major/minor version
- *
- * 3) The actual payload
- */
-
-
-struct pubsub_zmq_msg_header {
- uint32_t type; //msg type id (hash of fqn)
- uint8_t major;
- uint8_t minor;
- uint32_t seqNr;
- unsigned char originUUID[16];
- uint64_t sendtimeSeconds; //seconds since epoch
- uint64_t sendTimeNanoseconds; //ns since epoch
-};
-
-typedef struct pubsub_zmq_msg_header pubsub_zmq_msg_header_t;
-
-
-void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter);
-
-bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *hdr);
-
-celix_status_t psa_zmq_decodeHeader(const unsigned char *data, size_t dataLen, pubsub_zmq_msg_header_t *header);
-void psa_zmq_encodeHeader(const pubsub_zmq_msg_header_t *msgHeader, unsigned char *data, size_t dataLen);
-#endif //CELIX_PUBSUB_ZMQ_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index d636867..2f80d31 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -18,6 +18,7 @@
*/
#include <pubsub_serializer.h>
+#include <pubsub_protocol.h>
#include <stdlib.h>
#include <pubsub/subscriber.h>
#include <memory.h>
@@ -30,11 +31,12 @@
#include <log_helper.h>
#include "pubsub_zmq_topic_receiver.h"
#include "pubsub_psa_zmq_constants.h"
-#include "pubsub_zmq_common.h"
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
+#include "celix_utils_api.h"
+
#define PSA_ZMQ_RECV_TIMEOUT 1000
#ifndef UUID_STR_LEN
@@ -56,9 +58,10 @@ struct pubsub_zmq_topic_receiver {
log_helper_t *logHelper;
long serializerSvcId;
pubsub_serializer_service_t *serializer;
+ long protocolSvcId;
+ pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
- char scopeAndTopicFilter[5];
bool metricsEnabled;
void *zmqCtx;
@@ -122,7 +125,7 @@ static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t
static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
-
+static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -131,15 +134,18 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
const char *topic,
const celix_properties_t *topicProperties,
long serializerSvcId,
- pubsub_serializer_service_t *serializer) {
+ pubsub_serializer_service_t *serializer,
+ long protocolSvcId,
+ pubsub_protocol_service_t *protocol) {
pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
receiver->serializerSvcId = serializerSvcId;
receiver->serializer = serializer;
+ receiver->protocolSvcId = protocolSvcId;
+ receiver->protocol = protocol;
receiver->scope = strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
- psa_zmq_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
@@ -329,6 +335,10 @@ long pubsub_zmqTopicReceiver_serializerSvcId(pubsub_zmq_topic_receiver_t *receiv
return receiver->serializerSvcId;
}
+long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver) {
+ return receiver->protocolSvcId;
+}
+
void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
@@ -456,9 +466,9 @@ static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, co
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, const pubsub_zmq_msg_header_t *hdr, const byte* payload, size_t payloadSize, struct timespec *receiveTime) {
+static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
//NOTE receiver->subscribers.mutex locked
- pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type));
+ pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(message->header.msgId));
pubsub_subscriber_t *svc = entry->svc;
bool monitor = receiver->metricsEnabled;
@@ -470,18 +480,18 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
if (msgSer!= NULL) {
void *deserializedMsg = NULL;
- bool validVersion = psa_zmq_checkVersion(msgSer->msgVersion, hdr);
+ bool validVersion = psa_zmq_checkVersion(msgSer->msgVersion, message->header.msgMajorVersion, message->header.msgMinorVersion);
if (validVersion) {
if (monitor) {
clock_gettime(CLOCK_REALTIME, &beginSer);
}
- celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg);
+ celix_status_t status = msgSer->deserialize(msgSer->handle, message->payload.payload, message->payload.length, &deserializedMsg);
if (monitor) {
clock_gettime(CLOCK_REALTIME, &endSer);
}
if (status == CELIX_SUCCESS) {
bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, message->metadata.metadata, &release);
if (release) {
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
@@ -492,68 +502,69 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
}
}
} else {
- L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", hdr->type);
+ L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
}
if (msgSer != NULL && monitor) {
- hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )hdr->type);
- char uuidStr[UUID_STR_LEN+1];
- uuid_unparse(hdr->originUUID, uuidStr);
- psa_zmq_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
-
- if (metrics == NULL) {
- metrics = calloc(1, sizeof(*metrics));
- hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
- uuid_copy(metrics->origin, hdr->originUUID);
- metrics->msgTypeId = hdr->type;
- metrics->maxDelayInSeconds = -INFINITY;
- metrics->minDelayInSeconds = INFINITY;
- metrics->lastSeqNr = 0;
- }
-
- double diff = celix_difftime(&beginSer, &endSer);
- long n = metrics->nrOfMessagesReceived;
- metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
-
- diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
- n = metrics->nrOfMessagesReceived;
- if (metrics->nrOfMessagesReceived >= 1) {
- metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
- }
- metrics->lastMessageReceived = *receiveTime;
-
-
- int incr = hdr->seqNr - metrics->lastSeqNr;
- if (metrics->lastSeqNr >0 && incr > 1) {
- metrics->nrOfMissingSeqNumbers += (incr - 1);
- L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
- }
- metrics->lastSeqNr = hdr->seqNr;
-
- struct timespec sendTime;
- sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
- sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
- diff = celix_difftime(&sendTime, receiveTime);
- metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
- if (diff < metrics->minDelayInSeconds) {
- metrics->minDelayInSeconds = diff;
- }
- if (diff > metrics->maxDelayInSeconds) {
- metrics->maxDelayInSeconds = diff;
- }
-
- metrics->nrOfMessagesReceived += updateReceiveCount;
- metrics->nrOfSerializationErrors += updateSerError;
+ // TODO disabled for now, should move to an interceptor?
+// hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )message->header.msgId);
+// char uuidStr[UUID_STR_LEN+1];
+// uuid_unparse(hdr->originUUID, uuidStr);
+// psa_zmq_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr);
+//
+// if (metrics == NULL) {
+// metrics = calloc(1, sizeof(*metrics));
+// hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
+// uuid_copy(metrics->origin, hdr->originUUID);
+// metrics->msgTypeId = hdr->type;
+// metrics->maxDelayInSeconds = -INFINITY;
+// metrics->minDelayInSeconds = INFINITY;
+// metrics->lastSeqNr = 0;
+// }
+//
+// double diff = celix_difftime(&beginSer, &endSer);
+// long n = metrics->nrOfMessagesReceived;
+// metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
+//
+// diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
+// n = metrics->nrOfMessagesReceived;
+// if (metrics->nrOfMessagesReceived >= 1) {
+// metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
+// }
+// metrics->lastMessageReceived = *receiveTime;
+//
+//
+// int incr = hdr->seqNr - metrics->lastSeqNr;
+// if (metrics->lastSeqNr >0 && incr > 1) {
+// metrics->nrOfMissingSeqNumbers += (incr - 1);
+// L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr);
+// }
+// metrics->lastSeqNr = hdr->seqNr;
+//
+// struct timespec sendTime;
+// sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
+// sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct
+// diff = celix_difftime(&sendTime, receiveTime);
+// metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1);
+// if (diff < metrics->minDelayInSeconds) {
+// metrics->minDelayInSeconds = diff;
+// }
+// if (diff > metrics->maxDelayInSeconds) {
+// metrics->maxDelayInSeconds = diff;
+// }
+//
+// metrics->nrOfMessagesReceived += updateReceiveCount;
+// metrics->nrOfSerializationErrors += updateSerError;
}
}
-static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, const pubsub_zmq_msg_header_t *hdr, const byte *payload, size_t payloadSize, struct timespec *receiveTime) {
+static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
celixThreadMutex_lock(&receiver->subscribers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
- processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize, receiveTime);
+ processMsgForSubscriberEntry(receiver, entry, message, receiveTime);
}
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
@@ -574,7 +585,6 @@ static void* psa_zmq_recvThread(void * data) {
bool allInitialized = receiver->subscribers.allInitialized;
celixThreadMutex_unlock(&receiver->subscribers.mutex);
-
while (running) {
if (!allConnected) {
psa_zmq_connectToAllRequestedConnections(receiver);
@@ -585,24 +595,32 @@ static void* psa_zmq_recvThread(void * data) {
zmsg_t *zmsg = zmsg_recv(receiver->zmqSock);
if (zmsg != NULL) {
- if (zmsg_size(zmsg) != 3) {
- L_WARN("[PSA_ZMQ_TR] Always expecting 3 frames per zmsg (filter + header + payload), got %i frames", (int)zmsg_size(zmsg));
+ if (zmsg_size(zmsg) < 2) {
+ L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata)), got %i frames", (int)zmsg_size(zmsg));
} else {
- zframe_t *filter = zmsg_pop(zmsg); //char[5] filter
- zframe_t *header = zmsg_pop(zmsg); //pubsub_zmq_msg_header_t
- zframe_t *payload = zmsg_pop(zmsg); //serialized payload
- if (filter != NULL && strncmp(receiver->scopeAndTopicFilter, (char*)zframe_data(filter), zframe_size(filter)) != 0 ) {
- L_ERROR("[PSA_ZMQ_TR] Invalid ZQM filter, Found '%4s'. Expected %s\n", (char*)zframe_data(filter), receiver->scopeAndTopicFilter);
- } else if (header != NULL && payload != NULL) {
+ zframe_t *header = zmsg_pop(zmsg); // header
+ zframe_t *payload = NULL;
+ zframe_t *metadata = NULL;
+
+ pubsub_protocol_message_t message;
+ receiver->protocol->decodeHeader(receiver->protocol->handle, zframe_data(header), zframe_size(header), &message);
+ if (message.header.payloadSize > 0) {
+ payload = zmsg_pop(zmsg);
+ receiver->protocol->decodePayload(receiver->protocol->handle, zframe_data(payload), zframe_size(payload), &message);
+ }
+ if (message.header.metadataSize > 0) {
+ metadata = zmsg_pop(zmsg);
+ receiver->protocol->decodeMetadata(receiver->protocol->handle, zframe_data(metadata), zframe_size(metadata), &message);
+ }
+ if (header != NULL && payload != NULL) {
struct timespec receiveTime;
clock_gettime(CLOCK_REALTIME, &receiveTime);
- pubsub_zmq_msg_header_t msgHeader;
- psa_zmq_decodeHeader(zframe_data(header), zframe_size(header), &msgHeader);
- processMsg(receiver, &msgHeader, zframe_data(payload), zframe_size(payload), &receiveTime);
+ processMsg(receiver, &message, &receiveTime);
}
- zframe_destroy(&filter);
+ celix_properties_destroy(message.metadata.metadata);
zframe_destroy(&header);
zframe_destroy(&payload);
+ zframe_destroy(&metadata);
}
zmsg_destroy(&zmsg);
} else {
@@ -800,10 +818,33 @@ static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const
zcert_apply (sub_cert, zmq_s);
zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
#endif
- zsock_set_subscribe(receiver->zmqSock, receiver->scopeAndTopicFilter);
+ char sync[5];
+ receiver->protocol->getSyncHeader(receiver->protocol->handle, sync);
+ zsock_set_subscribe(receiver->zmqSock, sync);
#ifdef BUILD_WITH_ZMQ_SECURITY
ts->zmq_cert = sub_cert;
ts->zmq_pub_cert = pub_cert;
#endif
}
+
+static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
+ bool check=false;
+
+ if (major == 0 && minor == 0) {
+ //no check
+ return true;
+ }
+
+ int versionMajor;
+ int versionMinor;
+ if (msgVersion!=NULL) {
+ version_getMajor(msgVersion, &versionMajor);
+ version_getMinor(msgVersion, &versionMinor);
+ if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
+ check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+ }
+ }
+
+ return check;
+}
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index b9ab962..1792c48 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -27,17 +27,20 @@ typedef struct pubsub_zmq_topic_receiver pubsub_zmq_topic_receiver_t;
pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
log_helper_t *logHelper,
- const char *scope,
+ const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
long serializerSvcId,
- pubsub_serializer_service_t *serializer);
+ pubsub_serializer_service_t *serializer,
+ long protocolSvcId,
+ pubsub_protocol_service_t *protocol);
void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);
const char* pubsub_zmqTopicReceiver_scope(pubsub_zmq_topic_receiver_t *receiver);
const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver);
long pubsub_zmqTopicReceiver_serializerSvcId(pubsub_zmq_topic_receiver_t *receiver);
+long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver);
void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls);
void pubsub_zmqTopicReceiver_connectTo(pubsub_zmq_topic_receiver_t *receiver, const char *url);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 96b6461..d6f3604 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -18,6 +18,7 @@
*/
#include <pubsub_serializer.h>
+#include <pubsub_protocol.h>
#include <stdlib.h>
#include <memory.h>
#include <pubsub_constants.h>
@@ -29,7 +30,6 @@
#include <log_helper.h>
#include "pubsub_zmq_topic_sender.h"
#include "pubsub_psa_zmq_constants.h"
-#include "pubsub_zmq_common.h"
#include <uuid/uuid.h>
#include "celix_constants.h"
@@ -50,13 +50,14 @@ struct pubsub_zmq_topic_sender {
log_helper_t *logHelper;
long serializerSvcId;
pubsub_serializer_service_t *serializer;
+ long protocolSvcId;
+ pubsub_protocol_service_t *protocol;
uuid_t fwUUID;
bool metricsEnabled;
bool zeroCopyEnabled;
char *scope;
char *topic;
- char scopeAndTopicFilter[5];
char *url;
bool isStatic;
@@ -78,8 +79,12 @@ struct pubsub_zmq_topic_sender {
};
typedef struct psa_zmq_send_msg_entry {
- pubsub_zmq_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
+ uint32_t type; //msg type id (hash of fqn)
+ uint8_t major;
+ uint8_t minor;
+ unsigned char originUUID[16];
pubsub_msg_serializer_t *msgSer;
+ pubsub_protocol_service_t *protSer;
celix_thread_mutex_t sendLock; //protects send & Seqnr
unsigned int seqNr;
struct {
@@ -108,7 +113,7 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
static unsigned int rand_range(unsigned int min, unsigned int max);
static void delay_first_send_for_late_joiners(pubsub_zmq_topic_sender_t *sender);
-static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
celix_bundle_context_t *ctx,
@@ -117,6 +122,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
const char *topic,
long serializerSvcId,
pubsub_serializer_service_t *ser,
+ long protocolSvcId,
+ pubsub_protocol_service_t *prot,
const char *bindIP,
const char *staticBindUrl,
unsigned int basePort,
@@ -126,7 +133,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
sender->logHelper = logHelper;
sender->serializerSvcId = serializerSvcId;
sender->serializer = ser;
- psa_zmq_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+ sender->protocolSvcId = protocolSvcId;
+ sender->protocol = prot;
const char* uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
@@ -321,6 +329,10 @@ long pubsub_zmqTopicSender_serializerSvcId(pubsub_zmq_topic_sender_t *sender) {
return sender->serializerSvcId;
}
+long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
+ return sender->protocolSvcId;
+}
+
const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender) {
return sender->scope;
}
@@ -375,14 +387,15 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
void *key = hashMapEntry_getKey(hashMapEntry);
psa_zmq_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry));
sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
- sendEntry->header.type = (int32_t)sendEntry->msgSer->msgId;
+ sendEntry->protSer = sender->protocol;
+ sendEntry->type = (int32_t)sendEntry->msgSer->msgId;
int major;
int minor;
version_getMajor(sendEntry->msgSer->msgVersion, &major);
version_getMinor(sendEntry->msgSer->msgVersion, &minor);
- sendEntry->header.major = (uint8_t)major;
- sendEntry->header.minor = (uint8_t)minor;
- uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
+ sendEntry->major = (uint8_t)major;
+ sendEntry->minor = (uint8_t)minor;
+ uuid_copy(sendEntry->originUUID, sender->fwUUID);
celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
hashMap_put(entry->msgEntries, key, sendEntry);
hashMap_put(entry->msgTypeIds, strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t) sendEntry->msgSer->msgId);
@@ -464,7 +477,7 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
result->msgMetrics[i].bndId = entry->bndId;
- result->msgMetrics[i].typeId = mEntry->header.type;
+ result->msgMetrics[i].typeId = mEntry->type;
snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
i += 1;
celixThreadMutex_unlock(&mEntry->metrics.mutex);
@@ -480,7 +493,7 @@ static void psa_zmq_freeMsg(void *msg, void *hint __attribute__((unused))) {
free(msg);
}
-static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
int status = CELIX_SUCCESS;
psa_zmq_bounded_service_entry_t *bound = handle;
pubsub_zmq_topic_sender_t *sender = bound->parent;
@@ -515,55 +528,70 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
}
if (status == CELIX_SUCCESS /*ser ok*/) {
- unsigned char *hdr = calloc(sizeof(pubsub_zmq_msg_header_t), sizeof(unsigned char));
+ pubsub_protocol_message_t message;
+ message.payload.payload = serializedOutput;
+ message.payload.length = serializedOutputLen;
+
+ void *payloadData = NULL;
+ size_t payloadLength = 0;
+ entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
+
+ void *metadataData = NULL;
+ size_t metadataLength = 0;
+ if (metadata != NULL) {
+ message.metadata.metadata = metadata;
+ entry->protSer->encodeMetadata(entry->protSer->handle, &message, &metadataData, &metadataLength);
+ }
- celixThreadMutex_lock(&entry->sendLock);
+ message.header.msgId = msgTypeId;
+ message.header.msgMajorVersion = 0;
+ message.header.msgMinorVersion = 0;
+ message.header.payloadSize = payloadLength;
+ message.header.metadataSize = metadataLength;
- pubsub_zmq_msg_header_t msg_hdr = entry->header;
- msg_hdr.seqNr = 0;
- msg_hdr.sendtimeSeconds = 0;
- msg_hdr.sendTimeNanoseconds = 0;
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &sendTime);
- msg_hdr.sendtimeSeconds = (uint64_t) sendTime.tv_sec;
- msg_hdr.sendTimeNanoseconds = (uint64_t) sendTime.tv_nsec;
- msg_hdr.seqNr = entry->seqNr++;
- }
- psa_zmq_encodeHeader(&msg_hdr, hdr, sizeof(pubsub_zmq_msg_header_t));
+ void *headerData = NULL;
+ size_t headerLength = 0;
+
+ entry->protSer->encodeHeader(entry->protSer->handle, &message, &headerData, &headerLength);
+
+ celixThreadMutex_lock(&entry->sendLock);
errno = 0;
bool sendOk;
if (bound->parent->zeroCopyEnabled) {
- zmq_msg_t msg1; //filter
- zmq_msg_t msg2; //header
- zmq_msg_t msg3; //payload
+ zmq_msg_t msg1; // Header
+ zmq_msg_t msg2; // Payload
+ zmq_msg_t msg3; // Metadata
void *socket = zsock_resolve(sender->zmq.socket);
- zmq_msg_init_data(&msg1, sender->scopeAndTopicFilter, 4, NULL, bound);
- //send filter
+ zmq_msg_init_data(&msg1, headerData, headerLength, psa_zmq_freeMsg, bound);
+ //send header
int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
if (rc == -1) {
- L_WARN("Error sending filter msg. %s", strerror(errno));
+ L_WARN("Error sending header msg. %s", strerror(errno));
zmq_msg_close(&msg1);
}
//send header
if (rc > 0) {
- zmq_msg_init_data(&msg2, hdr, sizeof(pubsub_zmq_msg_header_t), psa_zmq_freeMsg, bound);
- rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE);
+ zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, bound);
+ int flags = ZMQ_SNDMORE;
+ if (metadataLength > 0) {
+ flags = 0;
+ }
+ rc = zmq_msg_send(&msg2, socket, flags);
if (rc == -1) {
- L_WARN("Error sending header msg. %s", strerror(errno));
+ L_WARN("Error sending payload msg. %s", strerror(errno));
zmq_msg_close(&msg2);
}
}
-
- if (rc > 0) {
- zmq_msg_init_data(&msg3, serializedOutput, serializedOutputLen, psa_zmq_freeMsg, bound);
+ if (rc > 0 && metadataLength > 0) {
+ zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, bound);
rc = zmq_msg_send(&msg3, socket, 0);
if (rc == -1) {
- L_WARN("Error sending payload msg. %s", strerror(errno));
+ L_WARN("Error sending metadata msg. %s", strerror(errno));
zmq_msg_close(&msg3);
}
}
@@ -571,18 +599,24 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
sendOk = rc > 0;
} else {
zmsg_t *msg = zmsg_new();
- zmsg_addstr(msg, sender->scopeAndTopicFilter);
- zmsg_addmem(msg, hdr, sizeof(pubsub_zmq_msg_header_t));
- zmsg_addmem(msg, serializedOutput, serializedOutputLen);
+ zmsg_addmem(msg, headerData, headerLength);
+ zmsg_addmem(msg, payloadData, payloadLength);
+ if (metadataLength > 0) {
+ zmsg_addmem(msg, metadataData, metadataLength);
+ }
int rc = zmsg_send(&msg, sender->zmq.socket);
sendOk = rc == 0;
- free(serializedOutput);
- free(hdr);
+
if (!sendOk) {
zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
}
}
+ celix_properties_destroy(message.metadata.metadata);
+ free(headerData);
+ free(payloadData);
+ free(metadataData);
+
celixThreadMutex_unlock(&entry->sendLock);
if (sendOk) {
sendCountUpdate = 1;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
index c5c69a4..4923c62 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -32,6 +32,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
const char *topic,
long serializerSvcId,
pubsub_serializer_service_t *ser,
+ long protocolSvcId,
+ pubsub_protocol_service_t *prot,
const char *bindIP,
const char *staticBindUrl,
unsigned int basePort,
@@ -44,6 +46,7 @@ const char* pubsub_zmqTopicSender_url(pubsub_zmq_topic_sender_t *sender);
bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender);
long pubsub_zmqTopicSender_serializerSvcId(pubsub_zmq_topic_sender_t *sender);
+long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender);
void pubsub_zmqTopicSender_connectTo(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender, const celix_properties_t *endpoint);
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 3324c51..fad5a0d 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -29,6 +29,8 @@
#include <stdlib.h>
+#include "hash_map.h"
+
#define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher"
#define PUBSUB_PUBLISHER_SERVICE_VERSION "3.0.0"
@@ -59,7 +61,7 @@ struct pubsub_publisher {
* send is a async function, but the msg can be safely deleted after send returns.
* Returns 0 on success.
*/
- int (*send)(void *handle, unsigned int msgTypeId, const void *msg);
+ int (*send)(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
};
typedef struct pubsub_publisher pubsub_publisher_t;
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index 71b6956..9f5433f 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -29,6 +29,8 @@
#include <stdbool.h>
+#include "celix_properties.h"
+
#define PUBSUB_SUBSCRIBER_SERVICE_NAME "pubsub.subscriber"
#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "3.0.0"
@@ -62,7 +64,7 @@ struct pubsub_subscriber_struct {
*
* this method can be NULL.
*/
- int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+ int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool *release);
};
typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 964afd9..283db2f 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -481,8 +481,9 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
- L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s.\n",
- uuid, type, admin, ser);
+ const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
+ L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol is %s.\n",
+ uuid, type, admin, ser, prot);
}
celixThreadMutex_lock(&disc->discoveredEndpointsListenersMutex);
@@ -511,8 +512,9 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc,
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
- L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s.\n",
- uuid, type, admin, ser);
+ const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
+ L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol = %s.\n",
+ uuid, type, admin, ser, prot);
}
if (endpoint != NULL) {
@@ -607,6 +609,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!");
fprintf(os, "Endpoint %s:\n", uuid);
fprintf(os, " |- type = %s\n", type);
@@ -614,6 +617,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
fprintf(os, " |- topic = %s\n", topic);
fprintf(os, " |- admin type = %s\n", adminType);
fprintf(os, " |- serializer = %s\n", serType);
+ fprintf(os, " |- protocol = %s\n", protType);
}
celixThreadMutex_unlock(&disc->discoveredEndpointsMutex);
@@ -628,6 +632,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!");
int age = (int)(now.tv_sec - entry->createTime.tv_sec);
fprintf(os, "Endpoint %s:\n", uuid);
@@ -636,6 +641,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at
fprintf(os, " |- topic = %s\n", topic);
fprintf(os, " |- admin type = %s\n", adminType);
fprintf(os, " |- serializer = %s\n", serType);
+ fprintf(os, " |- protocol = %s\n", protType);
fprintf(os, " |- age = %ds\n", age);
fprintf(os, " |- is set = %s\n", entry->isSet ? "true" : "false");
if (disc->verbose) {
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt b/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
new file mode 100644
index 0000000..5e734e9
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/CMakeLists.txt
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_celix_bundle(celix_pubsub_protocol_wire_v1
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_protocol_wire_v1"
+ VERSION "1.0.0"
+ GROUP "Celix/PubSub"
+ SOURCES
+ src/ps_wire_protocol_activator.c
+ src/pubsub_wire_protocol_impl.c
+ src/pubsub_wire_protocol_common.c
+)
+target_include_directories(celix_pubsub_protocol_wire_v1 PRIVATE
+ src
+)
+set_target_properties(celix_pubsub_protocol_wire_v1 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_protocol_wire_v1 PRIVATE Celix::pubsub_spi Celix::framework)
+
+install_celix_bundle(celix_pubsub_protocol_wire_v1 EXPORT celix COMPONENT pubsub)
+
+add_library(Celix::pubsub_protocol_wire_v1 ALIAS celix_pubsub_protocol_wire_v1)
+
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
new file mode 100644
index 0000000..8d99969
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <pubsub_constants.h>
+
+#include "celix_api.h"
+#include "pubsub_wire_protocol_impl.h"
+
+typedef struct ps_wp_activator {
+ pubsub_protocol_wire_v1_t *wireprotocol;
+
+ pubsub_protocol_service_t protocolSvc;
+ long wireProtocolSvcId;
+} ps_wp_activator_t;
+
+static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
+ act->wireProtocolSvcId = -1L;
+
+ celix_status_t status = pubsubProtocol_create(ctx, &(act->wireprotocol));
+ if (status == CELIX_SUCCESS) {
+ /* Set serializertype */
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_PROTOCOL_TYPE_KEY, PUBSUB_WIRE_PROTOCOL_TYPE);
+
+ act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader;
+
+ act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader;
+ act->protocolSvc.encodePayload = pubsubProtocol_encodePayload;
+ act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata;
+
+ act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader;
+ act->protocolSvc.decodePayload = pubsubProtocol_decodePayload;
+ act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata;
+
+ act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props);
+ }
+ return status;
+}
+
+static int ps_wp_stop(ps_wp_activator_t *act, celix_bundle_context_t *ctx) {
+ celix_bundleContext_unregisterService(ctx, act->wireProtocolSvcId);
+ act->wireProtocolSvcId = -1L;
+ pubsubProtocol_destroy(act->wireprotocol);
+ return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(ps_wp_activator_t, ps_wp_start, ps_wp_stop)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c
new file mode 100644
index 0000000..dbfd29e
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.c
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "pubsub_wire_protocol_common.h"
+
+#include <string.h>
+#include <endian.h>
+
+int readShort(const unsigned char *data, int offset, uint16_t *val) {
+ memcpy(val, data + offset, sizeof(uint16_t));
+ *val = be16toh(*val);
+ return offset + sizeof(uint16_t);
+}
+
+int readInt(const unsigned char *data, int offset, uint32_t *val) {
+ memcpy(val, data + offset, sizeof(uint32_t));
+ *val = be32toh(*val);
+ return offset + sizeof(uint32_t);
+}
+
+int readLong(const unsigned char *data, int offset, uint64_t *val) {
+ memcpy(val, data + offset, sizeof(uint64_t));
+ *val = be64toh(*val);
+ return offset + sizeof(uint64_t);
+}
+
+int writeShort(unsigned char *data, int offset, uint16_t val) {
+ uint16_t nVal = htobe16(val);
+ memcpy(data + offset, &nVal, sizeof(uint16_t));
+ return offset + sizeof(uint16_t);
+}
+
+int writeInt(unsigned char *data, int offset, uint32_t val) {
+ uint32_t nVal = htobe32(val);
+ memcpy(data + offset, &nVal, sizeof(uint32_t));
+ return offset + sizeof(uint32_t);
+}
+
+int writeLong(unsigned char *data, int offset, uint64_t val) {
+ uint64_t nVal = htobe64(val);
+ memcpy(data + offset, &nVal, sizeof(uint64_t));
+ return offset + sizeof(uint64_t);
+}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
similarity index 55%
copy from bundles/pubsub/pubsub_spi/include/pubsub_constants.h
copy to bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
index bbbe5d3..8edf788 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h
@@ -17,25 +17,20 @@
* under the License.
*/
-#ifndef PUBSUB_CONSTANTS_H_
-#define PUBSUB_CONSTANTS_H_
+#include <stdint.h>
-#define PUBSUB_ADMIN_TYPE_KEY "pubsub.config"
-#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer"
+#ifndef CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
+#define CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
-/**
- * Endpoints with the system visibility should be discoverable through the complete system
- */
-#define PUBSUB_ENDPOINT_SYSTEM_VISIBILITY "system"
+static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB;
+static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1;
-/**
- * Endpoints with the system visibility are discoverable for a single host (i.e. IPC)
- */
-#define PUBSUB_ENDPOINT_HOST_VISIBILITY "host"
+int readShort(const unsigned char *data, int offset, uint16_t *val);
+int readInt(const unsigned char *data, int offset, uint32_t *val);
+int readLong(const unsigned char *data, int offset, uint64_t *val);
-/**
- * Endpoints which are only visible within a single process
- */
-#define PUBSUB_ENDPOINT_LOCAL_VISIBILITY "local"
+int writeShort(unsigned char *data, int offset, uint16_t val);
+int writeInt(unsigned char *data, int offset, uint32_t val);
+int writeLong(unsigned char *data, int offset, uint64_t val);
-#endif /* PUBSUB_CONSTANTS_H_ */
+#endif //CELIX_PUBSUB_WIRE_PROTOCOL_COMMON_H
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
new file mode 100644
index 0000000..4f40e84
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <math.h>
+#include <string.h>
+
+#include "utils.h"
+#include "celix_properties.h"
+
+#include "pubsub_wire_protocol_impl.h"
+#include "pubsub_wire_protocol_common.h"
+
+#define NETSTRING_ERROR_TOO_LONG -1
+#define NETSTRING_ERROR_NO_COLON -2
+#define NETSTRING_ERROR_TOO_SHORT -3
+#define NETSTRING_ERROR_NO_COMMA -4
+#define NETSTRING_ERROR_LEADING_ZERO -5
+#define NETSTRING_ERROR_NO_LENGTH -6
+
+struct pubsub_protocol_wire_v1 {
+ celix_bundle_context_t *bundle_context;
+};
+
+static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut);
+static int pubsubProtocol_parseNetstring(char *buffer, size_t buffer_length,
+ char **netstring_start, size_t *netstring_length);
+
+celix_status_t pubsubProtocol_create(celix_bundle_context_t *context, pubsub_protocol_wire_v1_t **protocol) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *protocol = calloc(1, sizeof(**protocol));
+
+ if (!*protocol) {
+ status = CELIX_ENOMEM;
+ }
+ else {
+ (*protocol)->bundle_context = context;
+ }
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ free(protocol);
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) {
+ for (int i = 0; i < 5; ++i) {
+ ((char *) syncHeader)[i] = '\0';
+ }
+ writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC);
+
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *outBuffer = calloc(1, 24);
+ if (*outBuffer == NULL) {
+ status = CELIX_ENOMEM;
+ } else {
+ int idx = 0;
+ idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC);
+ idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_ENVELOPE_VERSION);
+ idx = writeInt(*outBuffer, idx, message->header.msgId);
+ idx = writeShort(*outBuffer, idx, message->header.msgMajorVersion);
+ idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion);
+ idx = writeInt(*outBuffer, idx, message->header.payloadSize);
+ idx = writeInt(*outBuffer, idx, message->header.metadataSize);
+
+ *outLength = idx;
+ }
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ *outBuffer = message->payload.payload;
+ *outLength = message->payload.length;
+
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ char *line = calloc(1, 1);
+ size_t len = 0;
+
+ const char *key;
+ if (message->metadata.metadata != NULL || celix_properties_size(message->metadata.metadata) > 0) {
+ CELIX_PROPERTIES_FOR_EACH(message->metadata.metadata, key) {
+ const char *val = celix_properties_get(message->metadata.metadata, key, "!Error!");
+ char *keyNetString = NULL;
+ char *valueNetString = NULL;
+
+ pubsubProtocol_createNetstring(key, &keyNetString);
+ pubsubProtocol_createNetstring(val, &valueNetString);
+
+ len += strlen(keyNetString);
+ len += strlen(valueNetString);
+ char *tmp = realloc(line, len + 1);
+ if (!tmp) {
+ free(line);
+ status = CELIX_ENOMEM;
+ }
+ line = tmp;
+
+ strncat(line, keyNetString, strlen(keyNetString));
+ strncat(line, valueNetString, strlen(valueNetString));
+
+ free(keyNetString);
+ free(valueNetString);
+ }
+ }
+
+ *outBuffer = line;
+ *outLength = len;
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ int idx = 0;
+ unsigned int sync;
+ idx = readInt(data, idx, &sync);
+ if (sync != PROTOCOL_WIRE_SYNC) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ } else {
+ unsigned int envelopeVersion;
+ idx = readInt(data, idx, &envelopeVersion);
+ if (envelopeVersion != PROTOCOL_WIRE_ENVELOPE_VERSION) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ } else {
+ idx = readInt(data, idx, &message->header.msgId);
+ idx = readShort(data, idx, &message->header.msgMajorVersion);
+ idx = readShort(data, idx, &message->header.msgMinorVersion);
+ idx = readInt(data, idx, &message->header.payloadSize);
+ readInt(data, idx, &message->header.metadataSize);
+ }
+ }
+
+ return status;
+}
+
+celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message){
+ message->payload.payload = data;
+ message->payload.length = length;
+
+ return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ char *netstring = data;
+
+ message->metadata.metadata = celix_properties_create();
+ while (strlen(netstring) > 0) {
+ size_t outlen;
+ pubsubProtocol_parseNetstring(netstring, length, &netstring, &outlen);
+ char *key = strndup(netstring, outlen);
+ netstring += outlen + 1;
+
+ pubsubProtocol_parseNetstring(netstring, length, &netstring, &outlen);
+ char *value = strndup(netstring, outlen);
+ netstring += outlen + 1;
+
+ celix_properties_setWithoutCopy(message->metadata.metadata, key, value);
+ }
+
+ return status;
+}
+
+static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ size_t str_len = strlen(string);
+ if (str_len == 0) {
+ // 0:,
+ *netstringOut = calloc(1, 4);
+ if (*netstringOut == NULL) {
+ status = CELIX_ENOMEM;
+ } else {
+ *netstringOut[0] = '0';
+ *netstringOut[1] = ':';
+ *netstringOut[2] = ',';
+ *netstringOut[3] = '\0';
+ }
+ } else {
+ size_t numlen = ceil(log10(str_len + 1));
+ *netstringOut = calloc(1, numlen + str_len + 3);
+ if (*netstringOut == NULL) {
+ status = CELIX_ENOMEM;
+ } else {
+ sprintf(*netstringOut, "%zu:%s,", str_len, string);
+ }
+ }
+
+ return status;
+}
+
+/* Reads a netstring from a `buffer` of length `buffer_length`. Writes
+ to `netstring_start` a pointer to the beginning of the string in
+ the buffer, and to `netstring_length` the length of the
+ string. Does not allocate any memory. If it reads successfully,
+ then it returns 0. If there is an error, then the return value will
+ be negative. The error values are:
+ NETSTRING_ERROR_TOO_LONG More than 999999999 bytes in a field
+ NETSTRING_ERROR_NO_COLON No colon was found after the number
+ NETSTRING_ERROR_TOO_SHORT Number of bytes greater than buffer length
+ NETSTRING_ERROR_NO_COMMA No comma was found at the end
+ NETSTRING_ERROR_LEADING_ZERO Leading zeros are not allowed
+ NETSTRING_ERROR_NO_LENGTH Length not given at start of netstring
+ If you're sending messages with more than 999999999 bytes -- about
+ 2 GB -- then you probably should not be doing so in the form of a
+ single netstring. This restriction is in place partially to protect
+ from malicious or erroneous input, and partly to be compatible with
+ D. J. Bernstein's reference implementation.
+ Example:
+ if (netstring_read("3:foo,", 6, &str, &len) < 0) explode_and_die();
+ */
+static int pubsubProtocol_parseNetstring(char *buffer, size_t buffer_length,
+ char **netstring_start, size_t *netstring_length) {
+ int i;
+ size_t len = 0;
+
+ /* Write default values for outputs */
+ *netstring_start = NULL; *netstring_length = 0;
+
+ /* Make sure buffer is big enough. Minimum size is 3. */
+ if (buffer_length < 3) return NETSTRING_ERROR_TOO_SHORT;
+
+ /* No leading zeros allowed! */
+ if (buffer[0] == '0' && isdigit(buffer[1]))
+ return NETSTRING_ERROR_LEADING_ZERO;
+
+ /* The netstring must start with a number */
+ if (!isdigit(buffer[0])) return NETSTRING_ERROR_NO_LENGTH;
+
+ /* Read the number of bytes */
+ for (i = 0; i < buffer_length && isdigit(buffer[i]); i++) {
+ /* Error if more than 9 digits */
+ if (i >= 9) return NETSTRING_ERROR_TOO_LONG;
+ /* Accumulate each digit, assuming ASCII. */
+ len = len*10 + (buffer[i] - '0');
+ }
+
+ /* Check buffer length once and for all. Specifically, we make sure
+ that the buffer is longer than the number we've read, the length
+ of the string itself, and the colon and comma. */
+ if (i + len + 1 >= buffer_length) return NETSTRING_ERROR_TOO_SHORT;
+
+ /* Read the colon */
+ if (buffer[i++] != ':') return NETSTRING_ERROR_NO_COLON;
+
+ /* Test for the trailing comma, and set the return values */
+ if (buffer[i + len] != ',') return NETSTRING_ERROR_NO_COMMA;
+ *netstring_start = &buffer[i]; *netstring_length = len;
+
+ return 0;
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
new file mode 100644
index 0000000..7705585
--- /dev/null
+++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PROTOCOL_WIRE_H_
+#define PUBSUB_PROTOCOL_WIRE_H_
+
+#include "pubsub_protocol.h"
+
+#define PUBSUB_WIRE_PROTOCOL_TYPE "wire"
+
+typedef struct pubsub_protocol_wire_v1 pubsub_protocol_wire_v1_t;
+
+celix_status_t pubsubProtocol_create(celix_bundle_context_t *context, pubsub_protocol_wire_v1_t **protocol);
+celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol);
+
+celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader);
+
+celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+
+
+#endif /* PUBSUB_PROTOCOL_WIRE_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index cd172cc..15d5d16 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -44,16 +44,16 @@
struct pubsub_admin_service {
void *handle;
- celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
- celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
+ celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId);
+ celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId);
celix_status_t (*matchDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
//note endpoint is owned by caller
- celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
+ celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **publisherEndpoint);
celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic);
//note endpoint is owned by caller
- celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+ celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t (*teardownTopicReceiver)(void *handle, const char *scope, const char *topic);
celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index bbbe5d3..671b874 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -20,8 +20,9 @@
#ifndef PUBSUB_CONSTANTS_H_
#define PUBSUB_CONSTANTS_H_
-#define PUBSUB_ADMIN_TYPE_KEY "pubsub.config"
-#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer"
+#define PUBSUB_ADMIN_TYPE_KEY "pubsub.config"
+#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub.serializer"
+#define PUBSUB_PROTOCOL_TYPE_KEY "pubsub.protocol"
/**
* Endpoints with the system visibility should be discoverable through the complete system
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
index 8e51474..5ba178b 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -41,6 +41,7 @@ extern "C" {
#define PUBSUB_ENDPOINT_VISIBILITY "pubsub.endpoint.visibility" //local, host or system. e.g. for IPC host
#define PUBSUB_ENDPOINT_ADMIN_TYPE PUBSUB_ADMIN_TYPE_KEY
#define PUBSUB_ENDPOINT_SERIALIZER PUBSUB_SERIALIZER_TYPE_KEY
+#define PUBSUB_ENDPOINT_PROTOCOL PUBSUB_PROTOCOL_TYPE_KEY
#define PUBSUB_PUBLISHER_ENDPOINT_TYPE "publisher"
@@ -50,7 +51,7 @@ extern "C" {
celix_properties_t *
pubsubEndpoint_create(const char *fwUUID, const char *scope, const char *topic, const char *pubsubType,
- const char *adminType, const char *serType, celix_properties_t *topic_props);
+ const char *adminType, const char *serType, const char *protType, celix_properties_t *topic_props);
celix_properties_t *
pubsubEndpoint_createFromSubscriberSvc(bundle_context_t *ctx, long svcBndId, const celix_properties_t *svcProps);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
new file mode 100644
index 0000000..44575e4
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PROTOCOL_SERVICE_H_
+#define PUBSUB_PROTOCOL_SERVICE_H_
+
+#include "celix_properties.h"
+#include "version.h"
+#include "celix_bundle.h"
+
+#define PUBSUB_PROTOCOL_SERVICE_NAME "pubsub_protocol"
+#define PUBSUB_PROTOCOL_SERVICE_VERSION "1.0.0"
+#define PUBSUB_PROTOCOL_SERVICE_RANGE "[1,2)"
+
+typedef struct pubsub_protocol_header pubsub_protocol_header_t;
+
+struct pubsub_protocol_header {
+ unsigned int msgId;
+ unsigned short msgMajorVersion;
+ unsigned short msgMinorVersion;
+
+ unsigned int payloadSize;
+ unsigned int metadataSize;
+};
+
+typedef struct pubsub_protocol_payload pubsub_protocol_payload_t;
+
+struct pubsub_protocol_payload {
+ void *payload;
+ size_t length;
+};
+
+typedef struct pubsub_protocol_metadata pubsub_protocol_metadata_t;
+
+struct pubsub_protocol_metadata {
+ celix_properties_t *metadata;
+};
+
+typedef struct pubsub_protocol_message pubsub_protocol_message_t;
+
+struct pubsub_protocol_message {
+ pubsub_protocol_header_t header;
+ pubsub_protocol_payload_t payload;
+ pubsub_protocol_metadata_t metadata;
+};
+
+typedef struct pubsub_protocol_service {
+ void* handle;
+
+ celix_status_t (*getSyncHeader)(void *handle, void *sync);
+
+ celix_status_t (*encodeHeader)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+ celix_status_t (*encodePayload)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+ celix_status_t (*encodeMetadata)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength);
+
+ celix_status_t (*decodeHeader)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+ celix_status_t (*decodePayload)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+ celix_status_t (*decodeMetadata)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message);
+} pubsub_protocol_service_t;
+
+#endif /* PUBSUB_PROTOCOL_SERVICE_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 557014c..47bb957 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -86,8 +86,10 @@ double pubsub_utils_matchPublisher(
double sampleScore,
double controlScore,
double defaultScore,
+ bool matchProtocol,
celix_properties_t **outTopicProperties,
- long *outSerializerSvcId);
+ long *outSerializerSvcId,
+ long *outProtocolSvcId);
/**
* Match a subscriber for a provided bnd (using the bundleId) and provided service properties.
@@ -127,8 +129,10 @@ double pubsub_utils_matchSubscriber(
double sampleScore,
double controlScore,
double defaultScore,
+ bool matchProtocol,
celix_properties_t **outTopicProperties,
- long *outSerializerSvcId);
+ long *outSerializerSvcId,
+ long *outProtocolSvcId);
/**
* Match an endpoint (subscriber or publisher endpoint) for the provided admin type.
@@ -146,7 +150,9 @@ bool pubsub_utils_matchEndpoint(
celix_bundle_context_t *ctx,
const celix_properties_t *endpoint,
const char *adminType,
- long *outSerializerSvcId);
+ bool matchProtocol,
+ long *outSerializerSvcId,
+ long *outProtocolSvcId);
/**
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 91a01a3..f9e74b2 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -39,9 +39,9 @@
#include "pubsub_utils.h"
-static void pubsubEndpoint_setFields(celix_properties_t *psEp, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const celix_properties_t *topic_props);
+static void pubsubEndpoint_setFields(celix_properties_t *psEp, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const char *protType, const celix_properties_t *topic_props);
-static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const celix_properties_t *topic_props) {
+static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID, const char* scope, const char* topic, const char *pubsubType, const char *adminType, const char *serType, const char *protType, const celix_properties_t *topic_props) {
assert(ep != NULL);
//copy topic properties
@@ -82,6 +82,10 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID,
if (serType != NULL) {
celix_properties_set(ep, PUBSUB_ENDPOINT_SERIALIZER, serType);
}
+
+ if (protType != NULL) {
+ celix_properties_set(ep, PUBSUB_ENDPOINT_PROTOCOL, protType);
+ }
}
celix_properties_t* pubsubEndpoint_create(
@@ -91,9 +95,10 @@ celix_properties_t* pubsubEndpoint_create(
const char* pubsubType,
const char* adminType,
const char *serType,
+ const char *protType,
celix_properties_t *topic_props) {
celix_properties_t *ep = celix_properties_create();
- pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, topic_props);
+ pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, adminType, serType, protType, topic_props);
if (!pubsubEndpoint_isValid(ep, true, true)) {
celix_properties_destroy(ep);
ep = NULL;
@@ -128,7 +133,7 @@ celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* ctx
const char *pubsubType = PUBSUB_SUBSCRIBER_ENDPOINT_TYPE;
- pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, NULL, NULL, data.props);
+ pubsubEndpoint_setFields(ep, fwUUID, scope, topic, pubsubType, NULL, NULL, NULL, data.props);
if (data.props != NULL) {
celix_properties_destroy(data.props); //Can be deleted since setFields invokes properties_copy
@@ -161,7 +166,7 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context
celix_bundleContext_useBundle(ctx, bundleId, &data, retrieveTopicProperties);
if (data.props != NULL) {
- pubsubEndpoint_setFields(ep, fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, NULL, NULL, data.props);
+ pubsubEndpoint_setFields(ep, fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, NULL, NULL, NULL, data.props);
celix_properties_destroy(data.props); //safe to delete, properties are copied in pubsubEndpoint_setFields
}
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index 7196dfb..226b6ed 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -21,6 +21,7 @@
#include <limits.h>
#include <pubsub_endpoint.h>
#include <pubsub_serializer.h>
+#include <pubsub_protocol.h>
#include "service_reference.h"
@@ -98,6 +99,50 @@ static long getPSASerializer(celix_bundle_context_t *ctx, const char *requested_
return svcId;
}
+struct psa_protocol_selection_data {
+ const char *requested_protocol;
+ long matchingSvcId;
+};
+
+void psa_protocol_selection_callback(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
+ struct psa_protocol_selection_data *data = handle;
+ const char *serType = celix_properties_get(props, PUBSUB_PROTOCOL_TYPE_KEY, NULL);
+ if (serType == NULL) {
+ fprintf(stderr, "Warning found protocol without mandatory protocol type key (%s)\n", PUBSUB_PROTOCOL_TYPE_KEY);
+ } else {
+ if (strncmp(data->requested_protocol, serType, 1024 * 1024) == 0) {
+ data->matchingSvcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+ }
+ }
+}
+
+static long getPSAProtocol(celix_bundle_context_t *ctx, const char *requested_protocol) {
+ long svcId;
+
+ if (requested_protocol != NULL) {
+ struct psa_protocol_selection_data data;
+ data.requested_protocol = requested_protocol;
+ data.matchingSvcId = -1L;
+
+ celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+ opts.filter.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
+ opts.filter.ignoreServiceLanguage = true;
+ opts.callbackHandle = &data;
+ opts.useWithProperties = psa_protocol_selection_callback;
+ celix_bundleContext_useServicesWithOptions(ctx, &opts);
+ svcId = data.matchingSvcId;
+ } else {
+ celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
+ opts.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
+ opts.ignoreServiceLanguage = true;
+
+ //note findService will automatically return the highest ranking service id
+ svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
+ }
+
+ return svcId;
+}
+
double pubsub_utils_matchPublisher(
celix_bundle_context_t *ctx,
long bundleId,
@@ -106,8 +151,10 @@ double pubsub_utils_matchPublisher(
double sampleScore,
double controlScore,
double defaultScore,
+ bool matchProtocol,
celix_properties_t **outTopicProperties,
- long *outSerializerSvcId) {
+ long *outSerializerSvcId,
+ long *outProtocolSvcId) {
celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
const char *requested_admin = NULL;
@@ -126,12 +173,23 @@ double pubsub_utils_matchPublisher(
score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
}
-// printf("Score publisher service for psa type %s is %f\n", adminType, score);
-
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
}
+ if (matchProtocol) {
+ const char *requested_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
+ long protocolSvcId = getPSAProtocol(ctx, requested_protocol);
+
+ if (protocolSvcId < 0) {
+ score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ }
+
+ if (outProtocolSvcId != NULL) {
+ *outProtocolSvcId = protocolSvcId;
+ }
+ }
+
if (outTopicProperties != NULL) {
*outTopicProperties = ep;
} else if (ep != NULL) {
@@ -161,8 +219,10 @@ double pubsub_utils_matchSubscriber(
double sampleScore,
double controlScore,
double defaultScore,
+ bool matchProtocol,
celix_properties_t **outTopicProperties,
- long *outSerializerSvcId) {
+ long *outSerializerSvcId,
+ long *outProtocolSvcId) {
pubsub_get_topic_properties_data_t data;
data.isPublisher = false;
@@ -174,10 +234,14 @@ double pubsub_utils_matchSubscriber(
const char *requested_admin = NULL;
const char *requested_qos = NULL;
const char *requested_serializer = NULL;
+ const char *requested_protocol = NULL;
if (ep != NULL) {
requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
+ if (matchProtocol) {
+ requested_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
+ }
}
double score = getPSAScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
@@ -191,6 +255,17 @@ double pubsub_utils_matchSubscriber(
*outSerializerSvcId = serializerSvcId;
}
+ if (matchProtocol) {
+ long protocolSvcId = getPSAProtocol(ctx, requested_protocol);
+ if (protocolSvcId < 0) {
+ score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no protocol, no match
+ }
+
+ if (outProtocolSvcId != NULL) {
+ *outProtocolSvcId = protocolSvcId;
+ }
+ }
+
if (outTopicProperties != NULL) {
*outTopicProperties = ep;
} else if (ep != NULL) {
@@ -204,7 +279,9 @@ bool pubsub_utils_matchEndpoint(
celix_bundle_context_t *ctx,
const celix_properties_t *ep,
const char *adminType,
- long *outSerializerSvcId) {
+ bool matchProtocol,
+ long *outSerializerSvcId,
+ long *outProtocolSvcId) {
bool psaMatch = false;
const char *configured_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
@@ -221,7 +298,21 @@ bool pubsub_utils_matchEndpoint(
}
bool match = psaMatch && serMatch;
-// printf("Match for endpoint for psa type %s is %s\n", adminType, match ? "true" : "false");
+
+ if (matchProtocol) {
+ bool protMatch = false;
+ long protocolSvcId = -1L;
+ if (psaMatch) {
+ const char *configured_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
+ protocolSvcId = getPSAProtocol(ctx, configured_protocol);
+ protMatch = protocolSvcId >= 0;
+ }
+ match = match && protMatch;
+
+ if (outProtocolSvcId != NULL) {
+ *outProtocolSvcId = protocolSvcId;
+ }
+ }
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index a8388bb..5ec5140 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -263,6 +263,7 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
entry->needsMatch = true;
entry->selectedSerializerSvcId = -1L;
+ entry->selectedProtocolSvcId = -1L;
entry->selectedPsaSvcId = -1L;
if (entry->endpoint != NULL) {
celix_properties_destroy(entry->endpoint);
@@ -290,6 +291,7 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
entry->needsMatch = true;
entry->selectedSerializerSvcId = -1L;
+ entry->selectedProtocolSvcId = -1L;
entry->selectedPsaSvcId = -1L;
if (entry->endpoint != NULL) {
celix_properties_destroy(entry->endpoint);
@@ -458,6 +460,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
entry = calloc(1, sizeof(*entry));
entry->usageCount = 1;
entry->selectedSerializerSvcId = -1L;
+ entry->selectedProtocolSvcId = -1L;
entry->selectedPsaSvcId = -1L;
entry->scope = scope; //taking ownership
entry->topic = topic; //taking ownership
@@ -673,6 +676,7 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
entry->endpoint = NULL;
entry->selectedPsaSvcId = -1L;
entry->selectedSerializerSvcId = -1L;
+ entry->selectedProtocolSvcId = -1L;
}
}
}
@@ -737,6 +741,7 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
entry->endpoint = NULL;
entry->selectedPsaSvcId = -1L;
entry->selectedSerializerSvcId = -1L;
+ entry->selectedProtocolSvcId = -1L;
}
}
}
@@ -789,7 +794,7 @@ static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
pstm_topic_receiver_or_sender_entry_t *entry = handle;
pubsub_admin_service_t *psa = svc;
- psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint);
+ psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpoint);
}
static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
@@ -801,6 +806,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
//new topic sender needed, requesting match with current psa
double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
long serializerSvcId = -1L;
+ long protocolSvcId = -1L;
long selectedPsaSvcId = -1L;
celix_properties_t *topicPropertiesForHighestMatch = NULL;
@@ -812,14 +818,16 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
long serSvcId = -1L;
+ long protSvcId = -1L;
celix_properties_t *topicProps = NULL;
- psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId);
+ psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId, &protSvcId);
if (score > highestScore) {
if (topicPropertiesForHighestMatch != NULL) {
celix_properties_destroy(topicPropertiesForHighestMatch);
}
highestScore = score;
serializerSvcId = serSvcId;
+ protocolSvcId = protSvcId;
selectedPsaSvcId = svcId;
topicPropertiesForHighestMatch = topicProps;
} else if (topicProps != NULL) {
@@ -831,6 +839,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
entry->selectedPsaSvcId = selectedPsaSvcId;
entry->selectedSerializerSvcId = serializerSvcId;
+ entry->selectedProtocolSvcId = protocolSvcId;
entry->topicProperties = topicPropertiesForHighestMatch;
bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
@@ -858,7 +867,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
pstm_topic_receiver_or_sender_entry_t *entry = handle;
pubsub_admin_service_t *psa = svc;
- psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, &entry->endpoint);
+ psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpoint);
}
static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
@@ -870,6 +879,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
long serializerSvcId = -1L;
+ long protocolSvcId = -1L;
long selectedPsaSvcId = -1L;
celix_properties_t *highestMatchTopicProperties = NULL;
@@ -881,15 +891,17 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
long serSvcId = -1L;
+ long protSvcId = -1L;
celix_properties_t *topicProps = NULL;
- psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId);
+ psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId, &protSvcId);
if (score > highestScore) {
if (highestMatchTopicProperties != NULL) {
celix_properties_destroy(highestMatchTopicProperties);
}
highestScore = score;
serializerSvcId = serSvcId;
+ protocolSvcId = protSvcId;
selectedPsaSvcId = svcId;
highestMatchTopicProperties = topicProps;
} else if (topicProps != NULL) {
@@ -901,6 +913,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
entry->selectedPsaSvcId = selectedPsaSvcId;
entry->selectedSerializerSvcId = serializerSvcId;
+ entry->selectedProtocolSvcId = protocolSvcId;
entry->topicProperties = highestMatchTopicProperties;
bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
@@ -987,6 +1000,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
fprintf(os, " |- container name = %s\n", cn);
fprintf(os, " |- fw uuid = %s\n", fwuuid);
@@ -995,6 +1009,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
fprintf(os, " |- topic = %s\n", topic);
fprintf(os, " |- admin type = %s\n", adminType);
fprintf(os, " |- serializer = %s\n", serType);
+ fprintf(os, " |- protocol = %s\n", protType);
if (manager->verbose) {
fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId);
fprintf(os, " |- usage count = %i\n", discovered->usageCount);
@@ -1017,14 +1032,17 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
fprintf(os, " |- scope = %s\n", entry->scope);
fprintf(os, " |- topic = %s\n", entry->topic);
fprintf(os, " |- admin type = %s\n", adminType);
fprintf(os, " |- serializer = %s\n", serType);
+ fprintf(os, " |- protocol = %s\n", protType);
if (manager->verbose) {
fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
+ fprintf(os, " |- prot svc id = %li\n", entry->selectedProtocolSvcId);
fprintf(os, " |- usage count = %i\n", entry->usageCount);
}
}
@@ -1044,14 +1062,17 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!");
const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+ const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
fprintf(os, " |- scope = %s\n", entry->scope);
fprintf(os, " |- topic = %s\n", entry->topic);
fprintf(os, " |- admin type = %s\n", adminType);
fprintf(os, " |- serializer = %s\n", serType);
+ fprintf(os, " |- protocol = %s\n", protType);
if (manager->verbose) {
fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId);
fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId);
+ fprintf(os, " |- prot svc id = %li\n", entry->selectedProtocolSvcId);
fprintf(os, " |- usage count = %i\n", entry->usageCount);
}
}
@@ -1069,9 +1090,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)");
const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
+ const char *requestedProt = celix_properties_get(entry->topicProperties, PUBSUB_PROTOCOL_TYPE_KEY, "(None)");
fprintf(os, " |- requested qos = %s\n", requestedQos);
fprintf(os, " |- requested config = %s\n", requestedConfig);
fprintf(os, " |- requested serializer = %s\n", requestedSer);
+ fprintf(os, " |- requested protocol = %s\n", requestedProt);
if (manager->verbose) {
fprintf(os, " |- usage count = %i\n", entry->usageCount);
}
@@ -1092,9 +1115,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)");
const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
+ const char *requestedProt = celix_properties_get(entry->topicProperties, PUBSUB_PROTOCOL_TYPE_KEY, "(None)");
fprintf(os, " |- requested qos = %s\n", requestedQos);
fprintf(os, " |- requested config = %s\n", requestedConfig);
fprintf(os, " |- requested serializer = %s\n", requestedSer);
+ fprintf(os, " |- requested protocol = %s\n", requestedProt);
if (manager->verbose) {
fprintf(os, " |- usage count = %i\n", entry->usageCount);
}
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 8e88641..817098e 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -96,6 +96,7 @@ typedef struct pstm_topic_receiver_or_sender_entry {
int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
long selectedPsaSvcId;
long selectedSerializerSvcId;
+ long selectedProtocolSvcId;
long bndId;
celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties
diff --git a/bundles/pubsub/test/test/loopback_activator.c b/bundles/pubsub/test/test/loopback_activator.c
index ad46879..44aa35b 100644
--- a/bundles/pubsub/test/test/loopback_activator.c
+++ b/bundles/pubsub/test/test/loopback_activator.c
@@ -26,7 +26,7 @@
#include "pubsub/api.h"
#include "msg.h"
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, celix_properties_t *metadata, bool *release);
static void sut_pubSet(void *handle, void *service);
struct activator {
@@ -81,7 +81,7 @@ static void sut_pubSet(void *handle, void *service) {
}
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void * voidMsg, bool *release) {
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void * voidMsg, celix_properties_t *metadata, bool *release) {
struct activator *act =handle;
msg_t *msg = voidMsg;
msg_t send_msg = *msg;
@@ -90,7 +90,7 @@ static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId
if (act->count == 0) {
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &act->msgId);
}
- act->pubSvc->send(act->pubSvc->handle, act->msgId, &send_msg);
+ act->pubSvc->send(act->pubSvc->handle, act->msgId, &send_msg, metadata);
act->count += 1;
}
pthread_mutex_unlock(&act->mutex);
diff --git a/bundles/pubsub/test/test/sut_activator.c b/bundles/pubsub/test/test/sut_activator.c
index bd0da8c..80dc1a9 100644
--- a/bundles/pubsub/test/test/sut_activator.c
+++ b/bundles/pubsub/test/test/sut_activator.c
@@ -93,8 +93,8 @@ static void* sut_sendThread(void *data) {
if (msgId == 0) {
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
}
-
- act->pubSvc->send(act->pubSvc->handle, msgId, &msg);
+ celix_properties_t *metadata = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
+ act->pubSvc->send(act->pubSvc->handle, msgId, &msg, metadata);
if (msg.seqNr % 1000 == 0) {
printf("Send %i messages\n", msg.seqNr);
}
diff --git a/bundles/pubsub/test/test/sut_endpoint_activator.c b/bundles/pubsub/test/test/sut_endpoint_activator.c
index 4140ae8..a9e6520 100644
--- a/bundles/pubsub/test/test/sut_endpoint_activator.c
+++ b/bundles/pubsub/test/test/sut_endpoint_activator.c
@@ -94,7 +94,8 @@ static void* sut_sendThread(void *data) {
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId);
}
- act->pubSvc->send(act->pubSvc->handle, msgId, &msg);
+ celix_properties_t *metadata = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
+ act->pubSvc->send(act->pubSvc->handle, msgId, &msg, metadata);
if (msg.seqNr % 1000 == 0) {
printf("Send %i messages\n", msg.seqNr);
}
diff --git a/bundles/pubsub/test/test/tst_activator.c b/bundles/pubsub/test/test/tst_activator.c
index 3a8bcce..40900c9 100644
--- a/bundles/pubsub/test/test/tst_activator.c
+++ b/bundles/pubsub/test/test/tst_activator.c
@@ -28,7 +28,7 @@
#include "msg.h"
#include "receive_count_service.h"
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, celix_properties_t *metadata, bool *release);
static size_t tst_count(void *handle);
struct activator {
@@ -72,7 +72,7 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, bool *release __attribute__((unused))) {
+static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, celix_properties_t *metadata __attribute__((unused)), bool *release __attribute__((unused))) {
struct activator *act = handle;
msg_t *msg = voidMsg;
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/test/test/tst_endpoint_activator.c
index 8e501e4..ef3ce2f 100644
--- a/bundles/pubsub/test/test/tst_endpoint_activator.c
+++ b/bundles/pubsub/test/test/tst_endpoint_activator.c
@@ -29,7 +29,7 @@
#include "receive_count_service.h"
-static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
+static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, celix_properties_t *metadata, bool *release);
static size_t tst_count(void *handle);
struct activator {
@@ -75,7 +75,7 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) {
CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ;
-static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, bool *release __attribute__((unused))) {
+static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, celix_properties_t *metadata __attribute__((unused)), bool *release __attribute__((unused))) {
struct activator *act = handle;
msg_t *msg = voidMsg;
diff --git a/libs/utils/include/celix_properties.h b/libs/utils/include/celix_properties.h
index 5648a7c..7c0a9ea 100644
--- a/libs/utils/include/celix_properties.h
+++ b/libs/utils/include/celix_properties.h
@@ -56,6 +56,8 @@ const char* celix_properties_get(const celix_properties_t *properties, const cha
void celix_properties_set(celix_properties_t *properties, const char *key, const char *value);
+void celix_properties_setWithoutCopy(celix_properties_t *properties, char *key, char *value);
+
void celix_properties_unset(celix_properties_t *properties, const char *key);
celix_properties_t* celix_properties_copy(const celix_properties_t *properties);
diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c
index b28b5c1..67f164b 100644
--- a/libs/utils/src/properties.c
+++ b/libs/utils/src/properties.c
@@ -382,6 +382,21 @@ void celix_properties_set(celix_properties_t *properties, const char *key, const
}
}
+void celix_properties_setWithoutCopy(celix_properties_t *properties, char *key, char *value) {
+ if (properties != NULL) {
+ hash_map_entry_pt entry = hashMap_getEntry(properties, key);
+ char *oldVal = NULL;
+ if (entry != NULL) {
+ char *oldKey = hashMapEntry_getKey(entry);
+ oldVal = hashMapEntry_getValue(entry);
+ hashMap_put(properties, oldKey, value);
+ } else {
+ hashMap_put(properties, key, value);
+ }
+ free(oldVal);
+ }
+}
+
void celix_properties_unset(celix_properties_t *properties, const char *key) {
char* oldValue = hashMap_remove(properties, key);
free(oldValue);