You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/10/06 19:04:48 UTC

[celix] 04/11: Add maxmsg

This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 78ed4319ee012645f2101e12bfafd0a2ccb65744
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Mon Jun 29 21:28:36 2020 +0200

    Add maxmsg
---
 bundles/pubsub/examples/CMakeLists.txt                         | 1 +
 bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h | 7 +++----
 bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h       | 1 +
 bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c  | 4 ++--
 4 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index b224797..4f54917 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -157,6 +157,7 @@ if (BUILD_PUBSUB_PSA_TCP)
     )
     target_link_libraries(pubsub_subscriber2_tcp PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
+    message("RB ${ETCD_CMD}" )
     if (ETCD_CMD AND XTERM_CMD)
         # Runtime starting a publish and subscriber for tcp
         add_celix_runtime(pubsub_rt_tcp
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index ff8e68f..3490f36 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -23,7 +23,7 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
@@ -31,8 +31,7 @@
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        0
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
@@ -60,7 +59,7 @@
 
 //Time-out settings are only for BLOCKING connections
 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY       "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
-#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT   0.0 //5.0
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT   5.0
 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT   0.0
 
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY      "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index ed4581c..28df5b7 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -62,6 +62,7 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
                                                unsigned int maxNofBuffers,
                                                unsigned int bufferSize);
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size);
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
 void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
 void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 47dc888..645cdac 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -63,9 +63,7 @@ struct pubsub_tcp_topic_sender {
     char *topic;
     char *url;
     bool isStatic;
-
     bool verbose;
-
     struct {
         long svcId;
         celix_service_factory_t factory;
@@ -196,10 +194,12 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
                                                                        (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
                                                                                        PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
         pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
     }
 
     //setting up tcp socket for TCP TopicSender