You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2020/10/06 19:04:48 UTC
[celix] 04/11: Add maxmsg
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 78ed4319ee012645f2101e12bfafd0a2ccb65744
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Mon Jun 29 21:28:36 2020 +0200
Add maxmsg
---
bundles/pubsub/examples/CMakeLists.txt | 1 +
bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h | 7 +++----
bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h | 1 +
bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 4 ++--
4 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index b224797..4f54917 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -157,6 +157,7 @@ if (BUILD_PUBSUB_PSA_TCP)
)
target_link_libraries(pubsub_subscriber2_tcp PRIVATE ${PUBSUB_CONTAINER_LIBS})
+ message("RB ${ETCD_CMD}" )
if (ETCD_CMD AND XTERM_CMD)
# Runtime starting a publish and subscriber for tcp
add_celix_runtime(pubsub_rt_tcp
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index ff8e68f..3490f36 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -23,7 +23,7 @@
#define PSA_TCP_BASE_PORT "PSA_TCP_BASE_PORT"
#define PSA_TCP_MAX_PORT "PSA_TCP_MAX_PORT"
-#define PSA_TCP_MAX_RECV_SESSIONS "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE "PSA_TCP_MAX_MESSAGE_SIZE"
#define PSA_TCP_RECV_BUFFER_SIZE "PSA_TCP_RECV_BUFFER_SIZE"
#define PSA_TCP_TIMEOUT "PSA_TCP_TIMEOUT"
#define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
@@ -31,8 +31,7 @@
#define PSA_TCP_DEFAULT_BASE_PORT 5501
#define PSA_TCP_DEFAULT_MAX_PORT 6000
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS 1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE 0
#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE 65 * 1024
#define PSA_TCP_DEFAULT_TIMEOUT 2000 // 2 seconds
#define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
@@ -60,7 +59,7 @@
//Time-out settings are only for BLOCKING connections
#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
-#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 0.0 //5.0
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 5.0
#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT 0.0
#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index ed4581c..28df5b7 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -62,6 +62,7 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
unsigned int maxNofBuffers,
unsigned int bufferSize);
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size);
void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 47dc888..645cdac 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -63,9 +63,7 @@ struct pubsub_tcp_topic_sender {
char *topic;
char *url;
bool isStatic;
-
bool verbose;
-
struct {
long svcId;
celix_service_factory_t factory;
@@ -196,10 +194,12 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
(!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+ long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+ pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
}
//setting up tcp socket for TCP TopicSender