You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/06 11:44:44 UTC

[2/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 9f7f3b6..fb8c5f6 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -30,7 +30,7 @@
 #include <stdlib.h>
 
 #define PUBSUB_PUBLISHER_SERVICE_NAME           "pubsub.publisher"
-#define PUBSUB_PUBLISHER_SERVICE_VERSION	    "2.0.0"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION	    "3.0.0"
  
 //properties
 #define PUBSUB_PUBLISHER_TOPIC                  "topic"
@@ -38,17 +38,7 @@
 #define PUBSUB_PUBLISHER_CONFIG                 "pubsub.config"
  
 #define PUBSUB_PUBLISHER_SCOPE_DEFAULT			"default"
-//flags
-#define PUBSUB_PUBLISHER_FIRST_MSG  01
-#define PUBSUB_PUBLISHER_PART_MSG   02
-#define PUBSUB_PUBLISHER_LAST_MSG   04
 
-struct pubsub_release_callback_struct {
-    void *handle;
-    void (*release)(char *buf, void *handle);
-};
-typedef struct pubsub_release_callback_struct pubsub_release_callback_t;
-typedef struct pubsub_release_callback_struct* pubsub_release_callback_pt;
  
  
 struct pubsub_publisher {
@@ -71,15 +61,6 @@ struct pubsub_publisher {
      */
     int (*send)(void *handle, unsigned int msgTypeId, const void *msg);
  
-  
-    /**
-     * sendMultipart is a async function, but the msg can be safely deleted after send returns.
-     * The first (primary) message of a multipart message must have the flag PUBLISHER_PRIMARY_MSG
-     * The last message of a multipart message must have the flag PUBLISHER_LAST_MSG
-     * Returns 0 on success.
-     */
-    int (*sendMultipart)(void *handle, unsigned int msgTypeId, const void *msg, int flags);
- 
 };
 typedef struct pubsub_publisher pubsub_publisher_t;
 typedef struct pubsub_publisher* pubsub_publisher_pt;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index ca6d4d1..7a0d85b 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -30,7 +30,7 @@
 #include <stdbool.h>
 
 #define PUBSUB_SUBSCRIBER_SERVICE_NAME          "pubsub.subscriber"
-#define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "2.0.0"
+#define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "3.0.0"
  
 //properties
 #define PUBSUB_SUBSCRIBER_TOPIC                "topic"
@@ -38,17 +38,15 @@
 #define PUBSUB_SUBSCRIBER_CONFIG               "pubsub.config"
 
 #define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT        "default"
- 
-struct pubsub_multipart_callbacks_struct {
-    void *handle;
-    int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgId);
-    int (*getMultipart)(void *handle, unsigned int msgTypeId, bool retain, void **part);
-};
-typedef struct pubsub_multipart_callbacks_struct pubsub_multipart_callbacks_t;
-typedef struct pubsub_multipart_callbacks_struct* pubsub_multipart_callbacks_pt;
- 
+
 struct pubsub_subscriber_struct {
     void *handle;
+
+    /**
+     * Called to initialize the subscriber with the receiver thread.
+     * Can be used to tweak the receiver thread attributes
+     */
+    int (*init)(void *handle);
      
     /**
      * When a new message for a topic is available the receive will be called.
@@ -64,7 +62,7 @@ struct pubsub_subscriber_struct {
      *
      * this method can be  NULL.
      */
-    int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
+    int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, bool *release);
 
 };
 typedef struct pubsub_subscriber_struct pubsub_subscriber_t;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/psd_activator.c b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
index 56d8997..06e3c56 100644
--- a/bundles/pubsub/pubsub_discovery/src/psd_activator.c
+++ b/bundles/pubsub/pubsub_discovery/src/psd_activator.c
@@ -67,7 +67,7 @@ static celix_status_t psd_start(psd_activator_t *act, celix_bundle_context_t *ct
 
 	act->listenerSvc.handle = act->pubsub_discovery;
 	act->listenerSvc.announceEndpoint = pubsub_discovery_announceEndpoint;
-	act->listenerSvc.removeEndpoint = pubsub_discovery_removeEndpoint;
+	act->listenerSvc.revokeEndpoint = pubsub_discovery_revokeEndpoint;
 
 	//register shell command service
 	//register shell command

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index de708ff..673bdfd 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -422,7 +422,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
 
     return status;
 }
-celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_discovery_t *disc = handle;
     celix_status_t status = CELIX_SUCCESS;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
