Posted to dev@celix.apache.org by GitBox <gi...@apache.org> on 2020/08/29 08:55:15 UTC

[GitHub] [celix] rbulter opened a new pull request #279: Feature/tcp admin msg segmentation

rbulter opened a new pull request #279:
URL: https://github.com/apache/celix/pull/279


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) into [master](https://codecov.io/gh/apache/celix/commit/20f794bd1dece8c986119553bc97205bee09cca8?el=desc) will **increase** coverage by `0.34%`.
   > The diff coverage is `65.53%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.30%   68.65%   +0.34%     
   ==========================================
     Files         136      137       +1     
     Lines       27371    27460      +89     
   ==========================================
   + Hits        18697    18852     +155     
   + Misses       8674     8608      -66     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.47% <20.00%> (-0.54%)` | :arrow_down: |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.61% <58.33%> (+0.35%)` | :arrow_up: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `74.51% <67.00%> (-0.73%)` | :arrow_down: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (+2.71%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `58.13% <100.00%> (+5.33%)` | :arrow_up: |
   | [libs/utils/src/hash\_map.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy91dGlscy9zcmMvaGFzaF9tYXAuYw==) | `93.08% <0.00%> (-1.16%)` | :arrow_down: |
   | ... and [11 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...0492d9f](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
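The headline percentages in the report above can be reproduced from its Hits and Lines rows. The following is a minimal sketch, assuming coverage is simply hits divided by lines; the function name is hypothetical and Codecov's exact rounding rule is not stated in the report.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper: line coverage as a percentage of covered lines.
 * Assumes coverage = hits / lines; Codecov's rounding is not modelled. */
double coverage_percent(size_t hits, size_t lines) {
    assert(lines > 0);  /* coverage is undefined for an empty report */
    return 100.0 * (double)hits / (double)lines;
}
```

With the figures above, 18697 hits over 27371 lines is roughly 68.3% for master, and 18852 hits over 27460 lines is roughly 68.65% for the pull request, a difference of about 0.34 percentage points.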
   





[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r542433619



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -964,58 +998,30 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v
     return result;
 }
 
-static inline
-int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) {
-  int nbytes = 0;
-  int msgSize = 0;
-  if (entry->fd >= 0 && size && msg->msg_iovlen) {
-    int expectedReadSize = size;
-    unsigned int offset = 0;
-    nbytes = size;
-    while (nbytes > 0 && expectedReadSize > 0) {
-      // Read the message header
-      nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
-      // Update admin
-      expectedReadSize -= nbytes;
-      msgSize += nbytes;
-      // Not all written
-      if (expectedReadSize && nbytes > 0) {
-        unsigned int readSize = 0;
-        unsigned int readIndex = 0;
-        unsigned int i = 0;
-        for (i = 0; i < msg->msg_iovlen; i++) {
-          if (nbytes < msg->msg_iov[i].iov_len) {
-            readIndex = i;
-            break;
-          }
-          readSize+= msg->msg_iov[i].iov_len;
-        }
-        msg->msg_iov = &msg->msg_iov[readIndex];
-        msg->msg_iovlen -= readIndex;
-        char* buffer = (char*)msg->msg_iov->iov_base;
-        offset = nbytes - readSize;
-        msg->msg_iov->iov_base = &buffer[offset];
-        msg->msg_iov->iov_len  = msg->msg_iov->iov_len - offset;
-      }
-    }
-  }
-  if (nbytes <=0)  msgSize = nbytes;
-  return msgSize;
-}
+
 //
 // Write large data to TCP. .
 //
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec,
                             size_t msg_iov_len, int flags) {
-    celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
     int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];

Review comment:
       Solved
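For context, the removed `pubsub_tcpHandler_writeSocket` retried `sendmsg` after a short write and moved the iovec cursor forward by hand. The hunk does not show what replaced it, so the following is only a sketch of one way to handle partial writes, assuming a Linux/POSIX stream socket in blocking mode. The `demo_` names are hypothetical and are not part of Celix.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Hypothetical helper: drop `sent` bytes from the front of an iovec array.
 * Fully sent entries are skipped; a partially sent entry is trimmed in place. */
void demo_iov_advance(struct iovec **iov, size_t *iovcnt, size_t sent) {
    assert(iov != NULL && iovcnt != NULL);
    while (*iovcnt > 0 && sent >= (*iov)->iov_len) {
        sent -= (*iov)->iov_len;  /* this entry went out completely */
        (*iov)++;
        (*iovcnt)--;
    }
    if (*iovcnt > 0 && sent > 0) {
        /* the current entry was only partly written: keep its tail */
        (*iov)->iov_base = (char *)(*iov)->iov_base + sent;
        (*iov)->iov_len -= sent;
    }
}

/* Hypothetical helper: call sendmsg() until every byte is written.
 * Returns the total bytes written, or -1 on error (errno set by sendmsg).
 * On a non-blocking socket EAGAIN is reported as an error here. */
ssize_t demo_send_all(int fd, struct iovec *iov, size_t iovcnt, int flags) {
    ssize_t total = 0;
    while (iovcnt > 0) {
        struct msghdr msg = {0};
        msg.msg_iov = iov;
        msg.msg_iovlen = iovcnt;
        ssize_t n = sendmsg(fd, &msg, flags | MSG_NOSIGNAL);
        if (n < 0) {
            if (errno == EINTR) continue;  /* interrupted: retry */
            return -1;
        }
        if (n == 0) break;                 /* nothing left that can be written */
        total += n;
        demo_iov_advance(&iov, &iovcnt, (size_t)n);
    }
    return total;
}
```

Keeping the cursor arithmetic in its own function lets it be tested without a socket, and the helper subtracts the length of each skipped entry before it looks at the next one.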







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r521397714



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -348,40 +354,18 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
 static inline void
 pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
     if (entry) {
-        if (entry->url) {
-            free(entry->url);
-            entry->url = NULL;
-        }
-        if (entry->interface_url) {
-            free(entry->interface_url);
-            entry->interface_url = NULL;
-        }
-        if (entry->fd >= 0) {
-            close(entry->fd);
-            entry->fd = -1;
-        }
-        if (entry->buffer) {
-            free(entry->buffer);
-            entry->buffer = NULL;
-            entry->bufferSize = 0;
-        }
-        if (entry->headerBuffer) {
-            free(entry->headerBuffer);
-            entry->headerBuffer = NULL;
-            entry->headerBufferSize = 0;
-        }
-
-        if (entry->footerBuffer) {
-            free(entry->footerBuffer);
-            entry->footerBuffer = NULL;
-        }
-
-        if (entry->metaBuffer) {
-            free(entry->metaBuffer);
-            entry->metaBuffer = NULL;
-            entry->metaBufferSize = 0;
-        }
-        entry->connected = false;
+        if (entry->url) free(entry->url);

Review comment:
       Solved
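As background to this hunk: the C standard defines `free(NULL)` as a no-op, so the per-field NULL guards in a cleanup routine are optional. A minimal sketch follows, using a hypothetical, trimmed-down stand-in for `psa_tcp_connection_entry_t`. Whether the real `pubsub_tcpHandler_freeEntry` also frees the entry itself is not visible in the hunk and is an assumption here.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical, trimmed-down stand-in for psa_tcp_connection_entry_t. */
typedef struct demo_entry {
    char *url;
    char *interface_url;
    int fd;
    void *buffer;
} demo_entry_t;

/* Allocate an entry that owns a copy of `url`; other fields start empty. */
demo_entry_t *demo_entry_create(const char *url) {
    assert(url != NULL);
    demo_entry_t *entry = calloc(1, sizeof(*entry));
    if (entry == NULL) return NULL;
    entry->fd = -1;  /* -1 marks "no socket" in this sketch */
    size_t len = strlen(url) + 1;
    entry->url = malloc(len);
    if (entry->url != NULL) memcpy(entry->url, url, len);
    return entry;
}

/* Release everything the entry owns, then the entry itself (assumption). */
void demo_entry_free(demo_entry_t *entry) {
    if (entry == NULL) return;
    free(entry->url);            /* free(NULL) is a no-op, no guard needed */
    free(entry->interface_url);
    free(entry->buffer);
    if (entry->fd >= 0) close(entry->fd);  /* the fd still needs its guard */
    free(entry);
}
```

Resetting fields to NULL after freeing them, as the removed lines did, only matters if the entry outlives the call.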







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544359803



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -343,18 +335,25 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {
     //TODO subscriber count -> topic info
 }
 
-void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {

Review comment:
       Done







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534419969



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <stdio.h>
+#include <string.h>
+#include "pubsub_tcp_common.h"
+
+
+bool psa_tcp_isPassive(const char* buffer) {
+    bool isPassive = false;
+    // Parse Properties
+    if (buffer != NULL) {
+        char buf[32];
+        snprintf(buf, 32, "%s", buffer);
+        char *trimmed = utils_stringTrim(buf);
+        if (strncasecmp("true", trimmed, strlen("true")) == 0) {
+            isPassive = true;
+        } else if (strncasecmp("false", trimmed, strlen("false")) == 0) {

Review comment:
       Solved
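One detail in the hunk above: `strncasecmp("true", trimmed, strlen("true"))` compares only the first four characters, so a value such as `truex` would also match. The sketch below shows a stricter variant that accepts only the exact words. It is not the PR's implementation: the `demo_` names and the `fallback` parameter are assumptions, and it avoids `utils_stringTrim` so that it stands alone.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: does the first `len` bytes of `s` equal `word`,
 * ignoring letter case? The lengths must match exactly. */
static bool demo_equals_ignore_case(const char *s, size_t len, const char *word) {
    assert(s != NULL && word != NULL);
    if (len != strlen(word)) return false;
    for (size_t i = 0; i < len; i++) {
        if (tolower((unsigned char)s[i]) != tolower((unsigned char)word[i])) {
            return false;
        }
    }
    return true;
}

/* Hypothetical variant of psa_tcp_isPassive: returns `fallback` unless the
 * trimmed text is exactly "true" or "false" in any letter case. */
bool demo_parse_passive(const char *text, bool fallback) {
    if (text == NULL) return fallback;
    while (isspace((unsigned char)*text)) text++;  /* skip leading blanks */
    size_t len = strlen(text);
    while (len > 0 && isspace((unsigned char)text[len - 1])) len--;  /* ignore trailing blanks */
    if (demo_equals_ignore_case(text, len, "true")) return true;
    if (demo_equals_ignore_case(text, len, "false")) return false;
    return fallback;
}
```

Trimming by pointer and length avoids the fixed 32-byte copy, at the cost of not reusing the existing Celix trim helper.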







[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544194500



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -628,16 +615,24 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
 }
 
 //
-// Setup buffer sizes
+// Setup receive buffer size
+//
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->bufferSize = size;

Review comment:
       Please add comment.
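To illustrate the kind of comment being requested, here is a minimal sketch of a documented setter guarded by a read/write lock. It uses plain POSIX `pthread_rwlock_t` in place of `celixThreadRwlock_*`, and a hypothetical trimmed-down struct in place of `pubsub_tcpHandler_t`. The wording of the doc comment, the `size_t` parameter, and the getter are assumptions, not the PR's code.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical, trimmed-down stand-in for pubsub_tcpHandler_t. */
typedef struct demo_handler {
    pthread_rwlock_t dbLock;  /* guards the fields below */
    size_t bufferSize;
} demo_handler_t;

/**
 * Set the receive buffer size, in bytes, stored in the handler.
 * The write lock is taken because other threads may read the size
 * concurrently under the read lock.
 * Returns 0 on success, -1 when `handle` is NULL.
 */
int demo_handler_setReceiveBufferSize(demo_handler_t *handle, size_t size) {
    if (handle == NULL) return -1;
    pthread_rwlock_wrlock(&handle->dbLock);
    handle->bufferSize = size;
    pthread_rwlock_unlock(&handle->dbLock);
    return 0;
}

/* Read the configured size under the read lock. */
size_t demo_handler_getReceiveBufferSize(demo_handler_t *handle) {
    assert(handle != NULL);
    pthread_rwlock_rdlock(&handle->dbLock);
    size_t size = handle->bufferSize;
    pthread_rwlock_unlock(&handle->dbLock);
    return size;
}
```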







[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) (21af91b) into [master](https://codecov.io/gh/apache/celix/commit/2546c643b98550a6fc22b5b4eb98ce772f955d79?el=desc) (2546c64) will **increase** coverage by `0.12%`.
   > The diff coverage is `65.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   66.74%   66.86%   +0.12%     
   ==========================================
     Files         147      148       +1     
     Lines       29947    30041      +94     
   ==========================================
   + Hits        19987    20088     +101     
   + Misses       9960     9953       -7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `0.00% <0.00%> (ø)` | |
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `51.52% <20.00%> (+2.52%)` | :arrow_up: |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.47% <53.84%> (+0.20%)` | :arrow_up: |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `53.07% <54.54%> (ø)` | |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `74.96% <68.13%> (-0.28%)` | :arrow_down: |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `56.29% <80.00%> (+3.38%)` | :arrow_up: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `68.93% <86.56%> (+2.77%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [libs/framework/src/celix\_log.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy9mcmFtZXdvcmsvc3JjL2NlbGl4X2xvZy5j) | `78.12% <0.00%> (-10.94%)` | :arrow_down: |
   | [libs/framework/src/framework.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy9mcmFtZXdvcmsvc3JjL2ZyYW1ld29yay5j) | `75.32% <0.00%> (-0.22%)` | :arrow_down: |
   | ... and [11 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [2546c64...21af91b](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] rbulter commented on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-747648329


   The pubsub admin should work with wire protocol v1; it worked in the past. I will add a unit test.





[GitHub] [celix] Oipo commented on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-727843566


   FYI, I started reviewing on Friday, but it's taking a significant amount of time.





[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544260711



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -343,18 +335,25 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {

Review comment:
       Please mark function as deprecated in header.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -343,18 +335,25 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {
     //TODO subscriber count -> topic info
 }
 
-void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {

Review comment:
       Please mark function as deprecated in header.
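A sketch of what marking a function as deprecated in a header could look like, assuming GCC or Clang, both of which accept `__attribute__((deprecated("...")))`. The macro, the `demo_` names, the deprecation message, and the call counter are all hypothetical and exist only to make the sketch observable; they are not Celix API.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical macro wrapping the GCC/Clang deprecation attribute. */
#define DEMO_DEPRECATED(msg) __attribute__((deprecated(msg)))

/* --- what would live in the header --- */
typedef struct demo_sender demo_sender_t;  /* stand-in for pubsub_tcp_topic_sender_t */

DEMO_DEPRECATED("hypothetical message: this call is a no-op")
void demo_sender_connectTo(demo_sender_t *sender, const void *endpoint);

/* --- what would live in the .c file --- */
static int legacy_calls = 0;  /* only here so the sketch has visible behaviour */

void demo_sender_connectTo(demo_sender_t *sender, const void *endpoint) {
    (void)sender;    /* unused, mirroring the no-op body in the hunk */
    (void)endpoint;
    legacy_calls++;
}

int demo_sender_legacyCalls(void) {
    return legacy_calls;
}

/* A caller that must keep using the old function can silence the
 * warning locally instead of disabling it for the whole build. */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
void demo_legacy_caller(void) {
    demo_sender_connectTo(NULL, NULL);
}
#pragma GCC diagnostic pop
```

Defining a deprecated function does not itself warn; only call sites outside the suppressed region produce `-Wdeprecated-declarations` diagnostics.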







[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) into [master](https://codecov.io/gh/apache/celix/commit/20f794bd1dece8c986119553bc97205bee09cca8?el=desc) will **increase** coverage by `0.17%`.
   > The diff coverage is `64.41%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.30%   68.48%   +0.17%     
   ==========================================
     Files         136      137       +1     
     Lines       27371    27444      +73     
   ==========================================
   + Hits        18697    18796      +99     
   + Misses       8674     8648      -26     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.47% <20.00%> (-0.54%)` | :arrow_down: |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `61.60% <58.33%> (-1.66%)` | :arrow_down: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `72.60% <65.76%> (-2.64%)` | :arrow_down: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (+2.71%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `56.21% <100.00%> (+3.41%)` | :arrow_up: |
   | ... and [13 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...7ef5e0e](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) (f544169) into [master](https://codecov.io/gh/apache/celix/commit/2546c643b98550a6fc22b5b4eb98ce772f955d79?el=desc) (2546c64) will **increase** coverage by `0.15%`.
   > The diff coverage is `64.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   66.74%   66.89%   +0.15%     
   ==========================================
     Files         147      148       +1     
     Lines       29947    30041      +94     
   ==========================================
   + Hits        19987    20095     +108     
   + Misses       9960     9946      -14     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `0.00% <0.00%> (ø)` | |
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.47% <20.00%> (-0.54%)` | :arrow_down: |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.47% <50.00%> (+0.20%)` | :arrow_up: |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `53.07% <54.54%> (ø)` | |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `74.96% <68.13%> (-0.28%)` | :arrow_down: |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `56.18% <80.00%> (+3.27%)` | :arrow_up: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `68.93% <86.56%> (+2.77%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [libs/utils/src/hash\_map.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy91dGlscy9zcmMvaGFzaF9tYXAuYw==) | `93.08% <0.00%> (-0.58%)` | :arrow_down: |
   | ... and [8 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [2546c64...f544169](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544193775



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -78,18 +74,27 @@ typedef struct psa_tcp_connection_entry {
     bool connected;
     bool headerError;
     pubsub_protocol_message_t header;
-    unsigned int syncSize;
-    unsigned int headerSize;
-    unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
-    void *headerBuffer;
-    unsigned int footerSize;
-    void *footerBuffer;
-    unsigned int bufferSize;
+    size_t maxMsgSize;
+    size_t readHeaderSize;
+    size_t readHeaderBufferSize; // Size of headerBuffer
+    void *readHeaderBuffer;
+    size_t writeHeaderBufferSize; // Size of headerBuffer
+    void *writeHeaderBuffer;
+    size_t readFooterSize;
+    size_t readFooterBufferSize;
+    void *readFooterBuffer;
+    size_t writeFooterBufferSize;
+    void *writeFooterBuffer;
+    size_t bufferSize;
     void *buffer;
-    unsigned int bufferReadSize;
-    unsigned int metaBufferSize;
-    void *metaBuffer;
+    size_t readMetaBufferSize;
+    void *readMetaBuffer;
+    size_t writeMetaBufferSize;
+    void *writeMetaBuffer;
     unsigned int retryCount;
+    celix_thread_mutex_t writeMutex;
+    celix_thread_mutex_t readMutex;

Review comment:
       Please remove `readMutex`.







[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) (9663481) into [master](https://codecov.io/gh/apache/celix/commit/dd4dca1d52c36a794dfac71867f0abc3fd81241c?el=desc) (dd4dca1) will **increase** coverage by `0.07%`.
   > The diff coverage is `63.65%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.17%   68.24%   +0.07%     
   ==========================================
     Files         148      149       +1     
     Lines       30088    30171      +83     
   ==========================================
   + Hits        20511    20590      +79     
   - Misses       9577     9581       +4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `0.00% <0.00%> (ø)` | |
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.24% <20.00%> (-0.76%)` | :arrow_down: |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.47% <50.70%> (+0.20%)` | :arrow_up: |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `53.07% <54.54%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `54.14% <60.00%> (+1.22%)` | :arrow_up: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `75.42% <67.34%> (+0.18%)` | :arrow_up: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `68.34% <84.05%> (+2.18%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...b\_protocol\_wire\_v1/src/pubsub\_wire\_protocol\_impl.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3Byb3RvY29sL3B1YnN1Yl9wcm90b2NvbF93aXJlX3YxL3NyYy9wdWJzdWJfd2lyZV9wcm90b2NvbF9pbXBsLmM=) | `96.38% <100.00%> (+9.80%)` | :arrow_up: |
   | ... and [11 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [dd4dca1...9663481](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) into [master](https://codecov.io/gh/apache/celix/commit/20f794bd1dece8c986119553bc97205bee09cca8?el=desc) will **increase** coverage by `0.31%`.
   > The diff coverage is `65.53%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.30%   68.62%   +0.31%     
   ==========================================
     Files         136      137       +1     
     Lines       27371    27460      +89     
   ==========================================
   + Hits        18697    18845     +148     
   + Misses       8674     8615      -59     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.47% <20.00%> (-0.54%)` | :arrow_down: |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.61% <58.33%> (+0.35%)` | :arrow_up: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `74.51% <67.00%> (-0.73%)` | :arrow_down: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (+2.71%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `57.79% <100.00%> (+4.99%)` | :arrow_up: |
   | [libs/utils/src/hash\_map.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy91dGlscy9zcmMvaGFzaF9tYXAuYw==) | `93.37% <0.00%> (-0.87%)` | :arrow_down: |
   | ... and [12 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...000245e](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544349044



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -343,18 +335,25 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {
     //TODO subscriber count -> topic info
 }
 
-void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {

Review comment:
       This also applies to ZMQ; please fix it there as well.







[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@20f794b`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `64.01%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master     #279   +/-   ##
   =========================================
     Coverage          ?   68.57%           
   =========================================
     Files             ?      137           
     Lines             ?    27443           
     Branches          ?        0           
   =========================================
     Hits              ?    18819           
     Misses            ?     8624           
     Partials          ?        0           
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `51.74% <20.00%> (ø)` | |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `61.60% <58.33%> (ø)` | |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `75.13% <65.51%> (ø)` | |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (ø)` | |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `57.25% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...baeda66](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] rbulter commented on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-747649224


   I added a test for wire_v1 and found the error there: during the wire_v1 refactor, a flag that tcp_admin needs for message segmentation was removed.
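
To illustrate why a receiver depends on such a flag, here is a minimal sketch of splitting a payload into segments. It is not the wire_v1 or tcp_admin code: the `segment_header_t` type, its field names, and `segment_message` are hypothetical and exist only for this example. The assumption is that each segment carries its offset, its size, and a last-segment marker so the receiver knows when reassembly is complete.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-segment header; field names are illustrative only. */
typedef struct {
    size_t payload_size;      /* total size of the complete message */
    size_t payload_offset;    /* offset of this segment within the message */
    size_t payload_part_size; /* number of payload bytes in this segment */
    bool is_last_segment;     /* tells the receiver that reassembly is done */
} segment_header_t;

/*
 * Computes the headers needed to send payload_size bytes in segments of at
 * most max_msg_size bytes. At most out_capacity headers are written to out.
 * Returns the number of segments required, which may exceed out_capacity.
 * An empty payload still yields one (empty) segment flagged as last.
 */
size_t segment_message(size_t payload_size, size_t max_msg_size,
                       segment_header_t *out, size_t out_capacity) {
    assert(max_msg_size > 0);
    size_t count = 0;
    size_t offset = 0;
    do {
        size_t remaining = payload_size - offset;
        size_t part = remaining < max_msg_size ? remaining : max_msg_size;
        if (count < out_capacity) {
            out[count].payload_size = payload_size;
            out[count].payload_offset = offset;
            out[count].payload_part_size = part;
            out[count].is_last_segment = (offset + part == payload_size);
        }
        offset += part;
        count++;
    } while (offset < payload_size);
    return count;
}
```

Without the last-segment marker, a receiver in this sketch would have to infer completion from offsets and sizes alone, which is presumably the kind of dependency the removed flag covered.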





[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544358684



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -1187,6 +1245,32 @@ char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
     }
     return url;
 }
+//
+// get interface URL
+//
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle) {
+    hash_map_iterator_t iter =

Review comment:
       Done







[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) (9663481) into [master](https://codecov.io/gh/apache/celix/commit/dd4dca1d52c36a794dfac71867f0abc3fd81241c?el=desc) (dd4dca1) will **increase** coverage by `0.09%`.
   > The diff coverage is `63.65%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.17%   68.26%   +0.09%     
   ==========================================
     Files         148      149       +1     
     Lines       30088    30171      +83     
   ==========================================
   + Hits        20511    20597      +86     
   + Misses       9577     9574       -3     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `0.00% <0.00%> (ø)` | |
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.24% <20.00%> (-0.76%)` | :arrow_down: |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.47% <50.70%> (+0.20%)` | :arrow_up: |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `53.07% <54.54%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `54.25% <60.00%> (+1.34%)` | :arrow_up: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `75.42% <67.34%> (+0.18%)` | :arrow_up: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `68.93% <84.05%> (+2.77%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...b\_protocol\_wire\_v1/src/pubsub\_wire\_protocol\_impl.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3Byb3RvY29sL3B1YnN1Yl9wcm90b2NvbF93aXJlX3YxL3NyYy9wdWJzdWJfd2lyZV9wcm90b2NvbF9pbXBsLmM=) | `96.38% <100.00%> (+9.80%)` | :arrow_up: |
   | ... and [12 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [dd4dca1...9663481](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544261994



##########
File path: bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
##########
@@ -31,7 +31,8 @@
 #include "pubsub/subscriber.h"
 
 #define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY         "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
-#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY         "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS"

Review comment:
       Please put previous define back.







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534419702



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
##########
@@ -23,19 +23,20 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
+#define PSA_TCP_SEND_DELAY                      "PSA_TCP_SEND_DELAY"
 
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        UINT32_MAX
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
+#define PSA_TCP_DEFAULT_SEND_DELAY              250 //  250 ms

Review comment:
       I changed this to your suggestion







[GitHub] [celix] Oipo commented on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-744327492


   @rbulter you have missed some review comments. Please check them all.





[GitHub] [celix] pnoltes commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r528975911



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
##########
@@ -102,21 +103,30 @@
  */
 #define PUBSUB_TCP_STATIC_CONNECT_URLS          "tcp.static.connect.urls"
 
+
 /**
- * Name of environment variable with space-separated list of ips/urls to connect to
- * e.g. PSA_TCP_STATIC_CONNECT_FOR_topic_scope="tcp://127.0.0.1:4444 tcp://127.0.0.2:4444"
+ * Defines if the publisher / subscriber is a passive endpoint and shares

Review comment:
       Can this be explained in more detail? What is meant by passive? I am assuming it means not taking part in discovery, correct?
   
   And what are the possible values for tcp.passive.configured / tcp.passive.key?

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -144,119 +146,108 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                   PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);

Review comment:
       Please use celix_properties_getAsBoolean instead; it already converts the string value to a boolean.
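
The point of the suggestion can be sketched without the Celix API: parse the string once, with a default, instead of carrying a `const char *` around and comparing it later. The helpers below are hypothetical (`parse_bool_or_default` and `resolve_flag` are not Celix functions), and the accepted spellings ("true"/"false", case-insensitive) are an assumption for this example. `resolve_flag` mirrors the lookup order in the diff above: the environment value wins, and the topic property is only consulted when no environment value is set.

```c
#include <stdbool.h>
#include <stddef.h>
#include <strings.h> /* strcasecmp (POSIX) */

/*
 * Hypothetical helper: converts a string to a boolean.
 * Returns default_value when value is NULL or not recognised.
 */
bool parse_bool_or_default(const char *value, bool default_value) {
    if (value == NULL) {
        return default_value;
    }
    if (strcasecmp(value, "true") == 0) {
        return true;
    }
    if (strcasecmp(value, "false") == 0) {
        return false;
    }
    return default_value;
}

/*
 * Hypothetical helper: the environment value takes precedence; the property
 * value is used only when the environment value is absent.
 */
bool resolve_flag(const char *env_value, const char *prop_value, bool default_value) {
    if (env_value != NULL) {
        return parse_bool_or_default(env_value, default_value);
    }
    return parse_bool_or_default(prop_value, default_value);
}
```

With helpers of this shape, a field such as `sender->isPassive` could be assigned a `bool` directly, for example `resolve_flag(envValue, propValue, false)`, where `envValue` and `propValue` stand in for the two lookups in the diff.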







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534422059



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -144,119 +146,108 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                   PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
+    sender->isPassive = psa_tcp_isPassive(isPassive);
 
     /* When it's an endpoint share the socket with the receiver */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticClientEndPointUrls) ? staticClientEndPointUrls : staticServerEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
         if (entry == NULL) {
             if (sender->socketHandler == NULL)
                 sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
             entry = sender->socketHandler;
             sender->sharedSocketHandler = sender->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             sender->socketHandler = entry;
             sender->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
     }
 
     if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
         long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
-        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
-                                                   PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
-        double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                                       (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
-                                                                                       PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  PSA_TCP_SEND_DELAY, PSA_TCP_DEFAULT_SEND_DELAY);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
-        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+        pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);

Review comment:
       When the passive connection is used, a full-duplex connection is set up.
   This means that EPOLLIN is also enabled for the publisher.
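
As a rough sketch of what enabling the receive event could look like at the epoll level (Linux only): the helper names below are hypothetical and the base event mask is an assumption, not taken from pubsub_tcp_handler.c.

```c
#include <stdbool.h>
#include <stdint.h>
#include <sys/epoll.h>

/* Hypothetical helper: choose the epoll interest set for a sender socket.
 * Assumption: a sender always watches for the peer closing the connection. */
uint32_t sender_event_mask(bool enableReceive) {
    uint32_t events = EPOLLRDHUP;
    if (enableReceive) {
        events |= EPOLLIN;          /* shared (full-duplex) socket: also watch incoming data */
    }
    return events;
}

/* Register fd with an epoll instance; returns 0 on success, -1 on error. */
int sender_register_fd(int efd, int fd, bool enableReceive) {
    struct epoll_event ev = {0};
    ev.events = sender_event_mask(enableReceive);
    ev.data.fd = fd;
    return epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev);
}
```

In the diff above the flag is derived from `passiveKey`, so only senders that share a handler would get `EPOLLIN`.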







[GitHub] [celix] pnoltes commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r545132329



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -559,31 +549,39 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);

Review comment:
       The interceptor can be used to monitor pubsub (pre/post).
   For example, the receive processing time can be observed by recording the time difference between a preReceive and a postReceive interceptor call.
   
   The interceptors do not need the actual subscriber service.
   Although the wish to intercept the use of a pubsub receive was also discussed, that is more adapter than interceptor functionality.
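
A minimal sketch of the timing use case described here, assuming a pair of hooks that are called before and after the subscriber's receive. The `receive_monitor_t` type and both functions are hypothetical; they do not follow the Celix interceptor service signature.

```c
#include <time.h>

/* Hypothetical monitor state; not the Celix interceptor service struct. */
typedef struct receive_monitor {
    struct timespec start;      /* set in the pre-receive hook */
    double lastSeconds;         /* duration of the last pre/post pair */
    unsigned long count;        /* number of completed pre/post pairs */
} receive_monitor_t;

/* Called just before the subscriber's receive function. */
void monitor_preReceive(receive_monitor_t *m) {
    timespec_get(&m->start, TIME_UTC);   /* C11 wall clock; a monotonic clock may be preferable */
}

/* Called just after the subscriber's receive function returns. */
void monitor_postReceive(receive_monitor_t *m) {
    struct timespec end;
    timespec_get(&end, TIME_UTC);
    m->lastSeconds = (double)(end.tv_sec - m->start.tv_sec)
                   + (double)(end.tv_nsec - m->start.tv_nsec) / 1e9;
    m->count++;
}
```

Note that neither hook touches the subscriber service itself, which matches the point that interceptors only observe the call.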







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534419429



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
##########
@@ -102,21 +103,30 @@
  */
 #define PUBSUB_TCP_STATIC_CONNECT_URLS          "tcp.static.connect.urls"
 
+
 /**
- * Name of environment variable with space-separated list of ips/urls to connect to
- * e.g. PSA_TCP_STATIC_CONNECT_FOR_topic_scope="tcp://127.0.0.1:4444 tcp://127.0.0.2:4444"
+ * Defines if the publisher / subscriber is a passive endpoint and shares

Review comment:
       I have added more comments to explain the concept.
   It's the same concept that is used in the web socket.
   See also the TCP endpoint test.
   







[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) (ce1a8b3) into [master](https://codecov.io/gh/apache/celix/commit/2546c643b98550a6fc22b5b4eb98ce772f955d79?el=desc) (2546c64) will **decrease** coverage by `0.00%`.
   > The diff coverage is `63.58%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   - Coverage   66.74%   66.73%   -0.01%     
   ==========================================
     Files         147      148       +1     
     Lines       29947    30029      +82     
   ==========================================
   + Hits        19987    20040      +53     
   - Misses       9960     9989      +29     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `0.00% <0.00%> (ø)` | |
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.24% <20.00%> (-0.76%)` | :arrow_down: |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.47% <50.70%> (+0.20%)` | :arrow_up: |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `53.07% <54.54%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `53.12% <60.00%> (+0.20%)` | :arrow_up: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `75.42% <67.34%> (+0.18%)` | :arrow_up: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `68.93% <84.05%> (+2.77%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [libs/framework/src/celix\_log.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy9mcmFtZXdvcmsvc3JjL2NlbGl4X2xvZy5j) | `78.12% <0.00%> (-10.94%)` | :arrow_down: |
   | [libs/utils/src/hash\_map.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy91dGlscy9zcmMvaGFzaF9tYXAuYw==) | `93.08% <0.00%> (-0.58%)` | :arrow_down: |
   | ... and [10 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [2546c64...ce1a8b3](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r479677238



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -348,40 +354,18 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
 static inline void
 pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
     if (entry) {
-        if (entry->url) {
-            free(entry->url);
-            entry->url = NULL;
-        }
-        if (entry->interface_url) {
-            free(entry->interface_url);
-            entry->interface_url = NULL;
-        }
-        if (entry->fd >= 0) {
-            close(entry->fd);
-            entry->fd = -1;
-        }
-        if (entry->buffer) {
-            free(entry->buffer);
-            entry->buffer = NULL;
-            entry->bufferSize = 0;
-        }
-        if (entry->headerBuffer) {
-            free(entry->headerBuffer);
-            entry->headerBuffer = NULL;
-            entry->headerBufferSize = 0;
-        }
-
-        if (entry->footerBuffer) {
-            free(entry->footerBuffer);
-            entry->footerBuffer = NULL;
-        }
-
-        if (entry->metaBuffer) {
-            free(entry->metaBuffer);
-            entry->metaBuffer = NULL;
-            entry->metaBufferSize = 0;
-        }
-        entry->connected = false;
+        if (entry->url) free(entry->url);

Review comment:
       `free()` deals gracefully with NULL values
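
For illustration, a small sketch of a cleanup function that relies on `free(NULL)` being a no-op, as the C standard guarantees. The struct is a trimmed-down, hypothetical version of the connection entry; only the two field names are borrowed from the diff above.

```c
#include <stdlib.h>

/* Hypothetical, trimmed-down connection entry. */
typedef struct conn_entry {
    char *url;
    char *interface_url;
} conn_entry_t;

/* Release the members of an entry without per-member NULL checks. */
void conn_entry_release(conn_entry_t *entry) {
    if (entry == NULL) {
        return;                     /* still needed: entry is dereferenced below */
    }
    free(entry->url);               /* no-op when url is NULL */
    free(entry->interface_url);     /* no-op when interface_url is NULL */
    entry->url = NULL;
    entry->interface_url = NULL;
}
```

The check on `entry` itself stays, because that pointer is dereferenced; only the checks around the individual `free()` calls are redundant.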







[GitHub] [celix] pnoltes commented on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
pnoltes commented on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-747479169


   Our IVV tests were failing for the updated PubSub TCP Admin; apparently this update does not work with wire protocol v1.
   
   @rbulter Is it correct that the updated PubSub TCP Admin does not work with wire protocol v1?
   If not, please add a PubSub TCP Admin test with wire protocol v1.
   If correct, please update the TCP admin match so that only wire protocol v2 is searched for and supported.
   
   Maybe add a pubsubEndpoint_matchPublisherWithRequestedProtocolAndSerializer function, which is used by the 'normal' pubsubEndpoint_matchPublisher and can be called directly by the TCP admin match with a filled-in requestedProtocol="envelope-v2" argument. And the same for pubsubEndpoint_matchSubscriber.
   This should force the TCP admin to use the correct wire protocol or fail in the match.
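
The core of the proposed check could look roughly like the sketch below. `protocol_matches` is a hypothetical helper written for this example, not an existing Celix function, and treating a NULL request as "accept anything" is an assumption about how the 'normal' match would reuse it.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical check, not the Celix matching API: does the wire protocol
 * offered by an endpoint satisfy what the admin requested? */
bool protocol_matches(const char *requested, const char *offered) {
    if (requested == NULL) {
        return true;                /* no requirement: any protocol is acceptable */
    }
    if (offered == NULL) {
        return false;               /* a protocol was requested but none is offered */
    }
    return strcmp(requested, offered) == 0;
}
```

Under that assumption the TCP admin would pass "envelope-v2" as the request and fail the match for anything else, while callers without a requirement would pass NULL.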





[GitHub] [celix] codecov-commenter commented on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704497725


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) into [master](https://codecov.io/gh/apache/celix/commit/20f794bd1dece8c986119553bc97205bee09cca8?el=desc) will **increase** coverage by `0.22%`.
   > The diff coverage is `64.01%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.30%   68.53%   +0.22%     
   ==========================================
     Files         136      137       +1     
     Lines       27371    27443      +72     
   ==========================================
   + Hits        18696    18808     +112     
   + Misses       8675     8635      -40     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.47% <20.00%> (-0.54%)` | :arrow_down: |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `61.60% <58.33%> (-1.66%)` | :arrow_down: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `72.51% <65.51%> (-2.74%)` | :arrow_down: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (+2.71%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `57.48% <100.00%> (+4.68%)` | :arrow_up: |
   | ... and [10 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...17c6f7c](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) into [master](https://codecov.io/gh/apache/celix/commit/20f794bd1dece8c986119553bc97205bee09cca8?el=desc) will **increase** coverage by `0.43%`.
   > The diff coverage is `65.53%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.30%   68.74%   +0.43%     
   ==========================================
     Files         136      137       +1     
     Lines       27371    27460      +89     
   ==========================================
   + Hits        18697    18877     +180     
   + Misses       8674     8583      -91     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `51.74% <20.00%> (+2.74%)` | :arrow_up: |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.61% <58.33%> (+0.35%)` | :arrow_up: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `75.28% <67.00%> (+0.04%)` | :arrow_up: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (+2.71%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `58.24% <100.00%> (+5.44%)` | :arrow_up: |
   | [libs/utils/src/hash\_map.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy91dGlscy9zcmMvaGFzaF9tYXAuYw==) | `93.94% <0.00%> (-0.29%)` | :arrow_down: |
   | ... and [12 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...0492d9f](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544349458



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -343,18 +335,25 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {

Review comment:
       This also applies to ZMQ (v1/v2); please fix this as well.







[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544260711



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -343,18 +335,25 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {

Review comment:
       Please remove unused function.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -343,18 +335,25 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {
     //TODO subscriber count -> topic info
 }
 
-void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {

Review comment:
       Please remove unused function.







[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) into [master](https://codecov.io/gh/apache/celix/commit/20f794bd1dece8c986119553bc97205bee09cca8?el=desc) will **increase** coverage by `0.11%`.
   > The diff coverage is `64.41%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.30%   68.42%   +0.11%     
   ==========================================
     Files         136      137       +1     
     Lines       27371    27444      +73     
   ==========================================
   + Hits        18697    18779      +82     
   + Misses       8674     8665       -9     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.47% <20.00%> (-0.54%)` | :arrow_down: |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `61.60% <58.33%> (-1.66%)` | :arrow_down: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `72.60% <65.76%> (-2.64%)` | :arrow_down: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (+2.71%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `56.10% <100.00%> (+3.30%)` | :arrow_up: |
   | ... and [13 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...7ef5e0e](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) (21af91b) into [master](https://codecov.io/gh/apache/celix/commit/2546c643b98550a6fc22b5b4eb98ce772f955d79?el=desc) (2546c64) will **increase** coverage by `0.21%`.
   > The diff coverage is `65.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   66.74%   66.95%   +0.21%     
   ==========================================
     Files         147      148       +1     
     Lines       29947    30041      +94     
   ==========================================
   + Hits        19987    20115     +128     
   + Misses       9960     9926      -34     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `0.00% <0.00%> (ø)` | |
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `51.52% <20.00%> (+2.52%)` | :arrow_up: |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.47% <53.84%> (+0.20%)` | :arrow_up: |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `53.07% <54.54%> (ø)` | |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `74.96% <68.13%> (-0.28%)` | :arrow_down: |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `56.29% <80.00%> (+3.38%)` | :arrow_up: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `68.93% <86.56%> (+2.77%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [.../pubsub/pubsub\_admin\_zmq/v1/src/pubsub\_zmq\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3ptcS92MS9zcmMvcHVic3ViX3ptcV9hZG1pbi5j) | `50.40% <0.00%> (ø)` | |
   | ... and [10 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [2546c64...21af91b](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534422767



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -617,8 +614,8 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender)
     static bool firstSend = true;
 
     if (firstSend) {
-        L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
-        sleep(FIRST_SEND_DELAY_IN_SECONDS);
+        if (sender->send_delay ) L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");

Review comment:
       Solved







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534424768



##########
File path: bundles/pubsub/test/CMakeLists.txt
##########
@@ -47,7 +47,7 @@ celix_bundle_files(pubsub_endpoint_tst
         DESTINATION "META-INF/descriptors"
         )
 celix_bundle_files(pubsub_endpoint_tst
-        meta_data/ping2.properties
+        meta_data/ping3.properties

Review comment:
       I changed the test to make it more readable: it now uses
   ping2/pong2 and ping3/pong3 instead of ping2/pong2 and pong2/ping2.
   I need this test to be able to test (passive) endpoints.
   With one publisher and one subscriber I cannot test the full-duplex endpoints.







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544359961



##########
File path: bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
##########
@@ -31,7 +31,8 @@
 #include "pubsub/subscriber.h"
 
 #define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY         "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
-#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY         "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS"

Review comment:
       Done







[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544261779



##########
File path: bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
##########
@@ -39,7 +39,7 @@
 #include "pubsub_admin.h"
 #include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
 
-#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS       30L
+#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME       250 // 250 msecond

Review comment:
       Please put previous define back.







[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) into [master](https://codecov.io/gh/apache/celix/commit/20f794bd1dece8c986119553bc97205bee09cca8?el=desc) will **increase** coverage by `0.30%`.
   > The diff coverage is `64.01%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   68.30%   68.61%   +0.30%     
   ==========================================
     Files         136      137       +1     
     Lines       27371    27443      +72     
   ==========================================
   + Hits        18697    18830     +133     
   + Misses       8674     8613      -61     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `51.74% <20.00%> (+2.74%)` | :arrow_up: |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `61.60% <58.33%> (-1.66%)` | :arrow_down: |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `75.13% <65.51%> (-0.12%)` | :arrow_down: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (+2.71%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `57.37% <100.00%> (+4.57%)` | :arrow_up: |
   | [libs/utils/src/hash\_map.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy91dGlscy9zcmMvaGFzaF9tYXAuYw==) | `93.37% <0.00%> (-0.87%)` | :arrow_down: |
   | ... and [11 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...3b95016](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] codecov-io edited a comment on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > Merging [#279](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=desc) (f544169) into [master](https://codecov.io/gh/apache/celix/commit/2546c643b98550a6fc22b5b4eb98ce772f955d79?el=desc) (2546c64) will **increase** coverage by `0.20%`.
   > The diff coverage is `64.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #279      +/-   ##
   ==========================================
   + Coverage   66.74%   66.94%   +0.20%     
   ==========================================
     Files         147      148       +1     
     Lines       29947    30041      +94     
   ==========================================
   + Hits        19987    20111     +124     
   + Misses       9960     9930      -30     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `0.00% <0.00%> (ø)` | |
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `48.47% <20.00%> (-0.54%)` | :arrow_down: |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `63.47% <50.00%> (+0.20%)` | :arrow_up: |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `53.07% <54.54%> (ø)` | |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `74.96% <68.13%> (-0.28%)` | :arrow_down: |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `56.29% <80.00%> (+3.38%)` | :arrow_up: |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `68.93% <86.56%> (+2.77%)` | :arrow_up: |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [libs/utils/src/hash\_map.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-bGlicy91dGlscy9zcmMvaGFzaF9tYXAuYw==) | `93.37% <0.00%> (-0.29%)` | :arrow_down: |
   | ... and [9 more](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [2546c64...f544169](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] codecov-io commented on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704511759


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@20f794b`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `64.01%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master     #279   +/-   ##
   =========================================
     Coverage          ?   68.57%           
   =========================================
     Files             ?      137           
     Lines             ?    27443           
     Branches          ?        0           
   =========================================
     Hits              ?    18819           
     Misses            ?     8624           
     Partials          ?        0           
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `51.74% <20.00%> (ø)` | |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `61.60% <58.33%> (ø)` | |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `75.13% <65.51%> (ø)` | |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (ø)` | |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `57.25% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...baeda66](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r522991186



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <stdio.h>
+#include <string.h>
+#include "pubsub_tcp_common.h"
+
+
+bool psa_tcp_isPassive(const char* buffer) {
+    bool isPassive = false;
+    // Parse Properties
+    if (buffer != NULL) {
+        char buf[32];
+        snprintf(buf, 32, "%s", buffer);
+        char *trimmed = utils_stringTrim(buf);
+        if (strncasecmp("true", trimmed, strlen("true")) == 0) {
+            isPassive = true;
+        } else if (strncasecmp("false", trimmed, strlen("false")) == 0) {

Review comment:
       This else if branch is not needed, since `isPassive` is false by default.
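
For illustration, a minimal sketch of what the function could look like once the redundant branch is dropped. The names `example_trim` and `example_isPassive` are hypothetical; `example_trim` is only a stand-in for `utils_stringTrim`, whose exact behaviour is assumed here, and this is not the code that ended up in the PR.

```c
#include <ctype.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>

/* Hypothetical stand-in for utils_stringTrim: trims whitespace in place
 * and returns a pointer to the first non-space character. */
static char *example_trim(char *s) {
    while (isspace((unsigned char)*s)) {
        s++;
    }
    size_t len = strlen(s);
    while (len > 0 && isspace((unsigned char)s[len - 1])) {
        s[--len] = '\0';
    }
    return s;
}

/* Hypothetical variant of psa_tcp_isPassive without the "false" branch:
 * isPassive starts out false, so only "true" needs to be detected. */
static bool example_isPassive(const char *buffer) {
    bool isPassive = false;
    if (buffer != NULL) {
        char buf[32];
        snprintf(buf, sizeof(buf), "%s", buffer);
        char *trimmed = example_trim(buf);
        if (strncasecmp("true", trimmed, strlen("true")) == 0) {
            isPassive = true;
        }
    }
    return isPassive;
}
```

Any input other than a (case-insensitive) leading "true", including NULL, falls through to the default of `false`, which is why the explicit "false" comparison adds nothing.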

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
##########
@@ -23,19 +23,20 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
+#define PSA_TCP_SEND_DELAY                      "PSA_TCP_SEND_DELAY"

Review comment:
       Please rename to `PSA_FIRST_SEND_DELAY` and move to pubsub_utils, as all pubsub admins can use it.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -752,35 +747,116 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
     }
 }
 
-static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
-    int expectedReadSize = size;
-    int nbytes = size;
-    int msgSize = 0;
-    char* buffer = (char*)_buffer;
-    while (nbytes > 0 && expectedReadSize > 0) {
-        // Read the message header
-        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
-        // Update buffer administration
-        offset += nbytes;
-        expectedReadSize -= nbytes;
-        msgSize += nbytes;
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle,bool enable) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->enableReceiveEvent = enable;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
+static inline long int pubsub_tcpHandler_getMsgSize(psa_tcp_connection_entry_t *entry) {
+    // Note header message is already read
+    return (long int)entry->header.header.payloadPartSize + (long int)entry->header.header.metadataSize + (long int)entry->readFooterSize;
+}
+
+static inline 
+bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry, long int* msgSize) {
+    bool result = false;
+    size_t syncSize = 0;
+    size_t protocolHeaderBufferSize = 0;
+    // Get Sync Size
+    handle->protocol->getSyncHeaderSize(handle->protocol->handle, &syncSize);
+    // Get HeaderSize of the Protocol Header
+    handle->protocol->getHeaderSize(handle->protocol->handle, &entry->readHeaderSize);
+    // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+    handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+
+    // Ensure capacity in header buffer
+    pubsub_tcpHandler_ensureReadBufferCapacity(handle, entry);
+
+    entry->readMsg.msg_iovlen = 0;
+    entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readHeaderBuffer;
+    entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len  = entry->readHeaderBufferSize;
+    entry->readMsg.msg_iovlen++;
+
+    // Read the message
+    long int nbytes = 0;
+    // Use peek flag to find sync word or when header is part of the payload
+    unsigned int flag = (entry->headerError || (!protocolHeaderBufferSize)) ? MSG_PEEK : 0;
+    if (entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL | flag);
+    if (nbytes >= entry->readHeaderSize) {
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           entry->readMsg.msg_iov[0].iov_base,
+                                           entry->readMsg.msg_iov[0].iov_len,
+                                           &entry->header) == CELIX_SUCCESS) {
+            // read header from queue, when recovered from headerError and when header is not part of the payload. (Because of MSG_PEEK)
+            if (entry->headerError && protocolHeaderBufferSize && entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);

Review comment:
       Please add curly braces here, as it is confusing which line the if affects.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -78,18 +74,27 @@ typedef struct psa_tcp_connection_entry {
     bool connected;
     bool headerError;
     pubsub_protocol_message_t header;
-    unsigned int syncSize;
-    unsigned int headerSize;
-    unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
-    void *headerBuffer;
-    unsigned int footerSize;
-    void *footerBuffer;
-    unsigned int bufferSize;
+    size_t maxMsgSize;
+    size_t readHeaderSize;
+    size_t readHeaderBufferSize; // Size of headerBuffer
+    void *readHeaderBuffer;
+    size_t writeHeaderBufferSize; // Size of headerBuffer
+    void *writeHeaderBuffer;
+    size_t readFooterSize;
+    size_t readFooterBufferSize;
+    void *readFooterBuffer;
+    size_t writeFooterBufferSize;
+    void *writeFooterBuffer;
+    size_t bufferSize;
     void *buffer;
-    unsigned int bufferReadSize;
-    unsigned int metaBufferSize;
-    void *metaBuffer;
+    size_t readMetaBufferSize;
+    void *readMetaBuffer;
+    size_t writeMetaBufferSize;
+    void *writeMetaBuffer;
     unsigned int retryCount;
+    celix_thread_mutex_t writeMutex;
+    celix_thread_mutex_t readMutex;

Review comment:
       The readMutex is only used in `pubsub_tcpHandler_read`, meaning it is never used concurrently. Please remove it.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -1034,130 +1050,176 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
-                metadataData = entry->metaBuffer;
-                handle->protocol->encodeMetadata(handle->protocol->handle, message,
-                                                 &metadataData,
-                                                 &metadataSize);
-                entry->metaBufferSize = metadataSize;
+                metadataSize = entry->writeMetaBufferSize;
+                metadataData = entry->writeMetaBuffer;
+                // When maxMsgSize is smaller then meta data is disabled
+                if (metadataSize > entry->maxMsgSize) {
+                    metadataSize = 0;
+                }
+                handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize);
             }
+
             message->header.metadataSize = metadataSize;
+            size_t totalMsgSize = payloadSize + metadataSize;
+
+            size_t sendMsgSize = 0;
+            size_t msgPayloadOffset = 0;
+            size_t msgIovOffset     = 0;
+            bool allPayloadAdded = (payloadSize == 0);
+            long int nbytes = LONG_MAX;
+            while (sendMsgSize < totalMsgSize && nbytes > 0) {
+                struct msghdr msg;
+                struct iovec msg_iov[IOV_MAX];
+                memset(&msg, 0x00, sizeof(struct msghdr));
+                msg.msg_name = &entry->addr;
+                msg.msg_namelen = entry->len;
+                msg.msg_flags = flags;
+                msg.msg_iov = msg_iov;
+
+                size_t msgPartSize = 0;
+                message->header.payloadPartSize = 0;
+                message->header.payloadOffset = 0;
+                message->header.metadataSize = 0;
+                message->header.isLastSegment = 0;
+
+                size_t protocolHeaderBufferSize = 0;
+                // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+                handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+                size_t footerSize = 0;
+                // Get size of the Protocol Footer
+                handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+                size_t maxMsgSize = entry->maxMsgSize - protocolHeaderBufferSize - footerSize;
+
+                // reserve space for the header if required, header is added later when size of message is known (message can split in parts)
+                if (protocolHeaderBufferSize) {
+                    msg.msg_iovlen++;
+                }
+                // Write generic seralized payload in vector buffer
+                if (!allPayloadAdded) {
+                    if (payloadSize && payloadData && maxMsgSize) {
+                        char *buffer = payloadData;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadOffset];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadOffset), maxMsgSize);
+                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        msg.msg_iovlen++;
 
-            void *footerData = NULL;
-            size_t footerDataSize = 0;
-            if (entry->footerSize) {
-                footerData = entry->footerBuffer;
-                handle->protocol->encodeFooter(handle->protocol->handle, message,
-                                                 &footerData,
-                                                 &footerDataSize);
-                entry->footerSize = footerDataSize;
-            }
+                    } else {
+                        // copy serialized vector into vector buffer
+                        size_t i;
+                        for (i = msgIovOffset; i < MIN(msg_iov_len, msgIovOffset + max_msg_iov_len); i++) {
+                            if ((msgPartSize + msgIoVec[i].iov_len) > maxMsgSize) {
+                                break;
+                            }
+                            msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
+                            msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
+                            msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                            msg.msg_iovlen++;
+                        }
+                        // if no entry could be added
+                        if (i == msgIovOffset) {
+                            // TODO element can be split in parts?
+                            L_ERROR("[TCP Socket] vector io element is larger than max msg size");
+                            break;
+                        }
+                        msgIovOffset = i;
+                    }
+                    message->header.payloadPartSize = msgPartSize;
+                    message->header.payloadOffset   = msgPayloadOffset;
+                    msgPayloadOffset += message->header.payloadPartSize;
+                    sendMsgSize = msgPayloadOffset;
+                    allPayloadAdded= msgPayloadOffset >= payloadSize;
+                }
 
-            size_t msgSize = 0;
-            struct msghdr msg;
-            struct iovec msg_iov[IOV_MAX];
-            memset(&msg, 0x00, sizeof(struct msghdr));
-            msg.msg_name = &entry->addr;
-            msg.msg_namelen = entry->len;
-            msg.msg_flags = flags;
-            msg.msg_iov = msg_iov;
-
-            // Write generic seralized payload in vector buffer
-            if (payloadSize && payloadData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            } else {
-                // copy serialized vector into vector buffer
-                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+                // Write optional metadata in vector buffer
+                if (allPayloadAdded &&
+                    (metadataSize != 0 && metadataData) &&
+                    (msgPartSize < maxMsgSize) &&
+                    (msg.msg_iovlen-1 < max_msg_iov_len)) {  // header is already included
+                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
                     msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    msgPartSize += metadataSize;
+                    message->header.metadataSize = metadataSize;
+                    sendMsgSize += metadataSize;
+                }
+                if (sendMsgSize >= totalMsgSize) {
+                    message->header.isLastSegment = 0x1;
                 }
-            }
-
-            // Write optional metadata in vector buffer
-            if (metadataSize && metadataData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
 
-            // Write optional footerData in vector buffer
-            if (footerData && footerDataSize) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                void *headerData = NULL;
+                size_t headerSize = 0;
+                // Get HeaderSize of the Protocol Header
+                handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
+
+                // check if header is not part of the payload (=> headerBufferSize = 0)
+                if (protocolHeaderBufferSize) {
+                    headerData = entry->writeHeaderBuffer;
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle, message, &headerData, &headerSize);
+                    entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+                    if (headerData && entry->writeHeaderBuffer != headerData) {
+                        entry->writeHeaderBuffer = headerData;
+                    }
+                    if (headerSize && headerData) {
+                        // Write header in 1st vector buffer item
+                        msg.msg_iov[0].iov_base = headerData;
+                        msg.msg_iov[0].iov_len = headerSize;
+                        msgPartSize += msg.msg_iov[0].iov_len;
+                    } else {
+                        L_ERROR("[TCP Socket] No header buffer is generated");
+                        break;
+                    }
+                }
 
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize = 0)s
-            if (entry->headerBufferSize) {
-              headerData = entry->headerBuffer;
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-              entry->headerBufferSize = headerSize;
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
-            //  When a specific socket keeps reporting errors can indicate a subscriber
-            //  which is not active anymore, the connection will remain until the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
-                } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
+                void *footerData = NULL;
+                // Write optional footerData in vector buffer
+                if (footerSize) {
+                    footerData = entry->writeFooterBuffer;
+                    handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerSize);
+                    if (footerData && entry->writeFooterBuffer != footerData) {
+                        entry->writeFooterBuffer = footerData;
+                        entry->writeFooterBufferSize = footerSize;
+                    }
+                    if (footerData) {
+                        msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                        msg.msg_iov[msg.msg_iovlen].iov_len  = footerSize;
+                        msg.msg_iovlen++;
+                        msgPartSize += footerSize;
+                    }
                 }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+
+                //  When a specific socket keeps reporting errors can indicate a subscriber
+                //  which is not active anymore, the connection will remain until the retry
+                //  counter exceeds the maximum retry count.
+                //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
+                            entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno));
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgPartSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgPartSize, nbytes, strerror(errno));
+                    }
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload)) {
+                    free(payloadData);
                 }
             }
-            // Release data
-            if (headerData && headerData != entry->headerBuffer) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData && metadataData != entry->metaBuffer) {
-                free(metadataData);
-            }
-            if (footerData && footerData != entry->footerBuffer) {
-                free(footerData);
-            }
+            celixThreadMutex_unlock(&entry->writeMutex);
         }
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    celixThreadRwlock_unlock(&handle->dbLock);
     //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
     for (int i = 0; i < nofConnToClose; i++) {
         pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);

Review comment:
       Potential NULL dereference on `handle`.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
##########
@@ -23,19 +23,20 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
+#define PSA_TCP_SEND_DELAY                      "PSA_TCP_SEND_DELAY"
 
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        UINT32_MAX
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
+#define PSA_TCP_DEFAULT_SEND_DELAY              250 //  250 ms

Review comment:
       Please rename to `PSA_DEFAULT_FIRST_SEND_DELAY` and move to pubsub_utils, as all pubsub admins can use it.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -144,119 +146,108 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                   PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
+    sender->isPassive = psa_tcp_isPassive(isPassive);
 
     /* When it's an endpoint share the socket with the receiver */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticClientEndPointUrls) ? staticClientEndPointUrls : staticServerEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
         if (entry == NULL) {
             if (sender->socketHandler == NULL)
                 sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
             entry = sender->socketHandler;
             sender->sharedSocketHandler = sender->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             sender->socketHandler = entry;
             sender->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
     }
 
     if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
         long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
-        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
-                                                   PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
-        double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                                       (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
-                                                                                       PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  PSA_TCP_SEND_DELAY, PSA_TCP_DEFAULT_SEND_DELAY);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
-        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+        pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);

Review comment:
       The receive event is used to enable `EPOLLIN`, but for a sender I would expect it to enable `EPOLLOUT`. What's the purpose of enabling it here?
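
For reference, the difference between the two readiness flags can be sketched as follows. This is a minimal illustration that assumes Linux `epoll`; the helper `example_fillEvent` and its parameters are hypothetical and do not appear in the Celix handler.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>
#include <sys/epoll.h>

/* Hypothetical helper: fills an epoll registration for one socket.
 * EPOLLIN asks to be woken when data can be read,
 * EPOLLOUT asks to be woken when the socket can accept more data to send. */
static void example_fillEvent(struct epoll_event *ev, int fd, bool wantReceive, bool wantSend) {
    assert(ev != NULL);
    memset(ev, 0, sizeof(*ev));
    ev->data.fd = fd;
    if (wantReceive) {
        ev->events |= EPOLLIN;
    }
    if (wantSend) {
        ev->events |= EPOLLOUT;
    }
}
```

A sender that shares its socket with a receiver could plausibly want `EPOLLIN` as well, which may be what the passive-key check is for, but that is an assumption about the intent and not something the hunk states.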

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -628,16 +615,24 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
 }
 
 //
-// Setup buffer sizes
+// Setup receive buffer size
+//
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->bufferSize = size;

Review comment:
       I would expect `handle->buffer` to be resized here as well. However, I see that `pubsub_tcpHandler_ensureReadBufferCapacity` gets called elsewhere. Please add a comment explaining why `handle->buffer` is left alone in this function.
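
One pattern that would explain a setter leaving the allocation alone is lazy growth: the setter records the requested size and the read path reallocates on demand. The sketch below illustrates that pattern only; `example_buffer_t`, `example_setBufferSize` and `example_ensureCapacity` are hypothetical names, and whether the Celix handler works this way is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for the handler's buffer bookkeeping. */
typedef struct example_buffer {
    size_t requestedSize; /* size requested through the setter */
    size_t capacity;      /* size of the current allocation */
    void *data;
} example_buffer_t;

/* The setter only records the new size; the allocation is not touched. */
static void example_setBufferSize(example_buffer_t *buf, size_t size) {
    assert(buf != NULL);
    buf->requestedSize = size;
}

/* Called on the read path: grows the allocation when it is too small.
 * Returns false when realloc fails, leaving the old buffer intact. */
static bool example_ensureCapacity(example_buffer_t *buf) {
    assert(buf != NULL);
    if (buf->capacity >= buf->requestedSize) {
        return true;
    }
    void *grown = realloc(buf->data, buf->requestedSize);
    if (grown == NULL) {
        return false;
    }
    buf->data = grown;
    buf->capacity = buf->requestedSize;
    return true;
}
```

With this split the setter stays cheap under the write lock, at the cost of the first read after a resize paying for the reallocation.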

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -374,6 +351,17 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver
 void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
                                              celix_array_list_t *unconnectedUrls) {
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (receiver->isPassive) {
+        char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
+        char *url = NULL;
+        asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
+        if (interface_url) {
+            celix_arrayList_add(connectedUrls, url);
+        } else {
+            celix_arrayList_add(unconnectedUrls, url);
+        }
+        free(interface_url);
+    } else {
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);

Review comment:
       Please indent this else branch

##########
File path: bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
##########
@@ -39,7 +39,7 @@
 #include "pubsub_admin.h"
 #include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
 
-#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS       30L
+#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME       250 // 250 msecond

Review comment:
       Renaming an existing environment variable is a breaking change. Please keep the existing variable and let the new one override it only when the new one is defined in the properties.
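
A backward-compatible lookup along those lines could look like the sketch below. The key names are placeholders because the real property keys are not visible in this hunk; the conversion assumes the old value is in seconds and the new one in milliseconds, as the two macro names suggest.

```c
#include <assert.h>
#include <stdlib.h>

/* Placeholder names; the real Celix property keys are not shown in this hunk. */
#define EXAMPLE_OLD_KEY "EXAMPLE_SLEEPTIME_SECONDS"
#define EXAMPLE_NEW_KEY "EXAMPLE_SLEEPTIME_MS"

/* Returns the sleep time in milliseconds.
 * The new value wins when present; otherwise the old value (in seconds)
 * is still honoured; otherwise the default applies. */
static long example_resolveSleeptimeMs(const char *newMs, const char *oldSeconds, long defaultMs) {
    assert(defaultMs >= 0);
    if (newMs != NULL) {
        return strtol(newMs, NULL, 10);
    }
    if (oldSeconds != NULL) {
        return strtol(oldSeconds, NULL, 10) * 1000L;
    }
    return defaultMs;
}

/* Convenience wrapper that reads both variables from the environment. */
static long example_sleeptimeFromEnv(void) {
    return example_resolveSleeptimeMs(getenv(EXAMPLE_NEW_KEY), getenv(EXAMPLE_OLD_KEY), 250L);
}
```

Existing deployments that only set the old variable keep their configured value, while new deployments can opt into the finer-grained setting.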

##########
File path: bundles/pubsub/test/CMakeLists.txt
##########
@@ -47,7 +47,7 @@ celix_bundle_files(pubsub_endpoint_tst
         DESTINATION "META-INF/descriptors"
         )
 celix_bundle_files(pubsub_endpoint_tst
-        meta_data/ping2.properties
+        meta_data/ping3.properties

Review comment:
       It's probably better to create two endpoint tests: one that uses ping2/pong2 and one that uses ping3/pong3.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -617,8 +614,8 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender)
     static bool firstSend = true;
 
     if (firstSend) {
-        L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
-        sleep(FIRST_SEND_DELAY_IN_SECONDS);
+        if (sender->send_delay ) L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");

Review comment:
       An if statement should span more than one line; please add braces.
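
A braced version of a first-send delay might read as in the sketch below. The struct and function names are hypothetical, the delay is assumed to be in milliseconds, and the per-sender `firstSend` flag is a simplification of the function-level `static bool` in the hunk.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <time.h>

/* Hypothetical sender: only the delay-related fields matter for this sketch. */
typedef struct example_sender {
    long sendDelayMs;
    bool firstSend;
} example_sender_t;

/* Sleeps once before the first send. The log line and the sleep both sit
 * inside the braces, so it is clear what the condition guards.
 * Returns true when a delay was actually applied. */
static bool example_delayFirstSend(example_sender_t *sender) {
    assert(sender != NULL);
    bool delayed = false;
    if (sender->firstSend) {
        if (sender->sendDelayMs > 0) {
            printf("Delaying first send for late joiners...\n");
            struct timespec ts = {
                .tv_sec = sender->sendDelayMs / 1000,
                .tv_nsec = (sender->sendDelayMs % 1000) * 1000000L
            };
            nanosleep(&ts, NULL);
            delayed = true;
        }
        sender->firstSend = false;
    }
    return delayed;
}
```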

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -533,7 +529,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        if (status == CELIX_SUCCESS /*ser ok*/) {
+        bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);

Review comment:
       I'm not quite sure if we want to intercept messages if we can't serialize them. @pnoltes WDYT?
   
   Maybe we should short-circuit here:
   ```
   bool cont = status == CELIX_SUCCESS && pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
   ```

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -707,7 +700,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
                            metrics->msgTypeId);
                 }
             }
-            i += 1;
+            i +=1 ;

Review comment:
       Nitpick: the original spacing (`i += 1;`) was fine.

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -964,58 +998,30 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v
     return result;
 }
 
-static inline
-int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) {
-  int nbytes = 0;
-  int msgSize = 0;
-  if (entry->fd >= 0 && size && msg->msg_iovlen) {
-    int expectedReadSize = size;
-    unsigned int offset = 0;
-    nbytes = size;
-    while (nbytes > 0 && expectedReadSize > 0) {
-      // Read the message header
-      nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
-      // Update admin
-      expectedReadSize -= nbytes;
-      msgSize += nbytes;
-      // Not all written
-      if (expectedReadSize && nbytes > 0) {
-        unsigned int readSize = 0;
-        unsigned int readIndex = 0;
-        unsigned int i = 0;
-        for (i = 0; i < msg->msg_iovlen; i++) {
-          if (nbytes < msg->msg_iov[i].iov_len) {
-            readIndex = i;
-            break;
-          }
-          readSize+= msg->msg_iov[i].iov_len;
-        }
-        msg->msg_iov = &msg->msg_iov[readIndex];
-        msg->msg_iovlen -= readIndex;
-        char* buffer = (char*)msg->msg_iov->iov_base;
-        offset = nbytes - readSize;
-        msg->msg_iov->iov_base = &buffer[offset];
-        msg->msg_iov->iov_len  = msg->msg_iov->iov_len - offset;
-      }
-    }
-  }
-  if (nbytes <=0)  msgSize = nbytes;
-  return msgSize;
-}
+
 //
 // Write large data to TCP. .
 //
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec,
                             size_t msg_iov_len, int flags) {
-    celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
     int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];

Review comment:
       This segfaults if `handle` is NULL, because `handle->connection_fd_map` is dereferenced to size the array before any check.
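
One possible shape of a guard, as a sketch only. `handler_sketch_t` and `write_sketch` are hypothetical reductions of `pubsub_tcpHandler_t` and `pubsub_tcpHandler_write`, and the `-1` error value is an assumption:

```c
#include <stddef.h>

/* Hypothetical, reduced stand-in for pubsub_tcpHandler_t. */
typedef struct {
    size_t connectionCount; /* stands in for hashMap_size(connection_fd_map) */
} handler_sketch_t;

/* Returns -1 for a NULL handle instead of dereferencing it; otherwise the
 * number of connections the close queue was sized for. */
static int write_sketch(const handler_sketch_t *handle) {
    if (handle == NULL) {
        return -1;
    }
    size_t n = handle->connectionCount;
    /* Size the array only after the check; a zero-length VLA is undefined. */
    int closeQueue[n > 0 ? n : 1];
    closeQueue[0] = 0;
    (void)closeQueue;
    return (int)n;
}
```

The point is only the ordering: the NULL check has to come before the declaration whose size expression reads through the pointer.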







[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544250591



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -1187,6 +1245,32 @@ char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
     }
     return url;
 }
+//
+// get interface URL
+//
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle) {
+    hash_map_iterator_t iter =

Review comment:
       Missing `celixThreadRwlock_writeLock(&handle->dbLock);` in this function or in the caller function `pubsub_tcpTopicSender_url`







[GitHub] [celix] codecov-io commented on pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #279:
URL: https://github.com/apache/celix/pull/279#issuecomment-704496769


   # [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@20f794b`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `64.01%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/celix/pull/279/graphs/tree.svg?width=650&height=150&src=pr&token=JdsiThga8P)](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master     #279   +/-   ##
   =========================================
     Coverage          ?   68.61%           
   =========================================
     Files             ?      137           
     Lines             ?    27382           
     Branches          ?        0           
   =========================================
     Hits              ?    18788           
     Misses            ?     8594           
     Partials          ?        0           
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...les/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_admin.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9hZG1pbi5j) | `51.74% <20.00%> (ø)` | |
   | [...es/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_common.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9jb21tb24uYw==) | `40.00% <40.00%> (ø)` | |
   | [bundles/pubsub/pubsub\_utils/src/pubsub\_utils\_url.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3V0aWxzL3NyYy9wdWJzdWJfdXRpbHNfdXJsLmM=) | `51.75% <54.54%> (ø)` | |
   | [...b/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_receiver.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19yZWNlaXZlci5j) | `61.60% <58.33%> (ø)` | |
   | [...s/pubsub/pubsub\_admin\_tcp/src/pubsub\_tcp\_handler.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF9oYW5kbGVyLmM=) | `75.13% <65.51%> (ø)` | |
   | [...sub/pubsub\_admin\_tcp/src/pubsub\_tcp\_topic\_sender.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHVic3ViX3RjcF90b3BpY19zZW5kZXIuYw==) | `67.06% <80.95%> (ø)` | |
   | [...undles/pubsub/pubsub\_admin\_tcp/src/psa\_activator.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX2FkbWluX3RjcC9zcmMvcHNhX2FjdGl2YXRvci5j) | `100.00% <100.00%> (ø)` | |
   | [...sub\_topology\_manager/src/pubsub\_topology\_manager.c](https://codecov.io/gh/apache/celix/pull/279/diff?src=pr&el=tree#diff-YnVuZGxlcy9wdWJzdWIvcHVic3ViX3RvcG9sb2d5X21hbmFnZXIvc3JjL3B1YnN1Yl90b3BvbG9neV9tYW5hZ2VyLmM=) | `57.60% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=footer). Last update [20f794b...17c6f7c](https://codecov.io/gh/apache/celix/pull/279?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544444880



##########
File path: bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
##########
@@ -79,11 +79,11 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce
 
     manager->loghelper = logHelper;
     manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
-    manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY, PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME);
-    unsigned handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_KEY, -1L);
+    unsigned handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS);
     if ( handlingThreadSleepTime >= 0 ) {
         manager->handlingThreadSleepTime = handlingThreadSleepTime * 1000L;
     }
+    manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY,  manager->handlingThreadSleepTime);

Review comment:
       :+1: 

##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
##########
@@ -47,7 +47,9 @@ bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
 bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
 long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
 long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
+/* Note this functions are deprecated and not used */

Review comment:
       :+1: 







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544359643



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -343,18 +335,25 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {

Review comment:
       Done 







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r542432682



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -707,7 +700,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
                            metrics->msgTypeId);
                 }
             }
-            i += 1;
+            i +=1 ;

Review comment:
       Done







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r542433864



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
##########
@@ -1034,130 +1050,176 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
-                metadataData = entry->metaBuffer;
-                handle->protocol->encodeMetadata(handle->protocol->handle, message,
-                                                 &metadataData,
-                                                 &metadataSize);
-                entry->metaBufferSize = metadataSize;
+                metadataSize = entry->writeMetaBufferSize;
+                metadataData = entry->writeMetaBuffer;
+                // When maxMsgSize is smaller then meta data is disabled
+                if (metadataSize > entry->maxMsgSize) {
+                    metadataSize = 0;
+                }
+                handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize);
             }
+
             message->header.metadataSize = metadataSize;
+            size_t totalMsgSize = payloadSize + metadataSize;
+
+            size_t sendMsgSize = 0;
+            size_t msgPayloadOffset = 0;
+            size_t msgIovOffset     = 0;
+            bool allPayloadAdded = (payloadSize == 0);
+            long int nbytes = LONG_MAX;
+            while (sendMsgSize < totalMsgSize && nbytes > 0) {
+                struct msghdr msg;
+                struct iovec msg_iov[IOV_MAX];
+                memset(&msg, 0x00, sizeof(struct msghdr));
+                msg.msg_name = &entry->addr;
+                msg.msg_namelen = entry->len;
+                msg.msg_flags = flags;
+                msg.msg_iov = msg_iov;
+
+                size_t msgPartSize = 0;
+                message->header.payloadPartSize = 0;
+                message->header.payloadOffset = 0;
+                message->header.metadataSize = 0;
+                message->header.isLastSegment = 0;
+
+                size_t protocolHeaderBufferSize = 0;
+                // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+                handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+                size_t footerSize = 0;
+                // Get size of the Protocol Footer
+                handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+                size_t maxMsgSize = entry->maxMsgSize - protocolHeaderBufferSize - footerSize;
+
+                // reserve space for the header if required, header is added later when size of message is known (message can split in parts)
+                if (protocolHeaderBufferSize) {
+                    msg.msg_iovlen++;
+                }
+                // Write generic seralized payload in vector buffer
+                if (!allPayloadAdded) {
+                    if (payloadSize && payloadData && maxMsgSize) {
+                        char *buffer = payloadData;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadOffset];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadOffset), maxMsgSize);
+                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        msg.msg_iovlen++;
 
-            void *footerData = NULL;
-            size_t footerDataSize = 0;
-            if (entry->footerSize) {
-                footerData = entry->footerBuffer;
-                handle->protocol->encodeFooter(handle->protocol->handle, message,
-                                                 &footerData,
-                                                 &footerDataSize);
-                entry->footerSize = footerDataSize;
-            }
+                    } else {
+                        // copy serialized vector into vector buffer
+                        size_t i;
+                        for (i = msgIovOffset; i < MIN(msg_iov_len, msgIovOffset + max_msg_iov_len); i++) {
+                            if ((msgPartSize + msgIoVec[i].iov_len) > maxMsgSize) {
+                                break;
+                            }
+                            msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
+                            msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
+                            msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                            msg.msg_iovlen++;
+                        }
+                        // if no entry could be added
+                        if (i == msgIovOffset) {
+                            // TODO element can be split in parts?
+                            L_ERROR("[TCP Socket] vector io element is larger than max msg size");
+                            break;
+                        }
+                        msgIovOffset = i;
+                    }
+                    message->header.payloadPartSize = msgPartSize;
+                    message->header.payloadOffset   = msgPayloadOffset;
+                    msgPayloadOffset += message->header.payloadPartSize;
+                    sendMsgSize = msgPayloadOffset;
+                    allPayloadAdded= msgPayloadOffset >= payloadSize;
+                }
 
-            size_t msgSize = 0;
-            struct msghdr msg;
-            struct iovec msg_iov[IOV_MAX];
-            memset(&msg, 0x00, sizeof(struct msghdr));
-            msg.msg_name = &entry->addr;
-            msg.msg_namelen = entry->len;
-            msg.msg_flags = flags;
-            msg.msg_iov = msg_iov;
-
-            // Write generic seralized payload in vector buffer
-            if (payloadSize && payloadData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            } else {
-                // copy serialized vector into vector buffer
-                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+                // Write optional metadata in vector buffer
+                if (allPayloadAdded &&
+                    (metadataSize != 0 && metadataData) &&
+                    (msgPartSize < maxMsgSize) &&
+                    (msg.msg_iovlen-1 < max_msg_iov_len)) {  // header is already included
+                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
                     msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    msgPartSize += metadataSize;
+                    message->header.metadataSize = metadataSize;
+                    sendMsgSize += metadataSize;
+                }
+                if (sendMsgSize >= totalMsgSize) {
+                    message->header.isLastSegment = 0x1;
                 }
-            }
-
-            // Write optional metadata in vector buffer
-            if (metadataSize && metadataData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
 
-            // Write optional footerData in vector buffer
-            if (footerData && footerDataSize) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                void *headerData = NULL;
+                size_t headerSize = 0;
+                // Get HeaderSize of the Protocol Header
+                handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
+
+                // check if header is not part of the payload (=> headerBufferSize = 0)
+                if (protocolHeaderBufferSize) {
+                    headerData = entry->writeHeaderBuffer;
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle, message, &headerData, &headerSize);
+                    entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+                    if (headerData && entry->writeHeaderBuffer != headerData) {
+                        entry->writeHeaderBuffer = headerData;
+                    }
+                    if (headerSize && headerData) {
+                        // Write header in 1st vector buffer item
+                        msg.msg_iov[0].iov_base = headerData;
+                        msg.msg_iov[0].iov_len = headerSize;
+                        msgPartSize += msg.msg_iov[0].iov_len;
+                    } else {
+                        L_ERROR("[TCP Socket] No header buffer is generated");
+                        break;
+                    }
+                }
 
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize = 0)s
-            if (entry->headerBufferSize) {
-              headerData = entry->headerBuffer;
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-              entry->headerBufferSize = headerSize;
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
-            //  When a specific socket keeps reporting errors can indicate a subscriber
-            //  which is not active anymore, the connection will remain until the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
-                } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
+                void *footerData = NULL;
+                // Write optional footerData in vector buffer
+                if (footerSize) {
+                    footerData = entry->writeFooterBuffer;
+                    handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerSize);
+                    if (footerData && entry->writeFooterBuffer != footerData) {
+                        entry->writeFooterBuffer = footerData;
+                        entry->writeFooterBufferSize = footerSize;
+                    }
+                    if (footerData) {
+                        msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                        msg.msg_iov[msg.msg_iovlen].iov_len  = footerSize;
+                        msg.msg_iovlen++;
+                        msgPartSize += footerSize;
+                    }
                 }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+
+                //  When a specific socket keeps reporting errors can indicate a subscriber
+                //  which is not active anymore, the connection will remain until the retry
+                //  counter exceeds the maximum retry count.
+                //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
+                            entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno));
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgPartSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgPartSize, nbytes, strerror(errno));
+                    }
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload)) {
+                    free(payloadData);
                 }
             }
-            // Release data
-            if (headerData && headerData != entry->headerBuffer) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData && metadataData != entry->metaBuffer) {
-                free(metadataData);
-            }
-            if (footerData && footerData != entry->footerBuffer) {
-                free(footerData);
-            }
+            celixThreadMutex_unlock(&entry->writeMutex);
         }
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    celixThreadRwlock_unlock(&handle->dbLock);
     //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
     for (int i = 0; i < nofConnToClose; i++) {
         pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);

Review comment:
       Solved together with the previous one.
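
The payload-splitting part of the loop in the hunk above can be sketched roughly as follows. This is not the PR's code: `segment_sketch_t` and `plan_segments` are hypothetical names, metadata, header and footer handling are left out, and `maxPartSize` is assumed to be the room left after header and footer are subtracted:

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-segment header, loosely modelled on the fields set in the hunk. */
typedef struct {
    size_t payloadOffset;
    size_t payloadPartSize;
    bool isLastSegment;
} segment_sketch_t;

/* Fills out[] with one descriptor per segment and returns how many were
 * written. Returns 0 if maxPartSize is 0 or out[] is too small. */
static size_t plan_segments(size_t payloadSize, size_t maxPartSize,
                            segment_sketch_t *out, size_t maxSegments) {
    if (maxPartSize == 0) {
        return 0;
    }
    size_t count = 0;
    size_t offset = 0;
    do {
        if (count == maxSegments) {
            return 0;
        }
        size_t remaining = payloadSize - offset;
        size_t part = remaining < maxPartSize ? remaining : maxPartSize;
        out[count].payloadOffset = offset;
        out[count].payloadPartSize = part;
        offset += part;
        /* The last segment is the one that reaches the end of the payload. */
        out[count].isLastSegment = (offset >= payloadSize);
        count++;
    } while (offset < payloadSize);
    return count;
}
```

For a 10-byte payload and a 4-byte part limit this yields three segments at offsets 0, 4 and 8, with only the third marked as last; an empty payload still produces a single, last segment so that a header can be sent.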







[GitHub] [celix] pnoltes merged pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
pnoltes merged pull request #279:
URL: https://github.com/apache/celix/pull/279


   





[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534422385



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -533,7 +529,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        if (status == CELIX_SUCCESS /*ser ok*/) {
+        bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);

Review comment:
       I have changed this







[GitHub] [celix] rbulter commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
rbulter commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r534420113



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
##########
@@ -144,119 +146,108 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                   PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            isPassive = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, NULL);

Review comment:
       Solved







[GitHub] [celix] Oipo commented on a change in pull request #279: Feature/tcp admin msg segmentation

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #279:
URL: https://github.com/apache/celix/pull/279#discussion_r544253608



##########
File path: bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
##########
@@ -559,31 +549,39 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);

Review comment:
       @pnoltes The ZMQ admin also calls the post-receive interceptor inside this while loop. But the `svc` that received the message is not passed to the interceptor, so what is the use of calling it once per subscriber?
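
To make the question concrete, here is a minimal sketch of the two possible call placements. Everything in it is hypothetical: `subscriber_t`, `interceptor_t`, `interceptor_postReceive`, `dispatch_perSubscriber`, `dispatch_perMessage` and `countingReceive` are stand-ins invented for illustration and are not the Celix types or the `pubsubInterceptorHandler_*` API. The assumption being illustrated is that a per-subscriber post-receive call only tells the interceptor something extra if it also learns which subscriber handled the message; otherwise one call per message carries the same information.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a subscriber service; NOT the Celix pubsub_subscriber_t. */
typedef struct subscriber {
    void *handle;
    int (*receive)(void *handle, const char *msgType, uint32_t msgId, void *msg, bool *release);
} subscriber_t;

/* Hypothetical interceptor state, used only to observe how the hook is called. */
typedef struct interceptor {
    unsigned postReceiveCalls;          /* how often the post-receive hook fired */
    const subscriber_t *lastSubscriber; /* NULL when the caller did not identify a subscriber */
} interceptor_t;

/* Hypothetical post-receive hook that is told which subscriber handled the message. */
static void interceptor_postReceive(interceptor_t *ic, const char *msgType, uint32_t msgId,
                                    const subscriber_t *svc) {
    (void)msgType;
    (void)msgId;
    ic->postReceiveCalls++;
    ic->lastSubscriber = svc;
}

/* Variant A: hook inside the loop, one call per subscriber, with the subscriber passed along. */
static void dispatch_perSubscriber(interceptor_t *ic, const subscriber_t *subs, size_t n,
                                   const char *msgType, uint32_t msgId, void *msg) {
    assert(subs != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        bool release = true;
        subs[i].receive(subs[i].handle, msgType, msgId, msg, &release);
        interceptor_postReceive(ic, msgType, msgId, &subs[i]);
    }
}

/* Variant B: hook after the loop, one call per message, with no subscriber identity. */
static void dispatch_perMessage(interceptor_t *ic, const subscriber_t *subs, size_t n,
                                const char *msgType, uint32_t msgId, void *msg) {
    assert(subs != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        bool release = true;
        subs[i].receive(subs[i].handle, msgType, msgId, msg, &release);
    }
    interceptor_postReceive(ic, msgType, msgId, NULL);
}

/* Example receive callback: counts deliveries through its handle. */
static int countingReceive(void *handle, const char *msgType, uint32_t msgId, void *msg,
                           bool *release) {
    (void)msgType;
    (void)msgId;
    (void)msg;
    (void)release;
    (*(unsigned *)handle)++;
    return 0;
}
```

With two subscribers, variant A fires the hook twice and the interceptor can tell the calls apart; variant B fires it once. The call in the diff above sits inside the loop like variant A but passes no subscriber, so from the interceptor's point of view the repeated calls are indistinguishable.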



