You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@celix.apache.org by GitBox <gi...@apache.org> on 2021/04/02 14:42:18 UTC

[GitHub] [celix] Oipo opened a new pull request #334: Feature/pubsub tcp serialization handlers

Oipo opened a new pull request #334:
URL: https://github.com/apache/celix/pull/334


   Much like https://github.com/apache/celix/pull/292
   
   Create a v2 of the tcp pubsub which uses the new serializer services.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] Oipo commented on pull request #334: Feature/pubsub tcp serialization handlers

Posted by GitBox <gi...@apache.org>.
Oipo commented on pull request #334:
URL: https://github.com/apache/celix/pull/334#issuecomment-812560316


   The metrics/monitoring in the pubsub have been removed, mostly because for the receiver none of the metrics were actually recorded, the sender only had a couple relevant metrics being recorded but most importantly, users so far have not been using the monitoring part.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #334: Feature/pubsub tcp serialization handlers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #334:
URL: https://github.com/apache/celix/pull/334#discussion_r607874286



##########
File path: bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
##########
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <zconf.h>
+#include <arpa/inet.h>
+#include <celix_log_helper.h>
+#include "pubsub_psa_tcp_constants.h"
+#include "pubsub_tcp_topic_sender.h"
+#include "pubsub_tcp_handler.h"
+#include "pubsub_tcp_common.h"
+#include <uuid/uuid.h>
+#include "celix_constants.h"
+#include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
+#include "pubsub_tcp_admin.h"
+
+#define TCP_BIND_MAX_RETRY                      10
+
+#define L_DEBUG(...) \
+    celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_ERROR, __VA_ARGS__)
+
+struct pubsub_tcp_topic_sender {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper;
+    long protocolSvcId;
+    pubsub_protocol_service_t *protocol;
+    uuid_t fwUUID;
+    pubsub_tcpHandler_t *socketHandler;
+    pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
+
+    void *admin;
+    char *scope;
+    char *topic;
+    char *url;
+    char *serializerType;
+    bool isStatic;
+    bool isPassive;
+    bool verbose;
+    unsigned long send_delay;
+
+    struct {
+        long svcId;
+        celix_service_factory_t factory;
+    } publisher;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map;  //key = bndId, value = psa_tcp_bounded_service_entry_t
+    } boundedServices;
+};
+
+typedef struct psa_tcp_send_msg_entry {
+    uint32_t type; //msg type id (hash of fqn)
+    const char *fqn;
+    uint8_t major;
+    uint8_t minor;
+    unsigned char originUUID[16];
+//    pubsub_msg_serializer_t *msgSer;
+    pubsub_protocol_service_t *protSer;
+    struct iovec *serializedIoVecOutput;
+    size_t serializedIoVecOutputLen;
+    unsigned int seqNr;
+} psa_tcp_send_msg_entry_t;
+
+typedef struct psa_tcp_bounded_service_entry {
+    pubsub_tcp_topic_sender_t *parent;
+    pubsub_publisher_t service;
+    long bndId;
+    hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t
+    int getCount;
+} psa_tcp_bounded_service_entry_t;
+
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
+
+static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                         const celix_properties_t *svcProperties);
+
+static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+                                          const celix_properties_t *svcProperties);
+
+static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender);
+
+static int
+psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
+
+pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
+    celix_bundle_context_t *ctx,
+    celix_log_helper_t *logHelper,
+    const char *scope,
+    const char *topic,
+    const char *serializerType,
+    void *admin,
+    const celix_properties_t *topicProperties,
+    pubsub_tcp_endPointStore_t *handlerStore,
+    long protocolSvcId,
+    pubsub_protocol_service_t *protocol) {
+    pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    sender->ctx = ctx;
+    sender->logHelper = logHelper;
+    sender->serializerType = celix_utils_strdup(serializerType);
+    sender->admin = admin;
+    sender->protocolSvcId = protocolSvcId;
+    sender->protocol = protocol;
+    const char *uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+    if (uuid != NULL) {
+        uuid_parse(uuid, sender->fwUUID);
+    }
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    char *urls = NULL;
+    const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
+
+    if (isPassive) {
+        sender->isPassive = psa_tcp_isPassive(isPassive);
+    }
+    if (topicProperties != NULL) {
+        if (discUrl == NULL) {
+            discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
+        }
+        if (isPassive == NULL) {
+            sender->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, false);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
+        }
+    }
+    /* When it's an endpoint share the socket with the receiver */
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
+        if (entry == NULL) {
+            if (sender->socketHandler == NULL)
+                sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
+            entry = sender->socketHandler;
+            sender->sharedSocketHandler = sender->socketHandler;
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
+        } else {
+            sender->socketHandler = entry;
+            sender->sharedSocketHandler = entry;
+        }
+        celixThreadMutex_unlock(&handlerStore->mutex);
+    } else {
+        sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
+    }
+
+    if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
+        long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
+        const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
+        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  PUBSUB_UTILS_PSA_SEND_DELAY, PUBSUB_UTILS_PSA_DEFAULT_SEND_DELAY);
+        pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
+        pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
+        pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+        // Hhen passiveKey is specified, enable receive event for full-duplex connection using key.
+        // Because the topic receiver is already started, enable the receive event.
+        pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
+        pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
+    }
+
+    if (!sender->isPassive) {
+        //setting up tcp socket for TCP TopicSender
+        if (discUrl != NULL) {
+            urls = strndup(discUrl, 1024 * 1024);
+            sender->isStatic = true;
+        } else if (ip != NULL) {
+            urls = strndup(ip, 1024 * 1024);
+        } else {
+            struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
+            urls = pubsub_utils_url_get_url(sin, NULL);
+            free(sin);
+        }
+        if (!sender->url) {
+            char *urlsCopy = strndup(urls, 1024 * 1024);
+            char *url;
+            char *save = urlsCopy;
+            while ((url = strtok_r(save, " ", &save))) {
+                int retry = 0;
+                while (url && retry < TCP_BIND_MAX_RETRY) {
+                    pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+                    int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
+                    if (rc < 0) {
+                        L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
+                    } else {
+                        url = NULL;
+                    }
+                    pubsub_utils_url_free(urlInfo);
+                    retry++;
+                }
+            }
+            free(urlsCopy);
+            sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
+        }
+        free(urls);
+    }
+
+    //register publisher services using a service factory
+    if ((sender->url != NULL) ||  (sender->isPassive)) {
+        sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
+        sender->topic = strndup(topic, 1024 * 1024);
+
+        celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+        sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+        sender->publisher.factory.handle = sender;
+        sender->publisher.factory.getService = psa_tcp_getPublisherService;
+        sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+        if (sender->scope != NULL) {
+            celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+        }
+
+        celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+        opts.factory = &sender->publisher.factory;

Review comment:
       Too much for this PR, but when using the serializer services publisher do not need to be service factories anymore.
   
   Service factories where needed so that publishers could read descriptor files per bundle. With serializer services this strategy has changes (and hopefully a bit simpler) and as result a publisher per bundle is not needed anymore.
   
   Downside that is that it is no longer possible to have serializer per bundle.. but maybe that good.  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] Oipo merged pull request #334: Feature/pubsub tcp serialization handlers

Posted by GitBox <gi...@apache.org>.
Oipo merged pull request #334:
URL: https://github.com/apache/celix/pull/334


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org