index b2726fb..269fe5a 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.h
@@ -95,7 +95,7 @@ void pubsub_discovery_discoveredEndpointsListenerAdded(void *handle, void *svc,
 void pubsub_discovery_discoveredEndpointsListenerRemoved(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
 
 celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_discovery_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_discovery_revokeEndpoint(void *handle, const celix_properties_t *endpoint);
 
 celix_status_t pubsub_discovery_executeCommand(void *handle, char * commandLine, FILE *os, FILE *errorStream);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
index 7f1cc73..64abe24 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -32,8 +32,8 @@
 #include "celix_filter.h"
 
 #define PUBSUB_ADMIN_SERVICE_NAME		"pubsub_admin"
-#define PUBSUB_ADMIN_SERVICE_VERSION	"2.0.0"
-#define PUBSUB_ADMIN_SERVICE_RANGE		"[2,3)"
+#define PUBSUB_ADMIN_SERVICE_VERSION	"3.0.0"
+#define PUBSUB_ADMIN_SERVICE_RANGE		"[3,4)"
 
 //expected service properties
 #define PUBSUB_ADMIN_SERVICE_TYPE		"psa_type"
@@ -44,20 +44,20 @@
 struct pubsub_admin_service {
 	void *handle;
 
-	celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-	celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-	celix_status_t (*matchEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
+	celix_status_t (*matchPublisher)(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *outScopre, long *outSerializerSvcId);
+	celix_status_t (*matchSubscriber)(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *outScore, long *outSerializerSvcId);
+	celix_status_t (*matchDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint, bool *match);
 
 	//note endpoint is owned by caller
-	celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+	celix_status_t (*setupTopicSender)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
 	celix_status_t (*teardownTopicSender)(void *handle, const char *scope, const char *topic);
 
 	//note endpoint is owned by caller
-	celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+	celix_status_t (*setupTopicReceiver)(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
 	celix_status_t (*teardownTopicReceiver)(void *handle, const char *scope, const char *topic);
 
-	celix_status_t (*addEndpoint)(void *handle, const celix_properties_t *endpoint);
-	celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *endpoint);
+	celix_status_t (*addDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
+	celix_status_t (*removeDiscoveredEndpoint)(void *handle, const celix_properties_t *endpoint);
 };
 
 typedef struct pubsub_admin_service pubsub_admin_service_t;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
index 7e4bfec..be4e569 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h
@@ -33,15 +33,12 @@ typedef struct pubsub_discovered_endpoint_listener pubsub_discovered_endpoint_li
 
 
 
-//Informs the discovery admins to publish info into the network
+//Informs the pubsub discoveries to announce/revoke endpoint
 struct pubsub_announce_endpoint_listener {
 	void *handle;
 
 	celix_status_t (*announceEndpoint)(void *handle, const celix_properties_t *properties);
-	celix_status_t (*removeEndpoint)(void *handle, const celix_properties_t *properties);
-
-	//getCurrentSubscriberEndPoints
-	//getCurrentPublisherEndPoints
+	celix_status_t (*revokeEndpoint)(void *handle, const celix_properties_t *properties);
 };
 
 typedef struct pubsub_announce_endpoint_listener pubsub_announce_endpoint_listener_t;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 66cc44a..9d707f3 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -48,6 +48,7 @@ double pubsub_utils_matchPublisher(
         double sampleScore,
         double controlScore,
         double defaultScore,
+        celix_properties_t **outTopicProperties,
         long *outSerializerSvcId);
 
 double pubsub_utils_matchSubscriber(
@@ -58,6 +59,7 @@ double pubsub_utils_matchSubscriber(
         double sampleScore,
         double controlScore,
         double defaultScore,
+        celix_properties_t **outTopicProperties,
         long *outSerializerSvcId);
 
 bool pubsub_utils_matchEndpoint(

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
index dc5c35e..42c141f 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c
@@ -107,6 +107,7 @@ double pubsub_utils_matchPublisher(
 		double sampleScore,
 		double controlScore,
 		double defaultScore,
+		celix_properties_t **outTopicProperties,
 		long *outSerializerSvcId) {
 
 	celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
@@ -132,8 +133,10 @@ double pubsub_utils_matchPublisher(
 		*outSerializerSvcId = serializerSvcId;
 	}
 
-	if (ep != NULL) {
-		celix_properties_destroy(ep); //TODO improve function to that tmp endpoint is not needed -> parse filter
+	if (outTopicProperties != NULL) {
+		*outTopicProperties = ep;
+	} else if (ep != NULL) {
+		celix_properties_destroy(ep);
 	}
 
 	return score;
@@ -159,6 +162,7 @@ double pubsub_utils_matchSubscriber(
 		double sampleScore,
 		double controlScore,
 		double defaultScore,
+		celix_properties_t **outTopicProperties,
 		long *outSerializerSvcId) {
 
 	pubsub_get_topic_properties_data_t data;
@@ -188,7 +192,9 @@ double pubsub_utils_matchSubscriber(
 		*outSerializerSvcId = serializerSvcId;
 	}
 
-	if (ep != NULL) {
+	if (outTopicProperties != NULL) {
+		*outTopicProperties = ep;
+	} else if (ep != NULL) {
 		celix_properties_destroy(ep);
 	}