You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/06 11:44:44 UTC
[2/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is
no longer part of the current API.
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 9f7f3b6..fb8c5f6 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -30,7 +30,7 @@
#include <stdlib.h>
#define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher"
-#define PUBSUB_PUBLISHER_SERVICE_VERSION "2.0.0"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION "3.0.0"
//properties
#define PUBSUB_PUBLISHER_TOPIC "topic"
@@ -38,17 +38,7 @@
#define PUBSUB_PUBLISHER_CONFIG "pubsub.config"
#define PUBSUB_PUBLISHER_SCOPE_DEFAULT "default"
-//flags
-#define PUBSUB_PUBLISHER_FIRST_MSG 01
-#define PUBSUB_PUBLISHER_PART_MSG 02
-#define PUBSUB_PUBLISHER_LAST_MSG 04
-struct pubsub_release_callback_struct {
- void *handle;
- void (*release)(char *buf, void *handle);
-};
-typedef struct pubsub_release_callback_struct pubsub_release_callback_t;
-typedef struct pubsub_release_callback_struct* pubsub_release_callback_pt;
struct pubsub_publisher {
@@ -71,15 +61,6 @@ struct pubsub_publisher {
*/
int (*send)(void *handle, unsigned int msgTypeId, const void *msg);
-
- /**
- * sendMultipart is a async function, but the msg can be safely deleted after send returns.
- * The first (primary) message of a multipart message must have the flag PUBLISHER_PRIMARY_MSG
- * The last message of a multipart message must have the flag PUBLISHER_LAST_MSG
- * Returns 0 on success.
- */
- int (*sendMultipart)(void *handle, unsigned int msgTypeId, const void *msg, int flags);
-
};
typedef struct pubsub_publisher pubsub_publisher_t;
typedef struct pubsub_publisher* pubsub_publisher_pt;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index ca6d4d1..7a0d85b 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -30,7 +30,7 @@
#include <stdbool.h>
#define PUBSUB_SUBSCRIBER_SERVICE_NAME "pubsub.subscriber"
-#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "2.0.0"
+#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "3.0.0"
//properties
#define PUBSUB_SUBSCRIBER_TOPIC "topic"
@@ -38,17 +38,15 @@
#define PUBSUB_SUBSCRIBER_CONFIG "pubsub.config"
#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default"
-
-struct pubsub_multipart_callbacks_struct {
- void *handle;
- int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgId);
- int (*getMultipart)(void *handle, unsigned int msgTypeId, bool retain, void **part);
-};
-typedef struct pubsub_multipart_callbacks_struct pubsub_multipart_callbacks_t;
-typedef struct pubsub_multipart_callbacks_struct* pubsub_multipart_callbacks_pt;
-
+
struct pubsub_subscriber_struct {
void *handle;
+
+ /**
+ * Called to initialize the subscriber with the receiver thread.
+ * Can be used to tweak the receiver thread attributes
+ */
+ int (*init)(void *handle);
/**
* When a new message for a topic is available the receive will be called.
@@ -64,7 +62,7 @@ struct pubsub_subscriber_struct {
*
* this method can be NULL.
*/
- int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
+ int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
};
typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 56d8997..06e3c56 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -67,7 +67,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
act->listenerSvc.handle = act->pubsub_discovery;
act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
- act->listenerSvc.removeEndpoint = pubsub_discovery_removeEndpoint;
+ act->listenerSvc.revokeEndpoint = pubsub_discovery_revokeEndpoint;
//register shell command service
//register shell command
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index de708ff..673bdfd 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -422,7 +422,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
return status;
}
-celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_discovery_t *disc = handle;
celix_status_t status = CELIX_SUCCESS;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index b2726fb..269fe5a 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -95,7 +95,7 @@ void pubsub_discovery_discoveredEndpointsListenerAdded(void *handle, void *svc,
void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine, FILE *os, FILE *errorStream);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 7f1cc73..64abe24 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -32,8 +32,8 @@
#include "celix_filter.h"
#define PUBSUB_ADMIN_SERVICE_NAME "pubsub_admin"
-#define PUBSUB_ADMIN_SERVICE_VERSION "2.0.0"
-#define PUBSUB_ADMIN_SERVICE_RANGE "[2,3)"
+#define PUBSUB_ADMIN_SERVICE_VERSION "3.0.0"
+#define PUBSUB_ADMIN_SERVICE_RANGE "[3,4)"
//expected service properties
#define PUBSUB_ADMIN_SERVICE_TYPE "psa_type"
@@ -44,20 +44,20 @@
struct pubsub_admin_service {
void *handle;
- celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
- celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
- celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
+ celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScopre, long *outSerializerSvcId);
+ celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
+ celix_status_t (*matchDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
//note endpoint is owned by caller
- celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+ celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic);
//note endpoint is owned by caller
- celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+ celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t (*teardownTopicReceiver)(void *handle, const char *scope, const char *topic);
- celix_status_t (*addEndpoint)(void *handle, const celix_properties_t *endpoint);
- celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *endpoint);
+ celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
+ celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
};
typedef struct pubsub_admin_service pubsub_admin_service_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
index 7e4bfec..be4e569 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
@@ -33,15 +33,12 @@ typedef struct pubsub_discovered_endpoint_listener pubsub_discovered_endpoint_li
-//Informs the discovery admins to publish info into the network
+//Informs the pubsub discoveries to announce/revoke endpoint
struct pubsub_announce_endpoint_listener {
void *handle;
celix_status_t (*announceEndpoint)(void *handle, const celix_properties_t *properties);
- celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *properties);
-
- //getCurrentSubscriberEndPoints
- //getCurrentPublisherEndPoints
+ celix_status_t (*revokeEndpoint)(void *handle, const celix_properties_t *properties);
};
typedef struct pubsub_announce_endpoint_listener pubsub_announce_endpoint_listener_t;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 66cc44a..9d707f3 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -48,6 +48,7 @@ double pubsub_utils_matchPublisher(
double sampleScore,
double controlScore,
double defaultScore,
+ celix_properties_t **outTopicProperties,
long *outSerializerSvcId);
double pubsub_utils_matchSubscriber(
@@ -58,6 +59,7 @@ double pubsub_utils_matchSubscriber(
double sampleScore,
double controlScore,
double defaultScore,
+ celix_properties_t **outTopicProperties,
long *outSerializerSvcId);
bool pubsub_utils_matchEndpoint(
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index dc5c35e..42c141f 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -107,6 +107,7 @@ double pubsub_utils_matchPublisher(
double sampleScore,
double controlScore,
double defaultScore,
+ celix_properties_t **outTopicProperties,
long *outSerializerSvcId) {
celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
@@ -132,8 +133,10 @@ double pubsub_utils_matchPublisher(
*outSerializerSvcId = serializerSvcId;
}
- if (ep != NULL) {
- celix_properties_destroy(ep); //TODO improve function to that tmp endpoint is not needed -> parse filter
+ if (outTopicProperties != NULL) {
+ *outTopicProperties = ep;
+ } else if (ep != NULL) {
+ celix_properties_destroy(ep);
}
return score;
@@ -159,6 +162,7 @@ double pubsub_utils_matchSubscriber(
double sampleScore,
double controlScore,
double defaultScore,
+ celix_properties_t **outTopicProperties,
long *outSerializerSvcId) {
pubsub_get_topic_properties_data_t data;
@@ -188,7 +192,9 @@ double pubsub_utils_matchSubscriber(
*outSerializerSvcId = serializerSvcId;
}
- if (ep != NULL) {
+ if (outTopicProperties != NULL) {
+ *outTopicProperties = ep;
+ } else if (ep != NULL) {
celix_properties_destroy(ep);
}