You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by rb...@apache.org on 2022/01/09 19:57:52 UTC
[celix] 01/01: Add pubsub udp admin
This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch feature/add_pubsub_udp
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 372049a062dee046c975bb8d1c9988a267b6ef58
Author: Roy Bulter <ro...@gmail.com>
AuthorDate: Sun Jan 9 20:57:05 2022 +0100
Add pubsub udp admin
---
CMakeLists.txt | 2 +-
bundles/pubsub/CMakeLists.txt | 5 +
bundles/pubsub/README.md | 25 +-
bundles/pubsub/examples/CMakeLists.txt | 80 ++
bundles/pubsub/integration/CMakeLists.txt | 84 ++
.../integration/gtest/sut_endpoint_activator.c | 4 +-
.../pubsub/integration/meta_data/ping.properties | 6 +
.../pubsub/integration/meta_data/ping2.properties | 3 +
.../pubsub/integration/meta_data/ping3.properties | 2 +
.../pubsub/integration/meta_data/pong2.properties | 4 +
.../pubsub/integration/meta_data/pong3.properties | 4 +
bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt | 2 -
bundles/pubsub/pubsub_admin_tcp/README.md | 112 +++
.../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 29 +-
.../pubsub_admin_tcp/src/pubsub_tcp_common.c | 38 -
.../pubsub_admin_tcp/src/pubsub_tcp_common.h | 33 -
.../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 92 --
.../src/pubsub_tcp_topic_receiver.c | 46 +-
.../src/pubsub_tcp_topic_receiver.h | 4 +-
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 42 +-
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h | 4 +-
.../CMakeLists.txt | 26 +-
bundles/pubsub/pubsub_admin_udp/README.md | 149 ++++
.../pubsub/pubsub_admin_udp/src/psa_activator.c | 129 +++
.../src/pubsub_psa_udp_constants.h | 136 +++
.../src/pubsub_udp_admin.c} | 387 ++++----
.../pubsub/pubsub_admin_udp/src/pubsub_udp_admin.h | 88 ++
.../src/pubsub_udp_topic_receiver.c} | 341 ++++---
.../src/pubsub_udp_topic_receiver.h} | 34 +-
.../src/pubsub_udp_topic_sender.c} | 250 ++++--
.../src/pubsub_udp_topic_sender.h} | 33 +-
bundles/pubsub/pubsub_utils/CMakeLists.txt | 1 +
.../pubsub_utils/include/pubsub_skt_handler.h | 103 +++
.../pubsub/pubsub_utils/include/pubsub_utils_url.h | 5 +-
.../src/pubsub_skt_handler.c} | 983 ++++++++++++++-------
bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c | 34 +-
36 files changed, 2346 insertions(+), 974 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index deea4e5..a3e2cb8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -33,7 +33,7 @@ IF (${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION} EQUAL 3.3 AND ${CMAKE_GENERATO
ENDIF()
# Options
-option(ENABLE_TESTING "Enables unit/bundle testing" FALSE)
+option(ENABLE_TESTING "Enables unit/bundle testing" TRUE)
if (ENABLE_TESTING)
find_package(GTest CONFIG QUIET)
if (NOT GTest_FOUND)
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 39275c5..d0b157d 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -29,6 +29,11 @@ if (PUBSUB)
add_subdirectory(pubsub_admin_tcp)
endif (BUILD_PUBSUB_PSA_TCP)
+ option(BUILD_PUBSUB_PSA_UDP "Build UDP PubSub Admin" ON)
+ if (BUILD_PUBSUB_PSA_UDP)
+ add_subdirectory(pubsub_admin_udp)
+ endif (BUILD_PUBSUB_PSA_UDP)
+
option(BUILD_PUBSUB_PSA_UDP_MC "Build UDP MC PubSub Admin" ON)
if (BUILD_PUBSUB_PSA_UDP_MC)
add_subdirectory(pubsub_admin_udp_mc)
diff --git a/bundles/pubsub/README.md b/bundles/pubsub/README.md
index 744cfd9..797f5a5 100644
--- a/bundles/pubsub/README.md
+++ b/bundles/pubsub/README.md
@@ -39,7 +39,8 @@ The publisher/subscriber implementation supports sending of a single message and
## Getting started
The publisher/subscriber implementation contains 3 different PubSubAdmins for managing connections:
- * PubsubAdminUDP: This pubsub admin is using udp (multicast) linux sockets to setup a connection.
+ * PubsubAdminUDPMC: This pubsub admin is using udp (multicast) linux sockets to setup a connection.
+ * PubsubAdminUDP: This pubsub admin is using udp (unicast/multicast/broadcast) linux sockets to setup a connection.
* PubsubAdminTCP: This pubsub admin is using tcp linux sockets to setup a connection.
* PubsubAdminZMQ (LGPL License): This pubsub admin is using ZeroMQ and is disabled as default. This is a because the pubsub admin is using ZeroMQ which is licensed as LGPL ([View ZeroMQ License](https://github.com/zeromq/libzmq#license)).
@@ -60,6 +61,26 @@ The publisher/subscriber implementation contains 3 different PubSubAdmins for ma
Design information can be found at pubsub\_admin\_udp\_mc/README.md
+### Running PSA UDP
+
+1. Open a terminal
+1. Run `cd runtimes/pubsub/udp`
+1. Run `sh start.sh`
+
+### Properties PSA UDP
+
+Some properties can be set to configure the PSA-UDP. If not configured defaults will be used. These
+properties can be set in the config.properties file (<PROPERTY>=<VALUE> format)
+
+
+ PSA_IP The url address to be used by the UDP admin to publish its data. Defaults to the first IP address that is not on localhost.
+ This can be a hostname / IP address / IP address with postfix, e.g. 192.168.1.0/24
+ For multicast use 224.100.0.0/24@192.168.1.0/24; the last octet of the multicast address will be
+ the same as the last octet of the interface address.
+ Note: when the interface is not set, the last octet of the multicast address is generated.
+
+
+Detailed information can be found at pubsub_admin_udp/README.md
### Running PSA TCP
@@ -77,6 +98,8 @@ properties can be set in the config.properties file (<PROPERTY>=<VALUE> format)
This can be hostname / IP address / IP address with postfix, e.g. 192.168.1.0/24
+Detailed information can be found at pubsub_admin_tcp/README.md
+
### Running PSA ZMQ
For ZeroMQ without encryption, skip the steps 1-12 below
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index a34a28b..7e1d0e5 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -97,6 +97,86 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
endif ()
endif()
+if (BUILD_PUBSUB_PSA_UDP)
+ # UDP
+ add_celix_container(pubsub_publisher_udp
+ GROUP pubsub
+ BUNDLES
+ Celix::log_admin
+ Celix::shell
+ Celix::shell_tui
+ Celix::celix_pubsub_serializer_json
+ Celix::celix_pubsub_discovery_etcd
+ Celix::celix_pubsub_topology_manager
+ Celix::celix_pubsub_admin_udp
+ Celix::celix_pubsub_protocol_wire_v2
+ celix_pubsub_poi_publisher
+ celix_pubsub_poi_publisher2
+ PROPERTIES
+ PSA_UDP_VERBOSE=true
+ PSA_IP=224.100.0.0/24@192.168.1.0/24
+ PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+ PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+ )
+ target_link_libraries(pubsub_publisher_udp PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+ add_celix_container(pubsub_subscriber_udp
+ GROUP pubsub
+ BUNDLES
+ Celix::log_admin
+ Celix::shell
+ Celix::shell_tui
+ Celix::celix_pubsub_serializer_json
+ Celix::celix_pubsub_discovery_etcd
+ Celix::celix_pubsub_topology_manager
+ Celix::celix_pubsub_admin_udp
+ Celix::celix_pubsub_protocol_wire_v2
+ celix_pubsub_poi_subscriber
+ PROPERTIES
+ PSA_UDP_VERBOSE=true
+ PSA_IP=224.100.0.0/24@192.168.1.0/24
+ PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+ PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+ )
+ target_link_libraries(pubsub_subscriber_udp PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+ add_celix_container(pubsub_subscriber2_udp
+ GROUP pubsub
+ BUNDLES
+ Celix::log_admin
+ Celix::shell
+ Celix::shell_tui
+ Celix::celix_pubsub_serializer_json
+ Celix::celix_pubsub_discovery_etcd
+ Celix::celix_pubsub_topology_manager
+ Celix::celix_pubsub_admin_udp
+ Celix::celix_pubsub_protocol_wire_v2
+ celix_pubsub_poi_subscriber
+ PROPERTIES
+ PSA_UDP_VERBOSE=true
+ PSA_IP=224.100.0.0/24@192.168.1.0/24
+ PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+ PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+ )
+ target_link_libraries(pubsub_subscriber2_udp PRIVATE ${PUBSUB_CONTAINER_LIBS})
+
+ if (ETCD_CMD AND XTERM_CMD)
+ # Runtime starting a publisher and subscribers for udp
+ add_celix_runtime(pubsub_rt_udp
+ NAME udp
+ GROUP pubsub
+ CONTAINERS
+ pubsub_publisher_udp
+ pubsub_subscriber_udp
+ pubsub_subscriber2_udp
+ COMMANDS
+ etcd
+ USE_TERM
+ )
+ endif ()
+endif()
+
+
if (BUILD_PUBSUB_PSA_TCP)
# TCP
add_celix_container(pubsub_publisher_tcp
diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt
index cc675e1..9083582 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -188,6 +188,90 @@ if (BUILD_PUBSUB_PSA_UDP_MC)
#setup_target_for_coverage(pubsub_udpmc_tests SCAN_DIR ..)
endif()
+if (BUILD_PUBSUB_PSA_UDP)
+ add_celix_container(pubsub_udp_tests
+ USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubIntegrationTestSuite.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+ BUNDLES
+ Celix::celix_pubsub_serializer_json
+ Celix::celix_pubsub_protocol_wire_v2
+ Celix::celix_pubsub_topology_manager
+ Celix::celix_pubsub_admin_udp
+ Celix::shell
+ Celix::shell_tui
+ pubsub_sut
+ pubsub_tst
+ )
+ target_link_libraries(pubsub_udp_tests PRIVATE Celix::pubsub_api Jansson Celix::dfi GTest::gtest GTest::gtest_main)
+ target_include_directories(pubsub_udp_tests SYSTEM PRIVATE gtest)
+
+ add_celix_container(pstm_deadlock_udp_test
+ USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/pstm_deadlock_test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+ BUNDLES
+ Celix::celix_pubsub_serializer_json
+ Celix::celix_pubsub_protocol_wire_v2
+ Celix::celix_pubsub_topology_manager
+ Celix::celix_pubsub_admin_udp
+ Celix::shell
+ Celix::shell_tui
+ )
+ target_compile_definitions(pstm_deadlock_udp_test PRIVATE -DDEADLOCK_SUT_BUNDLE_FILE=\"${DEADLOCK_SUT_BUNDLE_FILE}\")
+ target_link_libraries(pstm_deadlock_udp_test PRIVATE Celix::pubsub_api Jansson Celix::dfi GTest::gtest GTest::gtest_main)
+ target_include_directories(pstm_deadlock_udp_test SYSTEM PRIVATE pstm_deadlock_udp_test)
+
+ #Note we do not link to bundles; as a result (to ensure a bundle zip file is created) a dependency on the bundle is needed.
+ add_dependencies(pstm_deadlock_udp_test pubsub_deadlock_sut_bundle)
+
+ #Framework "bundle" has no cache dir. By default the cwd is used as "cache dir".
+ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_udp_gtest/META-INF/descriptors/msg.descriptor COPYONLY)
+ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties ${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_udp_gtest/META-INF/topics/pub/deadlock.properties COPYONLY)
+
+ add_test(NAME pstm_deadlock_udp_test COMMAND pstm_deadlock_udp_test WORKING_DIRECTORY $<TARGET_PROPERTY:pstm_deadlock_udp_test,CONTAINER_LOC>)
+ setup_target_for_coverage(pstm_deadlock_udp_test SCAN_DIR ..)
+
+ add_test(NAME pubsub_udp_tests COMMAND pubsub_udp_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_udp_tests,CONTAINER_LOC>)
+ setup_target_for_coverage(pubsub_udp_tests SCAN_DIR ..)
+
+ add_celix_container(pubsub_udp_v2_endpoint_tests
+ USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/gtest/PubSubEndpointIntegrationTestSuite.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+ BUNDLES
+ Celix::shell
+ Celix::shell_tui
+ Celix::celix_pubsub_serializer_json
+ Celix::celix_pubsub_protocol_wire_v2
+ Celix::celix_pubsub_topology_manager
+ Celix::celix_pubsub_admin_udp
+ pubsub_endpoint_tst
+ pubsub_endpoint_sut
+ pubsub_loopback
+ pubsub_serializer
+ )
+ target_link_libraries(pubsub_udp_v2_endpoint_tests PRIVATE Celix::pubsub_api Celix::dfi GTest::gtest GTest::gtest_main)
+ target_include_directories(pubsub_udp_v2_endpoint_tests SYSTEM PRIVATE gtest)
+
+ #UDP Endpoint test is disabled because the test is not stable when running on Travis
+ if (ENABLE_PUBSUB_PSA_UDP_ENDPOINT_TEST)
+ add_test(NAME pubsub_udp_v2_endpoint_tests COMMAND pubsub_udp_v2_endpoint_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_udp_v2_endpoint_tests,CONTAINER_LOC>)
+ setup_target_for_coverage(pubsub_udp_v2_endpoint_tests SCAN_DIR ..)
+ endif()
+
+
+endif()
+
if (BUILD_PUBSUB_PSA_TCP)
# TCP v2 tests
diff --git a/bundles/pubsub/integration/gtest/sut_endpoint_activator.c b/bundles/pubsub/integration/gtest/sut_endpoint_activator.c
index 53c7559..44bfafa 100644
--- a/bundles/pubsub/integration/gtest/sut_endpoint_activator.c
+++ b/bundles/pubsub/integration/gtest/sut_endpoint_activator.c
@@ -44,9 +44,9 @@ celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
char filter[512];
bool useNegativeScopeFilter = celix_bundleContext_getPropertyAsBool(ctx, "CELIX_PUBSUB_TEST_USE_NEGATIVE_SCOPE_FILTER", true);
if (useNegativeScopeFilter) {
- snprintf(filter, 512, "(%s=%s)(!(scope=*))", PUBSUB_PUBLISHER_TOPIC, "ping");
+ snprintf(filter, 512, "(%s=%s)(!(scope=*))", PUBSUB_PUBLISHER_TOPIC, "ping2");
} else {
- snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "ping");
+ snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "ping2");
}
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.set = sut_pubSet;
diff --git a/bundles/pubsub/integration/meta_data/ping.properties b/bundles/pubsub/integration/meta_data/ping.properties
index 8114413..69019ae 100644
--- a/bundles/pubsub/integration/meta_data/ping.properties
+++ b/bundles/pubsub/integration/meta_data/ping.properties
@@ -18,6 +18,12 @@ zmq.static.bind.url=ipc:///tmp/pubsub-pingtest
zmq.static.connect.urls=ipc:///tmp/pubsub-pingtest
tcp.static.bind.url=tcp://localhost:9000
tcp.static.connect.urls=tcp://localhost:9000
+
+#udp.static.bind.url=udp://224.100.0.1:50678
+#udp.static.connect.urls=udp://224.100.0.1:50678
+udp.static.bind.url=udp://localhost:50679
+udp.static.connect.urls=udp://localhost:50679@localhost:50678
+
udpmc.static.bind.port=50678
udpmc.static.connect.socket_addresses=224.100.0.1:50678
websocket.static.connect.socket_addresses=127.0.0.1:58080
diff --git a/bundles/pubsub/integration/meta_data/ping2.properties b/bundles/pubsub/integration/meta_data/ping2.properties
index ff0dbed..2b7e11b 100644
--- a/bundles/pubsub/integration/meta_data/ping2.properties
+++ b/bundles/pubsub/integration/meta_data/ping2.properties
@@ -17,6 +17,9 @@
tcp.static.bind.url=tcp://localhost:9500
tcp.passive.key=tcp://localhost:9500
+udp.static.connect.urls=udp://localhost:9500
+udp.passive.key=udp://localhost:9500
+
#note only effective if run as root
thread.realtime.sched=SCHED_FIFO
thread.realtime.prio=50
diff --git a/bundles/pubsub/integration/meta_data/ping3.properties b/bundles/pubsub/integration/meta_data/ping3.properties
index 5571705..f4e87a7 100644
--- a/bundles/pubsub/integration/meta_data/ping3.properties
+++ b/bundles/pubsub/integration/meta_data/ping3.properties
@@ -16,6 +16,8 @@
# under the License.
tcp.passive.key=tcp://localhost
tcp.passive.configured=true
+udp.passive.key=udp://localhost
+udp.passive.configured=true
#note only effective if run as root
thread.realtime.sched=SCHED_FIFO
thread.realtime.prio=50
diff --git a/bundles/pubsub/integration/meta_data/pong2.properties b/bundles/pubsub/integration/meta_data/pong2.properties
index b95f3bc..50c82ac 100644
--- a/bundles/pubsub/integration/meta_data/pong2.properties
+++ b/bundles/pubsub/integration/meta_data/pong2.properties
@@ -17,6 +17,10 @@
tcp.static.connect.urls=tcp://localhost:9500
tcp.passive.key=tcp://localhost
+#udp.static.connect.urls=udp://localhost:9500
+udp.static.bind.url=udp://localhost:9500
+udp.passive.key=udp://localhost
+
#note only effective if run as root
thread.realtime.sched=SCHED_FIFO
thread.realtime.prio=50
diff --git a/bundles/pubsub/integration/meta_data/pong3.properties b/bundles/pubsub/integration/meta_data/pong3.properties
index cb64543..b4153d4 100644
--- a/bundles/pubsub/integration/meta_data/pong3.properties
+++ b/bundles/pubsub/integration/meta_data/pong3.properties
@@ -16,6 +16,10 @@
# under the License.
tcp.passive.key=tcp://localhost:9500
tcp.passive.configured=true
+
+udp.passive.key=udp://localhost:9500
+udp.passive.configured=true
+
#note only effective if run as root
thread.realtime.sched=SCHED_FIFO
thread.realtime.prio=50
diff --git a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
index c3e032c..b98a704 100644
--- a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
@@ -26,8 +26,6 @@ add_celix_bundle(celix_pubsub_admin_tcp
src/pubsub_tcp_admin.c
src/pubsub_tcp_topic_sender.c
src/pubsub_tcp_topic_receiver.c
- src/pubsub_tcp_handler.c
- src/pubsub_tcp_common.c
)
target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
diff --git a/bundles/pubsub/pubsub_admin_tcp/README.md b/bundles/pubsub/pubsub_admin_tcp/README.md
new file mode 100644
index 0000000..128bf55
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/README.md
@@ -0,0 +1,112 @@
+---
+title: PSA TCP
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# PUBSUB-Admin TCP
+
+---
+
+## Description
+
+The scope of this description is the TCP PubSub admin.
+
+The TCP pubsub admin is used to transfer user data transparent via TCP.
+
+### IP Addresses
+
+To use TCP, 1 IP address is needed:
+This is the IP address that is bound to an (ethernet) interface (from the publisher).
+This IP address is defined with the `PSA_IP` property.
+For example: `PSA_IP=192.168.1.0/24`.
+Note the example uses CIDR notation, the CIDR notation specifies the IP range that is used
+by the pubsub admin for searching the network interfaces.
+
+Note the port will be automatically assigned, when none is specified.
+A fixed port can be assigned for example with: `PSA_IP=192.168.1.0/24:34000`.
+
+
+### Discovery
+
+When a publisher wants to publish a topic, a TopicSender and a publisher endpoint are created by the Pubsub Topology Manager.
+When a subscriber wants to subscribe to a topic, a TopicReceiver and a subscriber endpoint are created by the Pubsub Topology Manager.
+The endpoints are published by the PubSubDiscovery within its topic in ETCD (e.g. tcp://192.168.1.20:40123).
+A subscriber, interested in the topic, is informed by the TopologyManager that there is a new endpoint.
+The TopicReceiver at the subscriber side creates a listening socket based on this endpoint.
+Now a data-connection is created and data sent by the publisher will be received by the subscriber.
+
+### Static endpoint
+
+The TCP pubsub admin can also be used as a static endpoint to communicate with external
+TCP publishers/subscribers that don't use Discovery.
+---
+With the `tcp.static.bind.url` property, the static TCP bind can be configured.
+For TCP the publisher topic properties should configure the `tcp.static.bind.url` property to configure the TCP bind interface
+that accepts incoming connections.
+
+---
+With the `tcp.static.connect.urls` property, the static TCP connections can be configured. (Note: the urls are space separated)
+For TCP the subscriber topic properties should configure the `tcp.static.connect.urls` property
+to configure TCP connections that subscribe to the data.
+
+---
+Note: a special/dedicated protocol service implementation can be used to communicate with external publishers/subscribers
+
+---
+
+## Passive Endpoints
+
+Each TCP pubsub publisher/subscriber can be configured as a passive endpoint.
+A passive endpoint reuses an existing socket of a publisher/subscriber using properties.
+This can be used to support full duplex socket connections.
+This can be used for both discovery and static end points.
+
+The `tcp.passive.key` topic property defines the unique key to select the socket.
+The `tcp.passive.key` property needs to be defined both in the topic properties of the topic whose connection will be shared
+and in the topic properties of the 'passive' topic that will re-use the connection.
+The `tcp.passive.configured` topic property indicates that the topic is passive and will re-use the connection
+indicated with the `tcp.passive.key` property.
+
+For example: `tcp.passive.key=tcp://localhost:9500` and `tcp.passive.configured=true`
+
+## IOVEC
+
+The TCP pubsub uses the socket function calls sendmsg / recvmsg with iovec for sending/receiving messages.
+When the serialisation service supports iovec serialisation,
+iovec serialisation can be used for high throughput and low latency serialisation while avoiding copying
+of data. Because of iovec limitations the protocol service should support message segmentation.
+
+## Properties
+
+<table border="1">
+ <tr><th>Property</th><th>Description</th></tr>
+ <tr><td>PSA_IP</td><td>IP address that is used by the bundle</td></tr>
+ <tr><td>tcp.static.bind.url</td><td>The static interface url of the bind interface</td></tr>
+ <tr><td>tcp.static.connect.urls</td><td>a space separated list with static connections urls</td></tr>
+ <tr><td>tcp.passive.key</td><td>key of the shared connection</td></tr>
+ <tr><td>tcp.passive.configured</td><td>Indicates if the connection is passive and reuses the connection</td></tr>
+ <tr><td>thread.realtime.prio</td><td>Configures the thread priority of the receive thread</td></tr>
+ <tr><td>thread.realtime.sched</td><td>Configures the thread scheduling type of the receive thread</td></tr>
+
+</table>
+
+---
+
+
+
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 9806991..79e52fc 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -25,7 +25,7 @@
#include <pubsub_matching.h>
#include "pubsub_utils.h"
#include "pubsub_tcp_admin.h"
-#include "pubsub_tcp_handler.h"
+#include "pubsub_skt_handler.h"
#include "pubsub_psa_tcp_constants.h"
#include "pubsub_tcp_topic_sender.h"
#include "pubsub_tcp_topic_receiver.h"
@@ -79,7 +79,7 @@ struct pubsub_tcp_admin {
hash_map_t *map; //key = pubsub message serialization marker svc id (long), pubsub_serialization_handler_t*.
} serializationHandlers;
- pubsub_tcp_endPointStore_t endpointStore;
+ pubsub_sktHandler_endPointStore_t endpointStore;
};
typedef struct psa_tcp_protocol_entry {
@@ -144,8 +144,8 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
celixThreadMutex_lock(&psa->endpointStore.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->endpointStore.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_tcpHandler_t *tcpHandler = hashMapIterator_nextValue(&iter);
- pubsub_tcpHandler_destroy(tcpHandler);
+ pubsub_sktHandler_t *tcpHandler = hashMapIterator_nextValue(&iter);
+ pubsub_sktHandler_destroy(tcpHandler);
}
celixThreadMutex_unlock(&psa->endpointStore.mutex);
@@ -219,7 +219,7 @@ void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_propert
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
if (protType == NULL) {
- L_INFO("[PSA_tcp] Ignoring protocol service without %s property", PUBSUB_PROTOCOL_TYPE_KEY);
+ L_INFO("[PSA_TCP] Ignoring protocol service without %s property", PUBSUB_PROTOCOL_TYPE_KEY);
return;
}
@@ -486,15 +486,18 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
celixThreadMutex_unlock(&psa->protocols.mutex);
if (receiver != NULL && newEndpoint != NULL) {
- celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
- if (pubsub_tcpAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
- pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
- }
+ if (pubsub_tcpAdmin_endpointIsPublisher(newEndpoint) && pubsubEndpoint_matchWithTopicAndScope(newEndpoint, topic, scope)) {
+ pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, newEndpoint);
}
- celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ //celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+ //hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+ //while (hashMapIterator_hasNext(&iter)) {
+ // celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+ // if (pubsub_tcpAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
+ // pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ // }
+ //}
+ //celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
}
if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
deleted file mode 100644
index d8f05f7..0000000
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include <stdio.h>
-#include <string.h>
-#include "pubsub_tcp_common.h"
-
-
-bool psa_tcp_isPassive(const char* buffer) {
- bool isPassive = false;
- // Parse Properties
- if (buffer != NULL) {
- char buf[32];
- snprintf(buf, 32, "%s", buffer);
- char *trimmed = utils_stringTrim(buf);
- if (strncasecmp("true", trimmed, strlen("true")) == 0) {
- isPassive = true;
- } else if (strncasecmp("false", trimmed, strlen("false")) == 0) {
- isPassive = false;
- }
- }
- return isPassive;
-}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
deleted file mode 100644
index 9ea31db..0000000
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef CELIX_PUBSUB_TCP_COMMON_H
-#define CELIX_PUBSUB_TCP_COMMON_H
-
-#include <utils.h>
-#include <hash_map.h>
-
-typedef struct pubsub_tcp_endPointStore {
- celix_thread_mutex_t mutex;
- hash_map_t *map;
-} pubsub_tcp_endPointStore_t;
-
-bool psa_tcp_isPassive(const char* buffer);
-
-#endif //CELIX_PUBSUB_TCP_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
deleted file mode 100644
index 2d97634..0000000
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * pubsub_tcp_handler.h
- *
- * \date July 18, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef _PUBSUB_TCP_BUFFER_HANDLER_H_
-#define _PUBSUB_TCP_BUFFER_HANDLER_H_
-
-#include <stdbool.h>
-#include <stdlib.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <celix_log_helper.h>
-#include "celix_threads.h"
-#include "pubsub_utils_url.h"
-#include <pubsub_protocol.h>
-
-#ifndef MIN
-#define MIN(a, b) ((a<b) ? (a) : (b))
-#endif
-
-#ifndef MAX
-#define MAX(a, b) ((a>b) ? (a) : (b))
-#endif
-
-typedef struct pubsub_tcpHandler pubsub_tcpHandler_t;
-typedef void(*pubsub_tcpHandler_processMessage_callback_t)
- (void *payload, const pubsub_protocol_message_t *header, bool *release, struct timespec *receiveTime);
-typedef void (*pubsub_tcpHandler_receiverConnectMessage_callback_t)(void *payload, const char *url, bool lock);
-typedef void (*pubsub_tcpHandler_acceptConnectMessage_callback_t)(void *payload, const char *url);
-
-pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t *protocol, celix_log_helper_t *logHelper);
-void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle);
-int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url);
-int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, int fd);
-int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url);
-int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url);
-int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
-int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size);
-int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size);
-void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
-void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
-void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
-void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
-void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
-void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle, bool enable);
-
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
-int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
- pubsub_protocol_message_t *message,
- struct iovec *msg_iovec,
- size_t msg_iov_len,
- int flags);
-int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle,
- void *payload,
- pubsub_tcpHandler_processMessage_callback_t processMessageCallback);
-int pubsub_tcpHandler_addReceiverConnectionCallback(pubsub_tcpHandler_t *handle,
- void *payload,
- pubsub_tcpHandler_receiverConnectMessage_callback_t connectMessageCallback,
- pubsub_tcpHandler_receiverConnectMessage_callback_t disconnectMessageCallback);
-int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle,
- void *payload,
- pubsub_tcpHandler_acceptConnectMessage_callback_t connectMessageCallback,
- pubsub_tcpHandler_acceptConnectMessage_callback_t disconnectMessageCallback);
-char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle);
-char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle);
-void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio, const char *sched);
-void pubsub_tcpHandler_setThreadName(pubsub_tcpHandler_t *handle, const char *topic, const char *scope);
-
-#endif /* _PUBSUB_TCP_BUFFER_HANDLER_H_ */
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 597e4ff..d64c79c 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -25,11 +25,9 @@
#include <memory.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
-#include "pubsub_tcp_handler.h"
+#include "pubsub_skt_handler.h"
#include "pubsub_tcp_topic_receiver.h"
#include "pubsub_psa_tcp_constants.h"
-#include "pubsub_tcp_common.h"
-#include "pubsub_tcp_admin.h"
#include <uuid/uuid.h>
#include <pubsub_utils.h>
@@ -62,8 +60,8 @@ struct pubsub_tcp_topic_receiver {
void *admin;
size_t timeout;
bool isPassive;
- pubsub_tcpHandler_t *socketHandler;
- pubsub_tcpHandler_t *sharedSocketHandler;
+ pubsub_sktHandler_t *socketHandler;
+ pubsub_sktHandler_t *sharedSocketHandler;
pubsub_interceptors_handler_t *interceptorsHandler;
struct {
@@ -114,7 +112,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *handlerStore,
+ pubsub_sktHandler_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *protocol) {
pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
@@ -133,7 +131,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
if (isPassive) {
- receiver->isPassive = psa_tcp_isPassive(isPassive);
+ receiver->isPassive = pubsub_sktHandler_isPassive(isPassive);
}
if (topicProperties != NULL) {
if(staticConnectUrls == NULL) {
@@ -154,10 +152,10 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
/* When it's an endpoint share the socket with the sender */
if (passiveKey != NULL) {
celixThreadMutex_lock(&handlerStore->mutex);
- pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
+ pubsub_sktHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (receiver->socketHandler == NULL)
- receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
+ receiver->socketHandler = pubsub_sktHandler_create(receiver->protocol, receiver->logHelper);
entry = receiver->socketHandler;
receiver->sharedSocketHandler = receiver->socketHandler;
hashMap_put(handlerStore->map, (void *) passiveKey, entry);
@@ -167,7 +165,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
}
celixThreadMutex_unlock(&handlerStore->mutex);
} else {
- receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
+ receiver->socketHandler = pubsub_sktHandler_create(receiver->protocol, receiver->logHelper);
}
if (receiver->socketHandler != NULL) {
@@ -180,15 +178,15 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
- pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
- pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
- pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
- pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
- pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,
+ pubsub_sktHandler_setThreadName(receiver->socketHandler, topic, scope);
+ pubsub_sktHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
+ pubsub_sktHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
+ pubsub_sktHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
+ pubsub_sktHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,
psa_tcp_disConnectHandler);
- pubsub_tcpHandler_setThreadPriority(receiver->socketHandler, prio, sched);
- pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
- pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
+ pubsub_sktHandler_setThreadPriority(receiver->socketHandler, prio, sched);
+ pubsub_sktHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
+ pubsub_sktHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
}
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
@@ -281,10 +279,10 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->thread.mutex);
- pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, NULL);
- pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL, NULL, NULL);
+ pubsub_sktHandler_addMessageHandler(receiver->socketHandler, NULL, NULL);
+ pubsub_sktHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL, NULL, NULL);
if ((receiver->socketHandler) && (receiver->sharedSocketHandler == NULL)) {
- pubsub_tcpHandler_destroy(receiver->socketHandler);
+ pubsub_sktHandler_destroy(receiver->socketHandler);
receiver->socketHandler = NULL;
}
pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
@@ -316,7 +314,7 @@ void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiv
celix_array_list_t *unconnectedUrls) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
if (receiver->isPassive) {
- char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
+ char* interface_url = pubsub_sktHandler_get_interface_url(receiver->socketHandler);
char *url = NULL;
asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
if (interface_url) {
@@ -378,7 +376,7 @@ void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receive
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
if (entry != NULL) {
- int rc = pubsub_tcpHandler_disconnect(receiver->socketHandler, entry->url);
+ int rc = pubsub_sktHandler_disconnect(receiver->socketHandler, entry->url);
if (rc < 0)
L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", url, strerror(errno));
}
@@ -582,7 +580,7 @@ static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
- int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
+ int rc = pubsub_sktHandler_tcp_connect(entry->parent->socketHandler, entry->url);
if (rc < 0) {
allConnected = false;
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
index 35c14c6..9fbd59a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
@@ -23,7 +23,7 @@
#include "pubsub_admin_metrics.h"
#include "celix_bundle_context.h"
#include "pubsub_protocol.h"
-#include "pubsub_tcp_common.h"
+#include "pubsub_skt_handler.h"
#include "pubsub_serializer_handler.h"
typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t;
@@ -35,7 +35,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *handlerStore,
+ pubsub_sktHandler_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *protocol);
void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index e318829..cdcdb8f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -29,13 +29,11 @@
#include <celix_log_helper.h>
#include "pubsub_psa_tcp_constants.h"
#include "pubsub_tcp_topic_sender.h"
-#include "pubsub_tcp_handler.h"
-#include "pubsub_tcp_common.h"
+#include "pubsub_skt_handler.h"
#include <uuid/uuid.h>
#include "celix_constants.h"
#include <pubsub_utils.h>
#include "pubsub_interceptors_handler.h"
-#include "pubsub_tcp_admin.h"
#define TCP_BIND_MAX_RETRY 10
@@ -54,8 +52,8 @@ struct pubsub_tcp_topic_sender {
long protocolSvcId;
pubsub_protocol_service_t *protocol;
uuid_t fwUUID;
- pubsub_tcpHandler_t *socketHandler;
- pubsub_tcpHandler_t *sharedSocketHandler;
+ pubsub_sktHandler_t *socketHandler;
+ pubsub_sktHandler_t *sharedSocketHandler;
pubsub_interceptors_handler_t *interceptorsHandler;
pubsub_serializer_handler_t* serializerHandler;
@@ -108,7 +106,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *handlerStore,
+ pubsub_sktHandler_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *protocol) {
pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
@@ -132,7 +130,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
if (isPassive) {
- sender->isPassive = psa_tcp_isPassive(isPassive);
+ sender->isPassive = pubsub_sktHandler_isPassive(isPassive);
}
if (topicProperties != NULL) {
if (discUrl == NULL) {
@@ -148,10 +146,10 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
/* When it's an endpoint share the socket with the receiver */
if (passiveKey != NULL) {
celixThreadMutex_lock(&handlerStore->mutex);
- pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
+ pubsub_sktHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (sender->socketHandler == NULL)
- sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
+ sender->socketHandler = pubsub_sktHandler_create(sender->protocol, sender->logHelper);
entry = sender->socketHandler;
sender->sharedSocketHandler = sender->socketHandler;
hashMap_put(handlerStore->map, (void *) passiveKey, entry);
@@ -161,7 +159,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
}
celixThreadMutex_unlock(&handlerStore->mutex);
} else {
- sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
+ sender->socketHandler = pubsub_sktHandler_create(sender->protocol, sender->logHelper);
}
if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
@@ -172,15 +170,15 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx, PUBSUB_UTILS_PSA_SEND_DELAY, PUBSUB_UTILS_PSA_DEFAULT_SEND_DELAY);
- pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
- pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
- pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
- pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
- pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+ pubsub_sktHandler_setThreadName(sender->socketHandler, topic, scope);
+ pubsub_sktHandler_setThreadPriority(sender->socketHandler, prio, sched);
+ pubsub_sktHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
+ pubsub_sktHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+ pubsub_sktHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
// When passiveKey is specified, enable receive event for full-duplex connection using key.
// Because the topic receiver is already started, enable the receive event.
- pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
- pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
+ pubsub_sktHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
+ pubsub_sktHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
}
if (!sender->isPassive) {
@@ -203,7 +201,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
int retry = 0;
while (url && retry < TCP_BIND_MAX_RETRY) {
pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
- int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
+ int rc = pubsub_sktHandler_tcp_listen(sender->socketHandler, urlInfo->url);
if (rc < 0) {
L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
} else {
@@ -214,7 +212,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
}
}
free(urlsCopy);
- sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
+ sender->url = pubsub_sktHandler_get_interface_url(sender->socketHandler);
}
free(urls);
}
@@ -269,7 +267,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
- pubsub_tcpHandler_destroy(sender->socketHandler);
+ pubsub_sktHandler_destroy(sender->socketHandler);
sender->socketHandler = NULL;
}
@@ -300,7 +298,7 @@ const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *send
const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
if (sender->isPassive) {
- return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+ return pubsub_sktHandler_get_connection_url(sender->socketHandler);
} else {
return sender->url;
}
@@ -411,7 +409,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
}
bool sendOk = true;
{
- int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
+ int rc = pubsub_sktHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
if (rc < 0) {
status = -1;
sendOk = false;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
index 57b13a6..1a4962e 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
@@ -23,7 +23,7 @@
#include "celix_bundle_context.h"
#include "pubsub_admin_metrics.h"
#include "pubsub_protocol.h"
-#include "pubsub_tcp_common.h"
+#include "pubsub_skt_handler.h"
#include "pubsub_serializer_handler.h"
typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t;
@@ -36,7 +36,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *handlerStore,
+ pubsub_sktHandler_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *prot);
diff --git a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt b/bundles/pubsub/pubsub_admin_udp/CMakeLists.txt
similarity index 59%
copy from bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
copy to bundles/pubsub/pubsub_admin_udp/CMakeLists.txt
index c3e032c..fba7211 100644
--- a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_udp/CMakeLists.txt
@@ -17,27 +17,25 @@
find_package(UUID REQUIRED)
-add_celix_bundle(celix_pubsub_admin_tcp
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp"
+add_celix_bundle(celix_pubsub_admin_udp
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp"
VERSION "2.0.0"
GROUP "Celix/PubSub"
SOURCES
src/psa_activator.c
- src/pubsub_tcp_admin.c
- src/pubsub_tcp_topic_sender.c
- src/pubsub_tcp_topic_receiver.c
- src/pubsub_tcp_handler.c
- src/pubsub_tcp_common.c
+ src/pubsub_udp_admin.c
+ src/pubsub_udp_topic_sender.c
+ src/pubsub_udp_topic_receiver.c
)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::framework Celix::log_helper)
-target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::shell_api)
-target_include_directories(celix_pubsub_admin_tcp PRIVATE src)
+target_link_libraries(celix_pubsub_admin_udp PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
+target_link_libraries(celix_pubsub_admin_udp PRIVATE Celix::framework Celix::log_helper)
+target_link_libraries(celix_pubsub_admin_udp PRIVATE Celix::shell_api)
+target_include_directories(celix_pubsub_admin_udp PRIVATE src)
# cmake find package UUID set the wrong include dir for OSX
if (NOT APPLE)
- target_link_libraries(celix_pubsub_admin_tcp PRIVATE UUID::lib)
+ target_link_libraries(celix_pubsub_admin_udp PRIVATE UUID::lib)
endif()
-install_celix_bundle(celix_pubsub_admin_tcp EXPORT celix COMPONENT pubsub)
-add_library(Celix::celix_pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp)
+install_celix_bundle(celix_pubsub_admin_udp EXPORT celix COMPONENT pubsub)
+add_library(Celix::celix_pubsub_admin_udp ALIAS celix_pubsub_admin_udp)
diff --git a/bundles/pubsub/pubsub_admin_udp/README.md b/bundles/pubsub/pubsub_admin_udp/README.md
new file mode 100644
index 0000000..5e96625
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp/README.md
@@ -0,0 +1,149 @@
+---
+title: PSA UDP
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# PUBSUB-Admin UDP
+
+---
+
+## Description
+
+The scope of this description is the UDP PubSub admin.
+
+The UDP pubsub admin is used to transfer user data transparently via UDP unicast, UDP broadcast and UDP multicast.
+UDP supports packets with a maximum size of 64 kB. To overcome this limit the admin uses a protocol service on top of UDP
+which fragments the data to be sent; these fragments are reassembled at the receiving side.
+
+### IP Addresses Unicast
+
+To use UDP-unicast, 1 IP address is needed:
+This is the IP address that is bound to an (ethernet) interface (from the subscriber).
+This IP address is defined with the `PSA_IP` property.
+For example: `PSA_IP=192.168.1.0/24`.
+Note the example uses CIDR notation, the CIDR notation specifies the IP range that is used
+by the pubsub admin for searching the network interfaces.
+
+Note the port will be automatically assigned, when none is specified.
+A fixed port can be assigned for example with: `PSA_IP=192.168.1.0/24:34000`.
+
+### IP Address Multicast
+
+To use UDP-multicast 2 IP addresses are needed:
+
+1. The multicast address (in the range 224.X.X.X - 239.X.X.X)
+2. IP address which is bound to an (ethernet) interface (from the publisher)
+
+These IP addresses are defined with the `PSA_IP` property,
+in the form `<multicast IP address>@<(ethernet) interface IP address>`.
+For example: `PSA_IP=224.100.0.0/24@192.168.1.0/24`.
+Note the example uses CIDR notation, the CIDR notation specifies the IP range that is used
+by the pubsub admin for searching the network interfaces.
+The multicast address will use the last digits of the network interface IP address
+to form a unique multicast address.
+
+Note the port will be automatically assigned, when none is specified.
+A fixed port can be assigned for example with: `PSA_IP=224.100.0.0/24:34000@192.168.1.0/24`.
+
+### IP Address Broadcast
+
+To use UDP-broadcast 2 IP addresses are needed:
+
+1. The broadcast address (X.X.X.255)
+2. IP address which is bound to an (ethernet) interface (from the publisher)
+
+These IP addresses are defined with the `PSA_IP` property,
+in the form `<broadcast IP address>@<(ethernet) interface IP address>`.
+For example: `PSA_IP=192.168.1.255@192.168.1.0/24`.
+Note the example uses CIDR notation, the CIDR notation specifies the IP range that is used
+by the pubsub admin for searching the network interfaces.
+
+Note the port will be automatically assigned, when none is specified.
+A fixed port can be assigned for example with: `PSA_IP=192.168.1.255:34000@192.168.1.0/24:34000`.
+
+
+### Discovery
+
+When a publisher wants to publish a topic a TopicSender and publisher endpoint is created by the PubSub Topology Manager.
+When a subscriber wants to subscribe to a topic a TopicReceiver and subscriber endpoint is created by the PubSub Topology Manager.
+The endpoints are published by the PubSubDiscovery within its topic in ETCD (i.e. udp://192.168.1.20:40123).
+A subscriber, interested in the topic, is informed by the TopologyManager that there is a new endpoint.
+The TopicReceiver at the subscriber side creates a listening socket based on this endpoint.
+Now a data-connection is created and data sent by the publisher will be received by the subscriber.
+
+
+### Static endpoint
+
+The UDP pubsub admin can also be used as a static endpoint to communicate with external
+UDP publishers/subscribers that don't use Discovery.
+---
+With the `udp.static.bind.url` property, the static UDP bind can be configured.
+1. For UDP unicast the subscriber topic properties should configure the `udp.static.bind.url` property to configure UDP destination.
+2. For UDP multicast the publisher topic properties should configure the `udp.static.bind.url` property to configure UDP multicast bind interface.
+3. For UDP broadcast the publisher topic properties should configure the `udp.static.bind.url` property to configure UDP broadcast bind interface.
+---
+With the `udp.static.connect.urls` property, the static UDP connections can be configured. (Note: the URLs are space separated.)
+1. For UDP unicast the publisher topic properties should configure the `udp.static.connect.urls` property to configure the UDP connections where to send the data.
+2. For UDP multicast the subscriber topic properties should configure the `udp.static.connect.urls` property to configure UDP multicast connection.
+3. For UDP broadcast the subscriber topic properties should configure the `udp.static.connect.urls` property to configure UDP broadcast connection.
+---
+Note: a special/dedicated protocol service implementation can be used to communicate with external publishers/subscribers.
+
+---
+
+## Passive Endpoints
+
+Each UDP pubsub publisher/subscriber can be configured as a passive endpoint.
+A passive endpoint reuses an existing socket of a publisher/subscriber using properties.
+This can be used to support full duplex socket connections.
+This can be used for both discovery and static end points.
+
+The topic `udp.passive.key` property defines the unique key to select the socket.
+The `udp.passive.key` property needs to be defined both in the topic properties of the topic connections that will be shared
+and in the topic properties of the 'passive' topic that will re-use the connection.
+The `udp.passive.configured` topic property indicates that the topic is passive and will re-use the connections
+indicated with the `udp.passive.key` property.
+
+For example: `udp.passive.key=udp://localhost:9500` and `udp.passive.configured=true`
+
+## IOVEC
+
+The UDP pubsub uses the socket function calls sendmsg / recvmsg with iovec for sending/receiving messages.
+When the serialisation service supports iovec serialisation,
+iovec serialisation can be used for high throughput and low latency serialisation by avoiding copying
+of data. Because of iovec limitations the protocol service should support message segmentation.
+
+## Properties
+
+<table border="1">
+ <tr><th>Property</th><th>Description</th></tr>
+ <tr><td>PSA_IP</td><td>Unicast/Multicast/Broadcast IP address that is used by the bundle</td></tr>
+ <tr><td>udp.static.bind.url</td><td>The static interface url of the bind interface</td></tr>
+ <tr><td>udp.static.connect.urls</td><td>A space separated list of static connection urls</td></tr>
+ <tr><td>udp.passive.key</td><td>key of the shared connection</td></tr>
+ <tr><td>udp.passive.configured</td><td>Indicates if the connection is passive and reuses the connection</td></tr>
+ <tr><td>thread.realtime.prio</td><td>Configures the thread priority of the receive thread</td></tr>
+ <tr><td>thread.realtime.sched</td><td>Configures the thread scheduling type of the receive thread</td></tr>
+
+</table>
+
+---
+
+
+
diff --git a/bundles/pubsub/pubsub_admin_udp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp/src/psa_activator.c
new file mode 100644
index 0000000..c6a9c10
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp/src/psa_activator.c
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+
+#include "celix_api.h"
+#include "pubsub_protocol.h"
+#include "celix_log_helper.h"
+
+#include "pubsub_admin.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_udp_admin.h"
+#include "celix_shell_command.h"
+
+/**
+ * Activator state of the UDP pubsub admin bundle.
+ *
+ * Holds the admin instance together with the service structs that are
+ * registered for it and the ids needed to unregister them again.
+ */
+typedef struct psa_udp_activator {
+ /* Log helper created in psa_udp_start and destroyed in psa_udp_stop. */
+ celix_log_helper_t *logHelper;
+
+ /* The UDP admin instance; used as handle for all registered services. */
+ pubsub_udp_admin_t *admin;
+
+ /* Id of the tracker for PUBSUB_PROTOCOL_SERVICE_NAME services. */
+ long protocolsTrackerId;
+
+ /* Pubsub admin service struct and the id returned when registering it. */
+ pubsub_admin_service_t adminService;
+ long adminSvcId;
+
+ /* Pubsub admin metrics service struct and the id returned when registering it. */
+ pubsub_admin_metrics_service_t adminMetricsService;
+ long adminMetricsSvcId;
+
+ /* Shell command service struct ("celix::psa_udp") and the id returned when registering it. */
+ celix_shell_command_t cmdSvc;
+ long cmdSvcId;
+} psa_udp_activator_t;
+
+/**
+ * Bundle start callback of the UDP pubsub admin (PSA).
+ *
+ * Creates the log helper and the UDP admin, starts tracking pubsub protocol
+ * services and registers the pubsub admin, admin metrics and shell command
+ * services. All tracker/service ids are reset to -1 first, so an id that is
+ * never assigned stays at -1.
+ *
+ * @param act The activator data; its fields are (re)initialised here.
+ * @param ctx The bundle context used to create, track and register everything.
+ * @return CELIX_SUCCESS when the admin could be created, otherwise
+ *         CELIX_BUNDLE_EXCEPTION; in that case no tracker is started and
+ *         no service is registered.
+ */
+int psa_udp_start(psa_udp_activator_t *act, celix_bundle_context_t *ctx) {
+    act->adminSvcId = -1L;
+    act->adminMetricsSvcId = -1L; // reset like the other ids, so stop never sees an unset id
+    act->cmdSvcId = -1L;
+    act->protocolsTrackerId = -1L;
+
+    act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_udp_v2");
+
+    act->admin = pubsub_udpAdmin_create(ctx, act->logHelper);
+    if (act->admin == NULL) {
+        // Without an admin there is no valid handle for any of the services
+        // (including the shell command), so nothing is registered.
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    //track protocols
+    {
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = act->admin;
+        opts.addWithProperties = pubsub_udpAdmin_addProtocolSvc;
+        opts.removeWithProperties = pubsub_udpAdmin_removeProtocolSvc;
+        act->protocolsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //register pubsub admin service
+    {
+        pubsub_admin_service_t *psaSvc = &act->adminService;
+        psaSvc->handle = act->admin;
+        psaSvc->matchPublisher = pubsub_udpAdmin_matchPublisher;
+        psaSvc->matchSubscriber = pubsub_udpAdmin_matchSubscriber;
+        psaSvc->matchDiscoveredEndpoint = pubsub_udpAdmin_matchDiscoveredEndpoint;
+        psaSvc->setupTopicSender = pubsub_udpAdmin_setupTopicSender;
+        psaSvc->teardownTopicSender = pubsub_udpAdmin_teardownTopicSender;
+        psaSvc->setupTopicReceiver = pubsub_udpAdmin_setupTopicReceiver;
+        psaSvc->teardownTopicReceiver = pubsub_udpAdmin_teardownTopicReceiver;
+        psaSvc->addDiscoveredEndpoint = pubsub_udpAdmin_addDiscoveredEndpoint;
+        psaSvc->removeDiscoveredEndpoint = pubsub_udpAdmin_removeDiscoveredEndpoint;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDP_ADMIN_TYPE);
+
+        act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
+    }
+
+    //register pubsub admin metrics service
+    {
+        act->adminMetricsService.handle = act->admin;
+        act->adminMetricsService.metrics = pubsub_udpAdmin_metrics;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDP_ADMIN_TYPE);
+
+        act->adminMetricsSvcId =
+            celix_bundleContext_registerService(ctx,
+                                                &act->adminMetricsService,
+                                                PUBSUB_ADMIN_METRICS_SERVICE_NAME,
+                                                props);
+    }
+
+    //register shell command service
+    {
+        act->cmdSvc.handle = act->admin;
+        act->cmdSvc.executeCommand = pubsub_udpAdmin_executeCommand;
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, "celix::psa_udp");
+        celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, "psa_udp");
+        celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the udp PSA");
+        act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, CELIX_SHELL_COMMAND_SERVICE_NAME, props);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Bundle stop callback of the UDP pubsub admin (PSA).
+ *
+ * Unregisters the admin, shell command and admin metrics services, stops the
+ * protocol service tracker and then destroys the admin and the log helper.
+ *
+ * @param act The activator data holding the ids and instances to release.
+ * @param ctx The bundle context the services and tracker were registered with.
+ * @return Always CELIX_SUCCESS.
+ */
+int psa_udp_stop(psa_udp_activator_t *act, celix_bundle_context_t *ctx) {
+    // Unregistration order: admin service, shell command, admin metrics.
+    const long registeredSvcIds[] = {act->adminSvcId, act->cmdSvcId, act->adminMetricsSvcId};
+    const size_t nrOfSvcIds = sizeof(registeredSvcIds) / sizeof(registeredSvcIds[0]);
+    for (size_t i = 0; i < nrOfSvcIds; ++i) {
+        celix_bundleContext_unregisterService(ctx, registeredSvcIds[i]);
+    }
+
+    // The tracker callbacks use the admin as handle, so stop tracking before destroying it.
+    celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
+    pubsub_udpAdmin_destroy(act->admin);
+
+    celix_logHelper_destroy(act->logHelper);
+
+    return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_udp_activator_t, psa_udp_start, psa_udp_stop);
diff --git a/bundles/pubsub/pubsub_admin_udp/src/pubsub_psa_udp_constants.h b/bundles/pubsub/pubsub_admin_udp/src/pubsub_psa_udp_constants.h
new file mode 100644
index 0000000..a79d2f0
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp/src/pubsub_psa_udp_constants.h
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PSA_UDP_CONSTANTS_H_
+#define PUBSUB_PSA_UDP_CONSTANTS_H_
+
+#define PSA_UDP_BASE_PORT "PSA_UDP_BASE_PORT"
+#define PSA_UDP_MAX_PORT "PSA_UDP_MAX_PORT"
+
+#define PSA_UDP_MAX_MESSAGE_SIZE "PSA_UDP_MAX_MESSAGE_SIZE"
+#define PSA_UDP_RECV_BUFFER_SIZE "PSA_UDP_RECV_BUFFER_SIZE"
+#define PSA_UDP_TIMEOUT "PSA_UDP_TIMEOUT"
+#define PSA_UDP_SUBSCRIBER_CONNECTION_TIMEOUT "PSA_UDP_SUBSCRIBER_CONNECTION_TIMEOUT"
+
+#define PSA_UDP_DEFAULT_BASE_PORT 5501
+#define PSA_UDP_DEFAULT_MAX_PORT 6000
+
+#define PSA_UDP_DEFAULT_MAX_MESSAGE_SIZE (64 * 1024) // 64 KiB; parenthesized so the macro expands safely inside larger expressions
+#define PSA_UDP_DEFAULT_RECV_BUFFER_SIZE (64 * 1024) // 64 KiB; parenthesized so the macro expands safely inside larger expressions
+#define PSA_UDP_DEFAULT_TIMEOUT 2000 // 2 seconds
+#define PSA_UDP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
+
+#define PSA_UDP_DEFAULT_QOS_SAMPLE_SCORE 30
+#define PSA_UDP_DEFAULT_QOS_CONTROL_SCORE 70
+#define PSA_UDP_DEFAULT_SCORE 30
+
+#define PSA_UDP_QOS_SAMPLE_SCORE_KEY "PSA_UDP_QOS_SAMPLE_SCORE"
+#define PSA_UDP_QOS_CONTROL_SCORE_KEY "PSA_UDP_QOS_CONTROL_SCORE"
+#define PSA_UDP_DEFAULT_SCORE_KEY "PSA_UDP_DEFAULT_SCORE"
+
+#define PUBSUB_UDP_VERBOSE_KEY "PSA_UDP_VERBOSE"
+#define PUBSUB_UDP_VERBOSE_DEFAULT false
+
+#define PUBSUB_UDP_PUBLISHER_RETRY_CNT_KEY "PUBSUB_UDP_PUBLISHER_RETRY_COUNT"
+#define PUBSUB_UDP_PUBLISHER_RETRY_CNT_DEFAULT 5
+
+#define PUBSUB_UDP_SUBSCRIBER_RETRY_CNT_KEY "PUBSUB_UDP_SUBSCRIBER_RETRY_COUNT"
+#define PUBSUB_UDP_SUBSCRIBER_RETRY_CNT_DEFAULT 5
+
+
+//Time-out settings are only for BLOCKING connections
+#define PUBSUB_UDP_PUBLISHER_SNDTIMEO_KEY "PUBSUB_UDP_PUBLISHER_SEND_TIMEOUT"
+#define PUBSUB_UDP_PUBLISHER_SNDTIMEO_DEFAULT 5.0
+
+#define PUBSUB_UDP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_UDP_SUBSCRIBER_RCV_TIMEOUT"
+#define PUBSUB_UDP_SUBSCRIBER_RCVTIMEO_DEFAULT 5.0
+
+#define PUBSUB_UDP_PSA_IP_KEY "PSA_IP"
+#define PUBSUB_UDP_ADMIN_TYPE "udp"
+
+/**
+ * The UDP url key for the topic sender endpoints
+ */
+#define PUBSUB_UDP_URL_KEY "udp.url"
+
+/**
+ * Can be set in the topic properties to fix a static bind url
+ */
+#define PUBSUB_UDP_STATIC_BIND_URL "udp.static.bind.url"
+
+/**
+ * Name of environment variable with ip/url to bind to
+ * e.g. PSA_UDP_STATIC_BIND_URL_FOR_topic_scope="UDP://0.0.0.0:4444"
+ */
+#define PUBSUB_UDP_STATIC_BIND_URL_FOR "PSA_UDP_STATIC_BIND_URL_FOR_"
+
+/**
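+ * Can be set in the topic properties to fix a static url used for discovery (NOTE(review): the value below is identical to PUBSUB_UDP_STATIC_BIND_URL - confirm this is intended)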
+ */
+#define PUBSUB_UDP_STATIC_DISCOVER_URL "udp.static.bind.url"
+
+/**
+ * If set true on the endpoint, the UDP TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_UDP_STATIC_CONFIGURED "udp.static.configured"
+
+/**
+ * The static url which a subscriber should try to connect to.
+ * The urls are space separated.
+ * Can be set in the topic properties
+ */
+#define PUBSUB_UDP_STATIC_CONNECT_URLS "udp.static.connect.urls"
+
+
+/**
+ * Defines whether the publisher / subscriber is a passive endpoint that shares
+ * the connection with the publisher / subscriber endpoint with the matching (passive) key,
+ * e.g. udp.passive.configured="true" means that a publisher / subscriber is passive
+ * when a publisher / subscriber is found with a matching key (for example udp.passive.key="localhost").
+ * This creates a full-duplex connection using a single socket.
+ */
+#define PUBSUB_UDP_PASSIVE_CONFIGURED "udp.passive.configured"
+#define PUBSUB_UDP_PASSIVE_KEY "udp.passive.key"
+
+/**
+ * Name of environment variable to indicate that passive endpoint is configured
+ * e.g. PSA_UDP_PASSIVE_CONFIGURED_topic_scope="true"
+ */
+#define PUBSUB_UDP_PASSIVE_ENABLED "PSA_UDP_PASSIVE_CONFIGURED_"
+/**
+ * Name of environment variable to configure the passive key (see PUBSUB_UDP_PASSIVE_KEY)
+ * e.g. PSA_UDP_PASSIVE_KEY_topic_scope="UDP://localhost:4444"
+ */
+#define PUBSUB_UDP_PASSIVE_SELECTION_KEY "PSA_UDP_PASSIVE_KEY_"
+
+/**
+ * Name of environment variable with space-separated list of ips/urls to connect to
+ * e.g. PSA_UDP_STATIC_CONNECT_URL_FOR_topic_scope="udp://127.0.0.1:4444 udp://127.0.0.2:4444"
+ */
+#define PUBSUB_UDP_STATIC_CONNECT_URLS_FOR "PSA_UDP_STATIC_CONNECT_URL_FOR_"
+
+/**
+ * Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
+ * internal UDP threads.
+ * Can be set in the topic properties.
+ */
+#define PUBSUB_UDP_THREAD_REALTIME_PRIO "thread.realtime.prio"
+#define PUBSUB_UDP_THREAD_REALTIME_SCHED "thread.realtime.sched"
+
+#endif /* PUBSUB_PSA_UDP_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_admin.c
similarity index 63%
copy from bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
copy to bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_admin.c
index 9806991..44d53c6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_admin.c
@@ -24,11 +24,12 @@
#include <pubsub_matching.h>
#include "pubsub_utils.h"
-#include "pubsub_tcp_admin.h"
-#include "pubsub_tcp_handler.h"
-#include "pubsub_psa_tcp_constants.h"
-#include "pubsub_tcp_topic_sender.h"
-#include "pubsub_tcp_topic_receiver.h"
+#include "pubsub_udp_admin.h"
+#include "pubsub_skt_handler.h"
+#include "pubsub_psa_udp_constants.h"
+#include "pubsub_udp_topic_sender.h"
+#include "pubsub_udp_topic_receiver.h"
+#include "celix_properties.h"
#define L_DEBUG(...) \
celix_logHelper_log(psa->log, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
@@ -39,7 +40,7 @@
#define L_ERROR(...) \
celix_logHelper_log(psa->log, CELIX_LOG_LEVEL_ERROR, __VA_ARGS__)
-struct pubsub_tcp_admin {
+struct pubsub_udp_admin {
celix_bundle_context_t *ctx;
celix_log_helper_t *log;
const char *fwUUID;
@@ -56,17 +57,17 @@ struct pubsub_tcp_admin {
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = svcId, value = psa_tcp_protocol_entry_t*
+ hash_map_t *map; //key = svcId, value = psa_udp_protocol_entry_t*
} protocols;
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_tcp_topic_sender_t*
+ hash_map_t *map; //key = scope:topic key, value = pubsub_udp_topic_sender_t*
} topicSenders;
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = scope:topic key, value = pubsub_tcp_topic_sender_t*
+ hash_map_t *map; //key = scope:topic key, value = pubsub_udp_topic_receiver_t*
} topicReceivers;
struct {
@@ -79,41 +80,49 @@ struct pubsub_tcp_admin {
hash_map_t *map; //key = pubsub message serialization marker svc id (long), pubsub_serialization_handler_t*.
} serializationHandlers;
- pubsub_tcp_endPointStore_t endpointStore;
+ pubsub_sktHandler_endPointStore_t endpointStore;
};
-typedef struct psa_tcp_protocol_entry {
+typedef struct psa_udp_protocol_entry {
const char *protType;
long svcId;
pubsub_protocol_service_t *svc;
-} psa_tcp_protocol_entry_t;
+} psa_udp_protocol_entry_t;
static celix_status_t
-pubsub_tcpAdmin_connectEndpointToReceiver(pubsub_tcp_admin_t *psa, pubsub_tcp_topic_receiver_t *receiver,
+pubsub_udpAdmin_connectEndpointToReceiver(pubsub_udp_admin_t *psa, pubsub_udp_topic_receiver_t *receiver,
const celix_properties_t *endpoint);
+static celix_status_t
+pubsub_udpAdmin_connectEndpointToSender(pubsub_udp_admin_t *psa, pubsub_udp_topic_sender_t *sender,
+ const celix_properties_t *endpoint);
static celix_status_t
-pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_tcp_topic_receiver_t *receiver,
+pubsub_udpAdmin_disconnectEndpointFromReceiver(pubsub_udp_admin_t *psa, pubsub_udp_topic_receiver_t *receiver,
const celix_properties_t *endpoint);
-static bool pubsub_tcpAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
+static bool pubsub_udpAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
}
-pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
- pubsub_tcp_admin_t *psa = calloc(1, sizeof(*psa));
+static bool pubsub_udpAdmin_endpointIsSubScriber(const celix_properties_t *endpoint) { // true when the endpoint's PUBSUB_ENDPOINT_TYPE property marks it as a subscriber
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); // NOTE(review): the NULL argument is presumably the default returned when the property is missing - confirm
+ return type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0; // prefix comparison: only the first strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE) characters of type are compared
+}
+
+pubsub_udp_admin_t *pubsub_udpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
+ pubsub_udp_admin_t *psa = calloc(1, sizeof(*psa));
psa->ctx = ctx;
psa->log = logHelper;
- psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_TCP_VERBOSE_KEY, PUBSUB_TCP_VERBOSE_DEFAULT);
+ psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_UDP_VERBOSE_KEY, PUBSUB_UDP_VERBOSE_DEFAULT);
psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
- long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_BASE_PORT, PSA_TCP_DEFAULT_BASE_PORT);
+ long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_UDP_BASE_PORT, PSA_UDP_DEFAULT_BASE_PORT);
psa->basePort = (unsigned int) basePort;
- psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_DEFAULT_SCORE_KEY, PSA_TCP_DEFAULT_SCORE);
- psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_SAMPLE_SCORE_KEY,
- PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE);
- psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_CONTROL_SCORE_KEY,
- PSA_TCP_DEFAULT_QOS_CONTROL_SCORE);
+ psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDP_DEFAULT_SCORE_KEY, PSA_UDP_DEFAULT_SCORE);
+ psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDP_QOS_SAMPLE_SCORE_KEY,
+ PSA_UDP_DEFAULT_QOS_SAMPLE_SCORE);
+ psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_UDP_QOS_CONTROL_SCORE_KEY,
+ PSA_UDP_DEFAULT_QOS_CONTROL_SCORE);
celixThreadMutex_create(&psa->protocols.mutex, NULL);
psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
@@ -136,7 +145,7 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo
return psa;
}
-void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
+void pubsub_udpAdmin_destroy(pubsub_udp_admin_t *psa) {
if (psa == NULL) {
return;
}
@@ -144,24 +153,24 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
celixThreadMutex_lock(&psa->endpointStore.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->endpointStore.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_tcpHandler_t *tcpHandler = hashMapIterator_nextValue(&iter);
- pubsub_tcpHandler_destroy(tcpHandler);
+ pubsub_sktHandler_t *udpHandler = hashMapIterator_nextValue(&iter);
+ pubsub_sktHandler_destroy(udpHandler);
}
celixThreadMutex_unlock(&psa->endpointStore.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_tcp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
- pubsub_tcpTopicSender_destroy(sender);
+ pubsub_udp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ pubsub_udpTopicSender_destroy(sender);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_tcp_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
- pubsub_tcpTopicReceiver_destroy(recv);
+ pubsub_udp_topic_receiver_t *recv = hashMapIterator_nextValue(&iter);
+ pubsub_udpTopicReceiver_destroy(recv);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -184,7 +193,7 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
celixThreadMutex_lock(&psa->protocols.mutex);
iter = hashMapIterator_construct(psa->protocols.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_protocol_entry_t *entry = hashMapIterator_nextValue(&iter);
+ psa_udp_protocol_entry_t *entry = hashMapIterator_nextValue(&iter);
free(entry);
}
celixThreadMutex_unlock(&psa->protocols.mutex);
@@ -212,19 +221,19 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) {
free(psa);
}
-void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_admin_t *psa = handle;
+void pubsub_udpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_udp_admin_t *psa = handle;
const char *protType = celix_properties_get(props, PUBSUB_PROTOCOL_TYPE_KEY, NULL);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
if (protType == NULL) {
- L_INFO("[PSA_tcp] Ignoring protocol service without %s property", PUBSUB_PROTOCOL_TYPE_KEY);
+ L_INFO("[PSA_UDP] Ignoring protocol service without %s property", PUBSUB_PROTOCOL_TYPE_KEY);
return;
}
celixThreadMutex_lock(&psa->protocols.mutex);
- psa_tcp_protocol_entry_t *entry = hashMap_get(psa->protocols.map, (void *) svcId);
+ psa_udp_protocol_entry_t *entry = hashMap_get(psa->protocols.map, (void *) svcId);
if (entry == NULL) {
entry = calloc(1, sizeof(*entry));
entry->protType = protType;
@@ -235,8 +244,8 @@ void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_propert
celixThreadMutex_unlock(&psa->protocols.mutex);
}
-void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_admin_t *psa = handle;
+void pubsub_udpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_udp_admin_t *psa = handle;
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
//remove protocol
@@ -246,7 +255,7 @@ void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_prop
// Note that it is the responsibility of the topology manager to create new topic senders/receivers
celixThreadMutex_lock(&psa->protocols.mutex);
- psa_tcp_protocol_entry_t *entry = hashMap_remove(psa->protocols.map, (void *) svcId);
+ psa_udp_protocol_entry_t *entry = hashMap_remove(psa->protocols.map, (void *) svcId);
celixThreadMutex_unlock(&psa->protocols.mutex);
if (entry != NULL) {
@@ -254,11 +263,11 @@ void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_prop
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_tcp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && entry->svcId == pubsub_tcpTopicSender_protocolSvcId(sender)) {
+ pubsub_udp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
+ if (sender != NULL && entry->svcId == pubsub_udpTopicSender_protocolSvcId(sender)) {
char *key = hashMapEntry_getKey(senderEntry);
hashMapIterator_remove(&iter);
- pubsub_tcpTopicSender_destroy(sender);
+ pubsub_udpTopicSender_destroy(sender);
free(key);
}
}
@@ -268,11 +277,11 @@ void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_prop
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
- if (receiver != NULL && entry->svcId == pubsub_tcpTopicReceiver_protocolSvcId(receiver)) {
+ pubsub_udp_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
+ if (receiver != NULL && entry->svcId == pubsub_udpTopicReceiver_protocolSvcId(receiver)) {
char *key = hashMapEntry_getKey(senderEntry);
hashMapIterator_remove(&iter);
- pubsub_tcpTopicReceiver_destroy(receiver);
+ pubsub_udpTopicReceiver_destroy(receiver);
free(key);
}
}
@@ -282,13 +291,13 @@ void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_prop
}
}
-celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
+celix_status_t pubsub_udpAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter,
celix_properties_t **topicProperties, double *outScore,
long *outSerializerSvcId, long *outProtocolSvcId) {
- pubsub_tcp_admin_t *psa = handle;
- L_DEBUG("[PSA_TCP_V2] pubsub_tcpAdmin_matchPublisher");
+ pubsub_udp_admin_t *psa = handle;
+ L_DEBUG("[PSA_UDP_V2] pubsub_udpAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
- double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_TCP_ADMIN_TYPE,
+ double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDP_ADMIN_TYPE,
psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, true, topicProperties, outSerializerSvcId, outProtocolSvcId);
*outScore = score;
@@ -297,13 +306,13 @@ celix_status_t pubsub_tcpAdmin_matchPublisher(void *handle, long svcRequesterBnd
}
celix_status_t
-pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties,
+pubsub_udpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties,
celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId,
long *outProtocolSvcId) {
- pubsub_tcp_admin_t *psa = handle;
- L_DEBUG("[PSA_TCP_V2] pubsub_tcpAdmin_matchSubscriber");
+ pubsub_udp_admin_t *psa = handle;
+ L_DEBUG("[PSA_UDP_V2] pubsub_udpAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
- double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_TCP_ADMIN_TYPE,
+ double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDP_ADMIN_TYPE,
psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, true, topicProperties, outSerializerSvcId, outProtocolSvcId);
if (outScore != NULL) {
*outScore = score;
@@ -312,18 +321,18 @@ pubsub_tcpAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix
}
celix_status_t
-pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
- pubsub_tcp_admin_t *psa = handle;
- L_DEBUG("[PSA_TCP_V2] pubsub_tcpAdmin_matchEndpoint");
+pubsub_udpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+ pubsub_udp_admin_t *psa = handle;
+ L_DEBUG("[PSA_UDP_V2] pubsub_udpAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
- bool match = pubsub_utils_matchEndpoint(psa->ctx, psa->log, endpoint, PUBSUB_TCP_ADMIN_TYPE, true, NULL, NULL);
+ bool match = pubsub_utils_matchEndpoint(psa->ctx, psa->log, endpoint, PUBSUB_UDP_ADMIN_TYPE, true, NULL, NULL);
if (outMatch != NULL) {
*outMatch = match;
}
return status;
}
-static pubsub_serializer_handler_t* pubsub_tcpAdmin_getSerializationHandler(pubsub_tcp_admin_t* psa, long msgSerializationMarkerSvcId) {
+static pubsub_serializer_handler_t* pubsub_udpAdmin_getSerializationHandler(pubsub_udp_admin_t* psa, long msgSerializationMarkerSvcId) {
pubsub_serializer_handler_t* handler = NULL;
celixThreadMutex_lock(&psa->serializationHandlers.mutex);
handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId);
@@ -337,10 +346,10 @@ static pubsub_serializer_handler_t* pubsub_tcpAdmin_getSerializationHandler(pubs
return handler;
}
-celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
+celix_status_t pubsub_udpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
const celix_properties_t *topicProperties, long serializerSvcId,
long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
- pubsub_tcp_admin_t *psa = handle;
+ pubsub_udp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
//1) Get serialization handler
@@ -349,7 +358,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
//4) Connect existing endpoints
//5) set outPublisherEndpoint
- pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+ pubsub_serializer_handler_t* handler = pubsub_udpAdmin_getSerializationHandler(psa, serializerSvcId);
if (handler == NULL) {
L_ERROR("Cannot create topic sender without serialization handler");
return CELIX_ILLEGAL_STATE;
@@ -360,24 +369,25 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
- pubsub_tcp_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
+ pubsub_udp_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
celixThreadMutex_unlock(&psa->topicSenders.mutex);
if (sender == NULL) {
- psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
+ psa_udp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
if (protEntry != NULL) {
- sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle, topicProperties,
+ sender = pubsub_udpTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle, topicProperties,
&psa->endpointStore, protocolSvcId,
protEntry->svc);
}
if (sender != NULL) {
- const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
+ const char *psaType = PUBSUB_UDP_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
- celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
+ const char* url = pubsub_udpTopicSender_url(sender);
+ if (url) celix_properties_set(newEndpoint, PUBSUB_UDP_URL_KEY, url);
- celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
- if (pubsub_tcpTopicSender_isPassive(sender)) {
+ celix_properties_setBool(newEndpoint, PUBSUB_UDP_STATIC_CONFIGURED, pubsub_udpTopicSender_isStatic(sender));
+ if (pubsub_udpTopicSender_isPassive(sender)) {
celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
} else {
celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
@@ -390,15 +400,31 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
hashMap_put(psa->topicSenders.map, key, sender);
celixThreadMutex_unlock(&psa->topicSenders.mutex);
} else {
- L_ERROR("[PSA_TCP_V2] Error creating a TopicSender");
+ L_ERROR("[PSA_UDP_V2] Error creating a TopicSender");
free(key);
}
} else {
free(key);
- L_ERROR("[PSA_TCP_V2] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
+ L_ERROR("[PSA_UDP_V2] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic);
}
celixThreadMutex_unlock(&psa->protocols.mutex);
+ if (sender != NULL && newEndpoint != NULL) {
+ if (pubsub_udpAdmin_endpointIsSubScriber(newEndpoint) && pubsubEndpoint_matchWithTopicAndScope(newEndpoint, topic, scope)) {
+ pubsub_udpAdmin_connectEndpointToSender(psa, sender, newEndpoint);
+ }
+ if (pubsub_udpAdmin_endpointIsPublisher(newEndpoint) && pubsubEndpoint_matchWithTopicAndScope(newEndpoint, topic, scope)) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ hash_map_iterator_t senderIter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&senderIter)) {
+ pubsub_udp_topic_receiver_t *receiver = hashMapIterator_nextValue(&senderIter);
+ pubsub_udpAdmin_connectEndpointToReceiver(psa, receiver, newEndpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+
+ }
+
if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
*outPublisherEndpoint = newEndpoint;
}
@@ -406,8 +432,8 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope,
return status;
}
-celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
- pubsub_tcp_admin_t *psa = handle;
+celix_status_t pubsub_udpAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) {
+ pubsub_udp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
//1) Find and remove TopicSender from map
@@ -418,13 +444,13 @@ celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *sco
hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key);
if (entry != NULL) {
char *mapKey = hashMapEntry_getKey(entry);
- pubsub_tcp_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
+ pubsub_udp_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
celixThreadMutex_unlock(&psa->topicSenders.mutex);
free(mapKey);
- pubsub_tcpTopicSender_destroy(sender);
+ pubsub_udpTopicSender_destroy(sender);
} else {
celixThreadMutex_unlock(&psa->topicSenders.mutex);
- L_ERROR("[PSA_TCP_V2] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists",
+ L_ERROR("[PSA_UDP_V2] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists",
scope == NULL ? "(null)" : scope,
topic);
}
@@ -433,12 +459,12 @@ celix_status_t pubsub_tcpAdmin_teardownTopicSender(void *handle, const char *sco
return status;
}
-celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
+celix_status_t pubsub_udpAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic,
const celix_properties_t *topicProperties, long serializerSvcId,
long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
- pubsub_tcp_admin_t *psa = handle;
+ pubsub_udp_admin_t *psa = handle;
- pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+ pubsub_serializer_handler_t* handler = pubsub_udpAdmin_getSerializationHandler(psa, serializerSvcId);
if (handler == NULL) {
L_ERROR("Cannot create topic receiver without serialization handler");
return CELIX_ILLEGAL_STATE;
@@ -449,22 +475,32 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
- pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
+ pubsub_udp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
if (receiver == NULL) {
- psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
+ psa_udp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
if (protEntry != NULL) {
- receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic,
+ receiver = pubsub_udpTopicReceiver_create(psa->ctx, psa->log, scope, topic,
handler, handle, topicProperties,
&psa->endpointStore, protocolSvcId, protEntry->svc);
} else {
- L_ERROR("[PSA_TCP_V2] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
+ L_ERROR("[PSA_UDP_V2] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
}
if (receiver != NULL) {
- const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
+ const char *psaType = PUBSUB_UDP_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
+ const char* url = pubsub_udpTopicReceiver_url(receiver);
+ if (url) celix_properties_set(newEndpoint, PUBSUB_UDP_URL_KEY, url);
+
+ celix_properties_setBool(newEndpoint, PUBSUB_UDP_STATIC_CONFIGURED, pubsub_udpTopicReceiver_isStatic(receiver));
+ if (pubsub_udpTopicReceiver_isPassive(receiver)) {
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+ } else {
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
+ }
+
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -474,27 +510,28 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
hashMap_put(psa->topicReceivers.map, key, receiver);
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
} else {
- L_ERROR("[PSA_TCP_V2] Error creating a TopicReceiver.");
+ L_ERROR("[PSA_UDP_V2] Error creating a TopicReceiver.");
free(key);
}
} else {
free(key);
- L_ERROR("[PSA_TCP_V2] Cannot setup already existing TopicReceiver for scope/topic %s/%s!",
+ L_ERROR("[PSA_UDP_V2] Cannot setup already existing TopicReceiver for scope/topic %s/%s!",
scope == NULL ? "(null)" : scope,
topic);
}
celixThreadMutex_unlock(&psa->protocols.mutex);
if (receiver != NULL && newEndpoint != NULL) {
- celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
- while (hashMapIterator_hasNext(&iter)) {
- celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
- if (pubsub_tcpAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) {
- pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
- }
+ if (pubsub_udpAdmin_endpointIsPublisher(newEndpoint) && pubsubEndpoint_matchWithTopicAndScope(newEndpoint, topic, scope)) {
+ pubsub_udpAdmin_connectEndpointToReceiver(psa, receiver, newEndpoint);
}
- celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t senderIter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&senderIter)) {
+ pubsub_udp_topic_sender_t *sender = hashMapIterator_nextValue(&senderIter);
+ pubsub_udpAdmin_connectEndpointToSender(psa, sender, newEndpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
}
if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
@@ -505,8 +542,8 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop
return status;
}
-celix_status_t pubsub_tcpAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
- pubsub_tcp_admin_t *psa = handle;
+celix_status_t pubsub_udpAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) {
+ pubsub_udp_admin_t *psa = handle;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -514,12 +551,12 @@ celix_status_t pubsub_tcpAdmin_teardownTopicReceiver(void *handle, const char *s
free(key);
if (entry != NULL) {
char *receiverKey = hashMapEntry_getKey(entry);
- pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
+ pubsub_udp_topic_receiver_t *receiver = hashMapEntry_getValue(entry);
hashMap_remove(psa->topicReceivers.map, receiverKey);
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
free(receiverKey);
- pubsub_tcpTopicReceiver_destroy(receiver);
+ pubsub_udpTopicReceiver_destroy(receiver);
} else {
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
@@ -529,40 +566,53 @@ celix_status_t pubsub_tcpAdmin_teardownTopicReceiver(void *handle, const char *s
}
static celix_status_t
-pubsub_tcpAdmin_connectEndpointToReceiver(pubsub_tcp_admin_t *psa, pubsub_tcp_topic_receiver_t *receiver,
+pubsub_udpAdmin_connectEndpointToReceiver(pubsub_udp_admin_t *psa, pubsub_udp_topic_receiver_t *receiver,
const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
+ const char *url = celix_properties_get(endpoint, PUBSUB_UDP_URL_KEY, NULL);
+ pubsub_udpTopicReceiver_connectTo(receiver, url);
+ return status;
+}
- const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
-
- if (url == NULL) {
- const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
- const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
- L_WARN("[PSA_TCP_V2] Error got endpoint without a tcp url (admin: %s, type: %s)", admin, type);
- status = CELIX_BUNDLE_EXCEPTION;
- } else {
- pubsub_tcpTopicReceiver_connectTo(receiver, url);
- }
-
+static celix_status_t
+pubsub_udpAdmin_connectEndpointToSender(pubsub_udp_admin_t *psa, pubsub_udp_topic_sender_t *sender,
+ const celix_properties_t *endpoint) {
+ //note: can be called while discoveredEndpoints.mutex is locked
+ celix_status_t status = CELIX_SUCCESS;
+ const char *url = celix_properties_get(endpoint, PUBSUB_UDP_URL_KEY, NULL);
+ pubsub_udpTopicSender_connectTo(sender, url);
return status;
}
-celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
- pubsub_tcp_admin_t *psa = handle;
- if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) {
+celix_status_t pubsub_udpAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+ pubsub_udp_admin_t *psa = handle;
+
+ if (pubsub_udpAdmin_endpointIsPublisher(endpoint)) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_tcpTopicReceiver_topic(receiver), pubsub_tcpTopicReceiver_scope(receiver))) {
- pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
+ pubsub_udp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_udpTopicReceiver_topic(receiver), pubsub_udpTopicReceiver_scope(receiver))) {
+ pubsub_udpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
}
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
+ if (pubsub_udpAdmin_endpointIsSubScriber(endpoint)) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_udp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_udpTopicSender_topic(sender), pubsub_udpTopicSender_scope(sender))) {
+ pubsub_udpAdmin_connectEndpointToSender(psa, sender, endpoint);
+ }
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ }
+
celix_properties_t *cpy = celix_properties_copy(endpoint);
const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
@@ -575,36 +625,48 @@ celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_p
}
static celix_status_t
-pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_tcp_topic_receiver_t *receiver,
+pubsub_udpAdmin_disconnectEndpointFromReceiver(pubsub_udp_admin_t *psa, pubsub_udp_topic_receiver_t *receiver,
const celix_properties_t *endpoint) {
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
+ const char *url = celix_properties_get(endpoint, PUBSUB_UDP_URL_KEY, NULL);
+ pubsub_udpTopicReceiver_disconnectFrom(receiver, url);
+ return status;
+}
- const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
-
- if (url == NULL) {
- L_WARN("[PSA_TCP_V2] Error got endpoint without tcp url");
- status = CELIX_BUNDLE_EXCEPTION;
- } else {
- pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
- }
-
+static celix_status_t
+pubsub_udpAdmin_disconnectEndpointFromSender(pubsub_udp_admin_t *psa, pubsub_udp_topic_sender_t *sender,
+ const celix_properties_t *endpoint) {
+ //note: can be called while discoveredEndpoints.mutex is locked
+ celix_status_t status = CELIX_SUCCESS;
+ const char *url = celix_properties_get(endpoint, PUBSUB_UDP_URL_KEY, NULL);
+ pubsub_udpTopicSender_disconnectFrom(sender, url);
return status;
}
-celix_status_t pubsub_tcpAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
- pubsub_tcp_admin_t *psa = handle;
+celix_status_t pubsub_udpAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
+ pubsub_udp_admin_t *psa = handle;
- if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) {
+ if (pubsub_udpAdmin_endpointIsPublisher(endpoint)) {
celixThreadMutex_lock(&psa->topicReceivers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- pubsub_tcpAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
+ pubsub_udp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ pubsub_udpAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
}
+ if (pubsub_udpAdmin_endpointIsSubScriber(endpoint)) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_udp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ pubsub_udpAdmin_disconnectEndpointFromSender(psa, sender, endpoint);
+ }
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ }
+
celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL);
celix_properties_t *found = hashMap_remove(psa->discoveredEndpoints.map, (void *) uuid);
@@ -618,9 +680,9 @@ celix_status_t pubsub_tcpAdmin_removeDiscoveredEndpoint(void *handle, const celi
return status;
}
-bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out,
+bool pubsub_udpAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out,
FILE *errStream __attribute__((unused))) {
- pubsub_tcp_admin_t *psa = handle;
+ pubsub_udp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
char *line = celix_utils_strdup(commandLine);
char *token = line;
@@ -644,20 +706,29 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_tcp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
- long protSvcId = pubsub_tcpTopicSender_protocolSvcId(sender);
- psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protSvcId);
- const char *serType = pubsub_tcpTopicSender_serializerType(sender);
+ pubsub_udp_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
+ long protSvcId = pubsub_udpTopicSender_protocolSvcId(sender);
+ psa_udp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protSvcId);
+ const char *serType = pubsub_udpTopicSender_serializerType(sender);
const char *protType = protEntry == NULL ? "!Error!" : protEntry->protType;
- const char *scope = pubsub_tcpTopicSender_scope(sender);
- const char *topic = pubsub_tcpTopicSender_topic(sender);
- const char *url = pubsub_tcpTopicSender_url(sender);
- const char *isPassive = pubsub_tcpTopicSender_isPassive(sender) ? " (passive)" : "";
- const char *postUrl = pubsub_tcpTopicSender_isStatic(sender) ? " (static)" : "";
+ const char *scope = pubsub_udpTopicSender_scope(sender);
+ const char *topic = pubsub_udpTopicSender_topic(sender);
+ const char *url = pubsub_udpTopicSender_url(sender);
+ const char *isPassive = pubsub_udpTopicSender_isPassive(sender) ? " (passive)" : "";
+ const char *postUrl = pubsub_udpTopicSender_isStatic(sender) ? " (static)" : "";
+ celix_array_list_t *urls = celix_arrayList_create();
+ pubsub_udpTopicSender_listConnections(sender, urls);
+
fprintf(out, "|- Topic Sender %s/%s\n", scope == NULL ? "(null)" : scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- protocol type = %s\n", protType);
- fprintf(out, " |- url = %s%s%s\n", url, postUrl, isPassive);
+ if (url) fprintf(out, " |- url = %s%s%s\n", url, postUrl, isPassive);
+ for (int i = 0; i < celix_arrayList_size(urls); ++i) {
+ char *_url = celix_arrayList_get(urls, i);
+ fprintf(out, " |- connected url = %s\n", _url);
+ free(_url);
+ }
+ celix_arrayList_destroy(urls);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
celixThreadMutex_unlock(&psa->protocols.mutex);
@@ -668,30 +739,34 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
- pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- long protSvcId = pubsub_tcpTopicReceiver_protocolSvcId(receiver);
- psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protSvcId);
- const char *serType = pubsub_tcpTopicReceiver_serializerType(receiver);
+ pubsub_udp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
+ long protSvcId = pubsub_udpTopicReceiver_protocolSvcId(receiver);
+ psa_udp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protSvcId);
+ const char *serType = pubsub_udpTopicReceiver_serializerType(receiver);
const char *protType = protEntry == NULL ? "!Error!" : protEntry->protType;
- const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
- const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
+ const char *scope = pubsub_udpTopicReceiver_scope(receiver);
+ const char *topic = pubsub_udpTopicReceiver_topic(receiver);
+ const char *url = pubsub_udpTopicReceiver_url(receiver);
+ const char *isPassive = pubsub_udpTopicReceiver_isPassive(receiver) ? " (passive)" : "";
+ const char *postUrl = pubsub_udpTopicReceiver_isStatic(receiver) ? " (static)" : "";
celix_array_list_t *connected = celix_arrayList_create();
celix_array_list_t *unconnected = celix_arrayList_create();
- pubsub_tcpTopicReceiver_listConnections(receiver, connected, unconnected);
+ pubsub_udpTopicReceiver_listConnections(receiver, connected, unconnected);
fprintf(out, "|- Topic Receiver %s/%s\n", scope == NULL ? "(null)" : scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- protocol type = %s\n", protType);
+ if (url) fprintf(out, " |- url = %s%s%s\n", url, postUrl, isPassive);
for (int i = 0; i < celix_arrayList_size(connected); ++i) {
- char *url = celix_arrayList_get(connected, i);
- fprintf(out, " |- connected url = %s\n", url);
- free(url);
+ char *_url = celix_arrayList_get(connected, i);
+ fprintf(out, " |- connected url = %s\n", _url);
+ free(_url);
}
for (int i = 0; i < celix_arrayList_size(unconnected); ++i) {
- char *url = celix_arrayList_get(unconnected, i);
- fprintf(out, " |- unconnected url = %s\n", url);
- free(url);
+ char *_url = celix_arrayList_get(unconnected, i);
+ fprintf(out, " |- unconnected url = %s\n", _url);
+ free(_url);
}
celix_arrayList_destroy(connected);
celix_arrayList_destroy(unconnected);
@@ -704,6 +779,6 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr
return status;
}
-pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) {
+pubsub_admin_metrics_t *pubsub_udpAdmin_metrics(void *handle) {
return NULL;
}
diff --git a/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_admin.h b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_admin.h
new file mode 100644
index 0000000..fd5ea24
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_admin.h
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_UDP_ADMIN_H
+#define CELIX_PUBSUB_UDP_ADMIN_H
+
+#include <pubsub_admin_metrics.h>
+#include <pubsub_message_serialization_service.h>
+#include <stdint.h>
+#include "celix_api.h"
+#include "celix_log_helper.h"
+#include "pubsub_psa_udp_constants.h"
+
+typedef struct pubsub_udp_admin pubsub_udp_admin_t;
+
+typedef struct psa_udp_serializer_entry {
+ const char *fqn;
+ const char *version;
+ pubsub_message_serialization_service_t *svc;
+} psa_udp_serializer_entry_t;
+
+pubsub_udp_admin_t *pubsub_udpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper);
+void pubsub_udpAdmin_destroy(pubsub_udp_admin_t *psa);
+
+celix_status_t pubsub_udpAdmin_matchPublisher(void *handle,
+ long svcRequesterBndId,
+ const celix_filter_t *svcFilter,
+ celix_properties_t **topicProperties,
+ double *score,
+ long *serializerSvcId,
+ long *protocolSvcId);
+celix_status_t pubsub_udpAdmin_matchSubscriber(void *handle,
+ long svcProviderBndId,
+ const celix_properties_t *svcProperties,
+ celix_properties_t **topicProperties,
+ double *score,
+ long *serializerSvcId,
+ long *protocolSvcId);
+celix_status_t pubsub_udpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+
+celix_status_t pubsub_udpAdmin_setupTopicSender(void *handle,
+ const char *scope,
+ const char *topic,
+ const celix_properties_t *topicProperties,
+ long serializerSvcId,
+ long protocolSvcId,
+ celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsub_udpAdmin_setupTopicReceiver(void *handle,
+ const char *scope,
+ const char *topic,
+ const celix_properties_t *topicProperties,
+ long serializerSvcId,
+ long protocolSvcId,
+ celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
+
+celix_status_t pubsub_udpAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_udpAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+
+void pubsub_udpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_udpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
+
+void pubsub_udpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
+void pubsub_udpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
+bool pubsub_udpAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
+
+pubsub_admin_metrics_t *pubsub_udpAdmin_metrics(void *handle);
+
+#endif //CELIX_PUBSUB_UDP_ADMIN_H
+
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_receiver.c
similarity index 70%
copy from bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
copy to bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_receiver.c
index 597e4ff..6e78c9d 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_receiver.c
@@ -25,11 +25,9 @@
#include <memory.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
-#include "pubsub_tcp_handler.h"
-#include "pubsub_tcp_topic_receiver.h"
-#include "pubsub_psa_tcp_constants.h"
-#include "pubsub_tcp_common.h"
-#include "pubsub_tcp_admin.h"
+#include "pubsub_skt_handler.h"
+#include "pubsub_udp_topic_receiver.h"
+#include "pubsub_psa_udp_constants.h"
#include <uuid/uuid.h>
#include <pubsub_utils.h>
@@ -51,19 +49,21 @@
#define L_ERROR(...) \
celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_ERROR, __VA_ARGS__)
-struct pubsub_tcp_topic_receiver {
+struct pubsub_udp_topic_receiver {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
+ char *url;
pubsub_serializer_handler_t* serializerHandler;
void *admin;
size_t timeout;
bool isPassive;
- pubsub_tcpHandler_t *socketHandler;
- pubsub_tcpHandler_t *sharedSocketHandler;
+ bool isStatic;
+ pubsub_sktHandler_t *socketHandler;
+ pubsub_sktHandler_t *sharedSocketHandler;
pubsub_interceptors_handler_t *interceptorsHandler;
struct {
@@ -74,50 +74,50 @@ struct pubsub_tcp_topic_receiver {
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = tcp url, value = psa_tcp_requested_connection_entry_t*
+ hash_map_t *map; //key = udp url, value = psa_udp_requested_connection_entry_t*
bool allConnected; //true if all requestedConnection are connected
} requestedConnections;
long subscriberTrackerId;
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = long svc id, value = psa_tcp_subscriber_entry_t
+ hash_map_t *map; //key = long svc id, value = psa_udp_subscriber_entry_t
bool allInitialized;
} subscribers;
};
-typedef struct psa_tcp_requested_connection_entry {
- pubsub_tcp_topic_receiver_t *parent;
+typedef struct psa_udp_requested_connection_entry {
+ pubsub_udp_topic_receiver_t *parent;
char *url;
bool connected;
bool statically; //true if the connection is statically configured through the topic properties.
-} psa_tcp_requested_connection_entry_t;
+} psa_udp_requested_connection_entry_t;
-typedef struct psa_tcp_subscriber_entry {
+typedef struct psa_udp_subscriber_entry {
pubsub_subscriber_t* subscriberSvc;
bool initialized; //true if the init function is called through the receive thread
-} psa_tcp_subscriber_entry_t;
+} psa_udp_subscriber_entry_t;
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
-static void *psa_tcp_recvThread(void *data);
-static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
-static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
+static void pubsub_udpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_udpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void *psa_udp_recvThread(void *data);
+static void psa_udp_connectToAllRequestedConnections(pubsub_udp_topic_receiver_t *receiver);
+static void psa_udp_initializeAllSubscribers(pubsub_udp_topic_receiver_t *receiver);
static void processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime);
-static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
-static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
+static void psa_udp_connectHandler(void *handle, const char *url, bool lock);
+static void psa_udp_disConnectHandler(void *handle, const char *url, bool lock);
-pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_udp_topic_receiver_t *pubsub_udpTopicReceiver_create(celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *handlerStore,
+ pubsub_sktHandler_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *protocol) {
- pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+ pubsub_udp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
receiver->serializerHandler = serializerHandler;
@@ -126,38 +126,43 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
receiver->protocol = protocol;
receiver->scope = celix_utils_strdup(scope);
receiver->topic = celix_utils_strdup(topic);
- receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE,
+ receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_UDP_ADMIN_TYPE,
pubsub_serializerHandler_getSerializationType(serializerHandler));
- const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
- const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
- const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
+ const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_UDP_PSA_IP_KEY, NULL);
+ const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_UDP_STATIC_BIND_URL_FOR, topic, scope);
+ const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_UDP_STATIC_CONNECT_URLS_FOR, topic, scope);
+ const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_UDP_PASSIVE_ENABLED, topic, scope);
+ const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_UDP_PASSIVE_SELECTION_KEY, topic, scope);
if (isPassive) {
- receiver->isPassive = psa_tcp_isPassive(isPassive);
+ receiver->isPassive = pubsub_sktHandler_isPassive(isPassive);
}
if (topicProperties != NULL) {
+ if (discUrl == NULL) {
+ discUrl = celix_properties_get(topicProperties, PUBSUB_UDP_STATIC_DISCOVER_URL, NULL);
+ }
if(staticConnectUrls == NULL) {
- staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
+ staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_UDP_STATIC_CONNECT_URLS, NULL);
}
if (isPassive == NULL) {
- receiver->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, false);
+ receiver->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_UDP_PASSIVE_CONFIGURED, false);
}
if (passiveKey == NULL) {
- passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
+ passiveKey = celix_properties_get(topicProperties, PUBSUB_UDP_PASSIVE_KEY, NULL);
}
}
// Set receiver connection thread timeout.
// property is in ms, timeout value in us. (convert ms to us).
- receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT,
- PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
+ receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_UDP_SUBSCRIBER_CONNECTION_TIMEOUT,
+ PSA_UDP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
/* When it's an endpoint share the socket with the sender */
if (passiveKey != NULL) {
celixThreadMutex_lock(&handlerStore->mutex);
- pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
+ pubsub_sktHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (receiver->socketHandler == NULL)
- receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
+ receiver->socketHandler = pubsub_sktHandler_create(receiver->protocol, receiver->logHelper);
entry = receiver->socketHandler;
receiver->sharedSocketHandler = receiver->socketHandler;
hashMap_put(handlerStore->map, (void *) passiveKey, entry);
@@ -167,28 +172,28 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
}
celixThreadMutex_unlock(&handlerStore->mutex);
} else {
- receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
+ receiver->socketHandler = pubsub_sktHandler_create(receiver->protocol, receiver->logHelper);
}
if (receiver->socketHandler != NULL) {
- long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
- const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
- long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
- PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
- double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
- long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
- PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
- long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
-
- pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
- pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
- pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
- pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
- pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,
- psa_tcp_disConnectHandler);
- pubsub_tcpHandler_setThreadPriority(receiver->socketHandler, prio, sched);
- pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
- pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
+ long prio = celix_properties_getAsLong(topicProperties, PUBSUB_UDP_THREAD_REALTIME_PRIO, -1L);
+ const char *sched = celix_properties_get(topicProperties, PUBSUB_UDP_THREAD_REALTIME_SCHED, NULL);
+ long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_UDP_SUBSCRIBER_RETRY_CNT_KEY,
+ PUBSUB_UDP_SUBSCRIBER_RETRY_CNT_DEFAULT);
+ double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_UDP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_UDP_SUBSCRIBER_RCVTIMEO_DEFAULT);
+ long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_UDP_RECV_BUFFER_SIZE,
+ PSA_UDP_DEFAULT_RECV_BUFFER_SIZE);
+ long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_UDP_TIMEOUT, PSA_UDP_DEFAULT_TIMEOUT);
+
+ pubsub_sktHandler_setThreadName(receiver->socketHandler, topic, scope);
+ pubsub_sktHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
+ pubsub_sktHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
+ pubsub_sktHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
+ pubsub_sktHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_udp_connectHandler,
+ psa_udp_disConnectHandler);
+ pubsub_sktHandler_setThreadPriority(receiver->socketHandler, prio, sched);
+ pubsub_sktHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
+ pubsub_sktHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
}
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
@@ -202,23 +207,63 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
char *url;
char *save = urlsCopy;
while ((url = strtok_r(save, " ", &save))) {
- psa_tcp_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
- entry->statically = true;
- entry->connected = false;
- entry->url = celix_utils_strdup(url);
- entry->parent = receiver;
- hashMap_put(receiver->requestedConnections.map, entry->url, entry);
- receiver->requestedConnections.allConnected = false;
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
+ bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname);
+ if (is_multicast || is_broadcast) {
+ psa_udp_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+ entry->statically = true;
+ entry->connected = false;
+ entry->url = celix_utils_strdup(url);
+ entry->parent = receiver;
+ hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+ receiver->requestedConnections.allConnected = false;
+ }
+ pubsub_utils_url_free(url_info);
}
free(urlsCopy);
}
+ if (!receiver->isPassive) {
+ //setting up udp socket (bind) for UDP TopicReceiver
+ char *urls = NULL;
+ if (discUrl != NULL) {
+ urls = celix_utils_strdup(discUrl);
+ } else if (ip != NULL) {
+ urls = celix_utils_strdup(ip);
+ } else {
+ struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
+ urls = pubsub_utils_url_get_url(sin, NULL);
+ free(sin);
+ }
+ if (urls) {
+ char *urlsCopy = celix_utils_strdup(urls);
+ char *url;
+ char *save = urlsCopy;
+ while ((url = strtok_r(save, " ", &save))) {
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
+ bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname);
+ if (!is_multicast && !is_broadcast) {
+ int rc = pubsub_sktHandler_udp_bind(receiver->socketHandler, url);
+ if (rc < 0) {
+ L_WARN("Error for udp listen using dynamic bind url '%s'. %s", url, strerror(errno));
+ } else {
+ url = NULL;}
+ }
+ pubsub_utils_url_free(url_info);
+ }
+ receiver->url = pubsub_sktHandler_get_interface_url(receiver->socketHandler);
+ free(urlsCopy);
+ }
+ free(urls);
+ }
if (receiver->socketHandler != NULL && (!receiver->isPassive)) {
// Configure Receiver thread
receiver->thread.running = true;
- celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver);
+ celixThread_create(&receiver->thread.thread, NULL, psa_udp_recvThread, receiver);
char name[64];
- snprintf(name, 64, "TCP TR %s/%s", scope == NULL ? "(null)" : scope, topic);
+ snprintf(name, 64, "UDP TR %s/%s", scope == NULL ? "(null)" : scope, topic);
celixThread_setName(&receiver->thread.thread, name);
}
@@ -232,8 +277,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
opts.callbackHandle = receiver;
- opts.addWithProperties = pubsub_tcpTopicReceiver_addSubscriber;
- opts.removeWithProperties = pubsub_tcpTopicReceiver_removeSubscriber;
+ opts.addWithProperties = pubsub_udpTopicReceiver_addSubscriber;
+ opts.removeWithProperties = pubsub_udpTopicReceiver_removeSubscriber;
receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
@@ -244,12 +289,12 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
free(receiver->topic);
free(receiver);
receiver = NULL;
- L_ERROR("[PSA_TCP] Cannot create TopicReceiver for %s/%s", scope == NULL ? "(null)" : scope, topic);
+ L_ERROR("[PSA_udp] Cannot create TopicReceiver for %s/%s", scope == NULL ? "(null)" : scope, topic);
}
return receiver;
}
-void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
+void pubsub_udpTopicReceiver_destroy(pubsub_udp_topic_receiver_t *receiver) {
if (receiver != NULL) {
celixThreadMutex_lock(&receiver->thread.mutex);
@@ -268,7 +313,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ psa_udp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
free(entry->url);
free(entry);
@@ -281,10 +326,10 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->thread.mutex);
- pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, NULL);
- pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL, NULL, NULL);
+ pubsub_sktHandler_addMessageHandler(receiver->socketHandler, NULL, NULL);
+ pubsub_sktHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL, NULL, NULL);
if ((receiver->socketHandler) && (receiver->sharedSocketHandler == NULL)) {
- pubsub_tcpHandler_destroy(receiver->socketHandler);
+ pubsub_sktHandler_destroy(receiver->socketHandler);
receiver->socketHandler = NULL;
}
pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
@@ -296,27 +341,44 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
free(receiver);
}
-const char *pubsub_tcpTopicReceiver_scope(pubsub_tcp_topic_receiver_t *receiver) {
+const char *pubsub_udpTopicReceiver_scope(pubsub_udp_topic_receiver_t *receiver) {
return receiver->scope;
}
-const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver) {
+const char *pubsub_udpTopicReceiver_topic(pubsub_udp_topic_receiver_t *receiver) {
return receiver->topic;
}
-const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *receiver) {
+const char *pubsub_udpTopicReceiver_serializerType(pubsub_udp_topic_receiver_t *receiver) {
return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
}
-long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver) {
+long pubsub_udpTopicReceiver_protocolSvcId(pubsub_udp_topic_receiver_t *receiver) {
return receiver->protocolSvcId;
}
-void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
+bool pubsub_udpTopicReceiver_isStatic(pubsub_udp_topic_receiver_t *receiver) {
+ return receiver->isStatic;
+}
+
+bool pubsub_udpTopicReceiver_isPassive(pubsub_udp_topic_receiver_t *receiver) {
+ return receiver->isPassive;
+}
+
+const char *pubsub_udpTopicReceiver_url(pubsub_udp_topic_receiver_t *receiver) {
+ if (receiver->isPassive) {
+ return pubsub_sktHandler_get_connection_url(receiver->socketHandler);
+ } else {
+ return receiver->url;
+ }
+}
+
+
+void pubsub_udpTopicReceiver_listConnections(pubsub_udp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
celix_array_list_t *unconnectedUrls) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
if (receiver->isPassive) {
- char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
+ char* interface_url = pubsub_sktHandler_get_interface_url(receiver->socketHandler);
char *url = NULL;
asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
if (interface_url) {
@@ -328,7 +390,7 @@ void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiv
} else {
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ psa_udp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
char *url = NULL;
asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
if (entry->connected) {
@@ -341,46 +403,49 @@ void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiv
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *receiver) {
- return receiver->isPassive;
-}
+void pubsub_udpTopicReceiver_connectTo(pubsub_udp_topic_receiver_t *receiver, const char *url) {
+ if (!url) return;
+ char *connectUrl = celix_utils_strdup(url);
+ pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(connectUrl);
+ bool is_multicast = pubsub_utils_url_is_multicast(urlInfo->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(urlInfo->hostname);
+ pubsub_utils_url_free(urlInfo);
-void pubsub_tcpTopicReceiver_connectTo(
- pubsub_tcp_topic_receiver_t *receiver,
- const char *url) {
- L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s",
- receiver->scope == NULL ? "(null)" : receiver->scope,
- receiver->topic,
- url);
+ if (is_multicast || is_broadcast) {
+ L_DEBUG("[PSA_udp] TopicReceiver %s/%s connecting to udp url %s",
+ receiver->scope == NULL ? "(null)" : receiver->scope,
+ receiver->topic,
+ url);
- celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
- if (entry == NULL) {
- entry = calloc(1, sizeof(*entry));
- entry->url = celix_utils_strdup(url);
- entry->connected = false;
- entry->statically = false;
- entry->parent = receiver;
- hashMap_put(receiver->requestedConnections.map, (void *) entry->url, entry);
- receiver->requestedConnections.allConnected = false;
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ psa_udp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
+ if (entry == NULL) {
+ entry = calloc(1, sizeof(*entry));
+ entry->url = celix_utils_strdup(url);
+ entry->connected = false;
+ entry->statically = false;
+ entry->parent = receiver;
+ hashMap_put(receiver->requestedConnections.map, (void *) entry->url, entry);
+ receiver->requestedConnections.allConnected = false;
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+ psa_udp_connectToAllRequestedConnections(receiver);
}
- celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
-
- psa_tcp_connectToAllRequestedConnections(receiver);
}
-void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url) {
- L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s",
+void pubsub_udpTopicReceiver_disconnectFrom(pubsub_udp_topic_receiver_t *receiver, const char *url) {
+ if (!url) return;
+ L_DEBUG("[PSA udp] TopicReceiver %s/%s disconnect from udp url %s",
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic,
url);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
+ psa_udp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
if (entry != NULL) {
- int rc = pubsub_tcpHandler_disconnect(receiver->socketHandler, entry->url);
+ int rc = pubsub_sktHandler_disconnect(receiver->socketHandler, entry->url);
if (rc < 0)
- L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", url, strerror(errno));
+ L_WARN("[PSA_udp] Error disconnecting from udp url %s. (%s)", url, strerror(errno));
}
if (entry != NULL) {
free(entry->url);
@@ -389,8 +454,8 @@ void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receive
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_topic_receiver_t *receiver = handle;
+static void pubsub_udpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
+ pubsub_udp_topic_receiver_t *receiver = handle;
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
@@ -408,7 +473,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
return;
}
- psa_tcp_subscriber_entry_t *entry = calloc(1, sizeof(*entry));
+ psa_udp_subscriber_entry_t *entry = calloc(1, sizeof(*entry));
entry->subscriberSvc = svc;
entry->initialized = false;
@@ -418,23 +483,23 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
- pubsub_tcp_topic_receiver_t *receiver = handle;
+static void pubsub_udpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
+ pubsub_udp_topic_receiver_t *receiver = handle;
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
- psa_tcp_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void *)svcId);
+ psa_udp_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void *)svcId);
free(entry);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) {
+static void callReceivers(pubsub_udp_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) {
*release = true;
celixThreadMutex_lock(&receiver->subscribers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+ psa_udp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
if (entry != NULL && entry->subscriberSvc->receive != NULL) {
entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release);
if (!(*release) && hashMapIterator_hasNext(&iter)) { //receive function has taken ownership, deserialize again for new message
@@ -447,7 +512,7 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg
message->header.msgMinorVersion,
&deSerializeBuffer, 0, msg);
if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+ L_WARN("[PSA_UDP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
break;
}
@@ -461,7 +526,7 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg
}
static inline void processMsg(void* handle, const pubsub_protocol_message_t *message, bool* releaseMsg, struct timespec *receiveTime) {
- pubsub_tcp_topic_receiver_t *receiver = handle;
+ pubsub_udp_topic_receiver_t *receiver = handle;
const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
if (msgFqn == NULL) {
@@ -501,7 +566,7 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
message->header.msgMinorVersion,
&deSerializeBuffer, 0, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+ L_WARN("[PSA_UDP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
} else {
pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
@@ -521,11 +586,11 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
celix_properties_destroy(metadata);
}
} else {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+ L_WARN("[PSA_UDP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
} else {
- L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
+ L_WARN("[PSA_UDP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
msgFqn,
pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
(int) message->header.msgMajorVersion,
@@ -535,8 +600,8 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
}
}
-static void *psa_tcp_recvThread(void *data) {
- pubsub_tcp_topic_receiver_t *receiver = data;
+static void *psa_udp_recvThread(void *data) {
+ pubsub_udp_topic_receiver_t *receiver = data;
celixThreadMutex_lock(&receiver->thread.mutex);
bool running = receiver->thread.running;
@@ -552,10 +617,10 @@ static void *psa_tcp_recvThread(void *data) {
while (running) {
if (!allConnected) {
- psa_tcp_connectToAllRequestedConnections(receiver);
+ psa_udp_connectToAllRequestedConnections(receiver);
}
if (!allInitialized) {
- psa_tcp_initializeAllSubscribers(receiver);
+ psa_udp_initializeAllSubscribers(receiver);
}
usleep(receiver->timeout);
@@ -574,15 +639,15 @@ static void *psa_tcp_recvThread(void *data) {
return NULL;
}
-static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver) {
+static void psa_udp_connectToAllRequestedConnections(pubsub_udp_topic_receiver_t *receiver) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
if (!receiver->requestedConnections.allConnected) {
bool allConnected = true;
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ psa_udp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
- int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
+ int rc = pubsub_sktHandler_udp_connect(entry->parent->socketHandler, entry->url);
if (rc < 0) {
allConnected = false;
}
@@ -593,15 +658,15 @@ static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-static void psa_tcp_connectHandler(void *handle, const char *url, bool lock) {
- pubsub_tcp_topic_receiver_t *receiver = handle;
- L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s",
+static void psa_udp_connectHandler(void *handle, const char *url, bool lock) {
+ pubsub_udp_topic_receiver_t *receiver = handle;
+ L_DEBUG("[PSA_udp] TopicReceiver %s/%s connecting to udp url %s",
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic,
url);
if (lock)
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
+ psa_udp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
if (entry == NULL) {
entry = calloc(1, sizeof(*entry));
entry->parent = receiver;
@@ -615,15 +680,15 @@ static void psa_tcp_connectHandler(void *handle, const char *url, bool lock) {
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock) {
- pubsub_tcp_topic_receiver_t *receiver = handle;
- L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s",
+static void psa_udp_disConnectHandler(void *handle, const char *url, bool lock) {
+ pubsub_udp_topic_receiver_t *receiver = handle;
+ L_DEBUG("[PSA udp] TopicReceiver %s/%s disconnect from udp url %s",
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic,
url);
if (lock)
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
+ psa_udp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
if (entry != NULL) {
entry->connected = false;
receiver->requestedConnections.allConnected = false;
@@ -632,13 +697,13 @@ static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock)
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
-static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver) {
+static void psa_udp_initializeAllSubscribers(pubsub_udp_topic_receiver_t *receiver) {
celixThreadMutex_lock(&receiver->subscribers.mutex);
if (!receiver->subscribers.allInitialized) {
bool allInitialized = true;
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+ psa_udp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
int rc = 0;
if (entry->subscriberSvc->init != NULL) {
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_receiver.h
similarity index 66%
copy from bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
copy to bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_receiver.h
index 35c14c6..40a18c4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_receiver.h
@@ -17,40 +17,40 @@
* under the License.
*/
-#ifndef CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
-#define CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
+#ifndef CELIX_PUBSUB_UDP_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_UDP_TOPIC_RECEIVER_H
#include "pubsub_admin_metrics.h"
#include "celix_bundle_context.h"
#include "pubsub_protocol.h"
-#include "pubsub_tcp_common.h"
#include "pubsub_serializer_handler.h"
-typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t;
+typedef struct pubsub_udp_topic_receiver pubsub_udp_topic_receiver_t;
-pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
+pubsub_udp_topic_receiver_t *pubsub_udpTopicReceiver_create(celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *handlerStore,
+ pubsub_sktHandler_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *protocol);
-void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver);
+void pubsub_udpTopicReceiver_destroy(pubsub_udp_topic_receiver_t *receiver);
-const char *pubsub_tcpTopicReceiver_scope(pubsub_tcp_topic_receiver_t *receiver);
-const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver);
-const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *sender);
+const char *pubsub_udpTopicReceiver_scope(pubsub_udp_topic_receiver_t *receiver);
+const char *pubsub_udpTopicReceiver_topic(pubsub_udp_topic_receiver_t *receiver);
+const char *pubsub_udpTopicReceiver_serializerType(pubsub_udp_topic_receiver_t *sender);
-long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver);
-void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver,
+long pubsub_udpTopicReceiver_protocolSvcId(pubsub_udp_topic_receiver_t *receiver);
+void pubsub_udpTopicReceiver_listConnections(pubsub_udp_topic_receiver_t *receiver,
celix_array_list_t *connectedUrls,
celix_array_list_t *unconnectedUrls);
-bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *sender);
+bool pubsub_udpTopicReceiver_isPassive(pubsub_udp_topic_receiver_t *receiver);
+const char *pubsub_udpTopicReceiver_url(pubsub_udp_topic_receiver_t *receiver);
+bool pubsub_udpTopicReceiver_isStatic(pubsub_udp_topic_receiver_t *receiver);
+void pubsub_udpTopicReceiver_connectTo(pubsub_udp_topic_receiver_t *receiver, const char *url);
+void pubsub_udpTopicReceiver_disconnectFrom(pubsub_udp_topic_receiver_t *receiver, const char *url);
-void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
-void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
-
-#endif //CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
+#endif //CELIX_PUBSUB_UDP_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_sender.c
similarity index 62%
copy from bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
copy to bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_sender.c
index e318829..71efebd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_sender.c
@@ -27,17 +27,17 @@
#include <zconf.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
-#include "pubsub_psa_tcp_constants.h"
-#include "pubsub_tcp_topic_sender.h"
-#include "pubsub_tcp_handler.h"
-#include "pubsub_tcp_common.h"
+#include "pubsub_psa_udp_constants.h"
+#include "pubsub_udp_topic_sender.h"
+#include "pubsub_skt_handler.h"
#include <uuid/uuid.h>
#include "celix_constants.h"
#include <pubsub_utils.h>
#include "pubsub_interceptors_handler.h"
-#include "pubsub_tcp_admin.h"
-#define TCP_BIND_MAX_RETRY 10
+#define UDP_BIND_MAX_RETRY 10
+// Max UDP payload is 65535 bytes (64 KiB - 1) minus the 8-byte UDP header and the 20-byte IP header
+#define UDP_MAX_MSG_SIZE (((64 * 1024) - 1) - 28)
#define L_DEBUG(...) \
celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
@@ -48,14 +48,14 @@
#define L_ERROR(...) \
celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_ERROR, __VA_ARGS__)
-struct pubsub_tcp_topic_sender {
+struct pubsub_udp_topic_sender {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
uuid_t fwUUID;
- pubsub_tcpHandler_t *socketHandler;
- pubsub_tcpHandler_t *sharedSocketHandler;
+ pubsub_sktHandler_t *socketHandler;
+ pubsub_sktHandler_t *sharedSocketHandler;
pubsub_interceptors_handler_t *interceptorsHandler;
pubsub_serializer_handler_t* serializerHandler;
@@ -76,31 +76,31 @@ struct pubsub_tcp_topic_sender {
struct {
celix_thread_mutex_t mutex;
- hash_map_t *map; //key = bndId, value = psa_tcp_bounded_service_entry_t
+ hash_map_t *map; //key = bndId, value = psa_udp_bounded_service_entry_t
} boundedServices;
};
-typedef struct psa_tcp_bounded_service_entry {
- pubsub_tcp_topic_sender_t *parent;
+typedef struct psa_udp_bounded_service_entry {
+ pubsub_udp_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
int getCount;
-} psa_tcp_bounded_service_entry_t;
+} psa_udp_bounded_service_entry_t;
-static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
+static int psa_udp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
-static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+static void *psa_udp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties);
-static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+static void psa_udp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties);
-static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender);
+static void delay_first_send_for_late_joiners(pubsub_udp_topic_sender_t *sender);
static int
-psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
+psa_udp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg, celix_properties_t *metadata);
-pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
+pubsub_udp_topic_sender_t *pubsub_udpTopicSender_create(
celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const char *scope,
@@ -108,10 +108,10 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *handlerStore,
+ pubsub_sktHandler_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *protocol) {
- pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
+ pubsub_udp_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
sender->serializerHandler = serializerHandler;
@@ -122,36 +122,40 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE,
+ sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_UDP_ADMIN_TYPE,
pubsub_serializerHandler_getSerializationType(serializerHandler));
sender->isPassive = false;
char *urls = NULL;
- const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
- const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
- const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
- const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
+ const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_UDP_PSA_IP_KEY, NULL);
+ const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_UDP_STATIC_BIND_URL_FOR, topic, scope);
+ const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_UDP_PASSIVE_ENABLED, topic, scope);
+ const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_UDP_PASSIVE_SELECTION_KEY, topic, scope);
+ const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_UDP_STATIC_CONNECT_URLS_FOR, topic, scope);
if (isPassive) {
- sender->isPassive = psa_tcp_isPassive(isPassive);
+ sender->isPassive = pubsub_sktHandler_isPassive(isPassive);
}
if (topicProperties != NULL) {
if (discUrl == NULL) {
- discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
+ discUrl = celix_properties_get(topicProperties, PUBSUB_UDP_STATIC_DISCOVER_URL, NULL);
}
if (isPassive == NULL) {
- sender->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, false);
+ sender->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_UDP_PASSIVE_CONFIGURED, false);
}
if (passiveKey == NULL) {
- passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
+ passiveKey = celix_properties_get(topicProperties, PUBSUB_UDP_PASSIVE_KEY, NULL);
+ }
+ if(staticConnectUrls == NULL) {
+ staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_UDP_STATIC_CONNECT_URLS, NULL);
}
}
/* When it's an endpoint share the socket with the receiver */
if (passiveKey != NULL) {
celixThreadMutex_lock(&handlerStore->mutex);
- pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
+ pubsub_sktHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (sender->socketHandler == NULL)
- sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
+ sender->socketHandler = pubsub_sktHandler_create(sender->protocol, sender->logHelper);
entry = sender->socketHandler;
sender->sharedSocketHandler = sender->socketHandler;
hashMap_put(handlerStore->map, (void *) passiveKey, entry);
@@ -161,66 +165,91 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
}
celixThreadMutex_unlock(&handlerStore->mutex);
} else {
- sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
+ sender->socketHandler = pubsub_sktHandler_create(sender->protocol, sender->logHelper);
}
if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
- long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
- const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
- long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
- double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
- long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
- long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+ long prio = celix_properties_getAsLong(topicProperties, PUBSUB_UDP_THREAD_REALTIME_PRIO, -1L);
+ const char *sched = celix_properties_get(topicProperties, PUBSUB_UDP_THREAD_REALTIME_SCHED, NULL);
+ long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_UDP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_UDP_PUBLISHER_RETRY_CNT_DEFAULT);
+ double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_UDP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_UDP_PUBLISHER_SNDTIMEO_DEFAULT);
+ long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_UDP_MAX_MESSAGE_SIZE, PSA_UDP_DEFAULT_MAX_MESSAGE_SIZE);
+ long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_UDP_TIMEOUT, PSA_UDP_DEFAULT_TIMEOUT);
sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx, PUBSUB_UTILS_PSA_SEND_DELAY, PUBSUB_UTILS_PSA_DEFAULT_SEND_DELAY);
- pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
- pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
- pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
- pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
- pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+ maxMsgSize = MIN(UDP_MAX_MSG_SIZE, maxMsgSize);
+ pubsub_sktHandler_setThreadName(sender->socketHandler, topic, scope);
+ pubsub_sktHandler_setThreadPriority(sender->socketHandler, prio, sched);
+ pubsub_sktHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
+ pubsub_sktHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+ pubsub_sktHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
// When passiveKey is specified, enable receive event for full-duplex connection using key.
// Because the topic receiver is already started, enable the receive event.
- pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
- pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
+ pubsub_sktHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
+ pubsub_sktHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
}
if (!sender->isPassive) {
- //setting up tcp socket for TCP TopicSender
+ //setting up udp socket for UDP TopicSender
if (discUrl != NULL) {
- urls = strndup(discUrl, 1024 * 1024);
+ urls = celix_utils_strdup(discUrl);
sender->isStatic = true;
} else if (ip != NULL) {
- urls = strndup(ip, 1024 * 1024);
+ urls = celix_utils_strdup(ip);
} else {
struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
urls = pubsub_utils_url_get_url(sin, NULL);
free(sin);
}
- if (!sender->url) {
- char *urlsCopy = strndup(urls, 1024 * 1024);
+ if (!sender->url && urls) {
+ char *urlsCopy = celix_utils_strdup(urls);
char *url;
char *save = urlsCopy;
while ((url = strtok_r(save, " ", &save))) {
int retry = 0;
- while (url && retry < TCP_BIND_MAX_RETRY) {
- pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
- int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
+ pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+ bool is_multicast = pubsub_utils_url_is_multicast(urlInfo->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(urlInfo->hostname);
+ pubsub_utils_url_free(urlInfo);
+ while ((is_multicast || is_broadcast) && url && retry < UDP_BIND_MAX_RETRY) {
+ int rc = pubsub_sktHandler_udp_bind(sender->socketHandler, url);
if (rc < 0) {
- L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
+ L_WARN("Error for udp bind using dynamic bind url '%s'. %s", url, strerror(errno));
} else {
url = NULL;
}
- pubsub_utils_url_free(urlInfo);
retry++;
}
}
free(urlsCopy);
- sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
+ sender->url = pubsub_sktHandler_get_interface_url(sender->socketHandler);
}
free(urls);
+ if (staticConnectUrls){
+ char *urlsCopy = celix_utils_strdup(staticConnectUrls);
+ char *url;
+ char *save = urlsCopy;
+ while ((url = strtok_r(save, " ", &save))) {
+ if (url) {
+ int rc = 0;
+ pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+ bool is_multicast = pubsub_utils_url_is_multicast(urlInfo->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(urlInfo->hostname);
+ if (!is_multicast && !is_broadcast) {
+ L_INFO("Udp static connect '%s'", url);
+ rc = pubsub_sktHandler_udp_connect(sender->socketHandler, url);
+ }
+ if (rc < 0) {
+ L_WARN("Error for udp static connect '%s'. %s", url, strerror(errno));
+ }
+ pubsub_utils_url_free(urlInfo);
+ }
+ }
+ free(urlsCopy);
+ }
}
//register publisher services using a service factory
- if ((sender->url != NULL) || (sender->isPassive)) {
+ {
sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
sender->topic = strndup(topic, 1024 * 1024);
@@ -228,8 +257,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
sender->publisher.factory.handle = sender;
- sender->publisher.factory.getService = psa_tcp_getPublisherService;
- sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
+ sender->publisher.factory.getService = psa_udp_getPublisherService;
+ sender->publisher.factory.ungetService = psa_udp_ungetPublisherService;
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
@@ -244,15 +273,11 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
opts.properties = props;
sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
- } else {
- free(sender);
- sender = NULL;
}
-
return sender;
}
-void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
+void pubsub_udpTopicSender_destroy(pubsub_udp_topic_sender_t *sender) {
if (sender != NULL) {
celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
@@ -260,7 +285,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
celixThreadMutex_lock(&sender->boundedServices.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
+ psa_udp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
free(entry);
}
hashMap_destroy(sender->boundedServices.map, false, false);
@@ -269,7 +294,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
- pubsub_tcpHandler_destroy(sender->socketHandler);
+ pubsub_sktHandler_destroy(sender->socketHandler);
sender->socketHandler = NULL;
}
@@ -282,44 +307,83 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
}
}
-long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender) {
+long pubsub_udpTopicSender_protocolSvcId(pubsub_udp_topic_sender_t *sender) {
return sender->protocolSvcId;
}
-const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender) {
+const char *pubsub_udpTopicSender_scope(pubsub_udp_topic_sender_t *sender) {
return sender->scope;
}
-const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
+const char *pubsub_udpTopicSender_topic(pubsub_udp_topic_sender_t *sender) {
return sender->topic;
}
-const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender) {
+const char* pubsub_udpTopicSender_serializerType(pubsub_udp_topic_sender_t *sender) {
return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
}
-const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
+const char *pubsub_udpTopicSender_url(pubsub_udp_topic_sender_t *sender) {
if (sender->isPassive) {
- return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+ return pubsub_sktHandler_get_connection_url(sender->socketHandler);
} else {
return sender->url;
}
}
-bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
+
+void pubsub_udpTopicSender_listConnections(pubsub_udp_topic_sender_t *sender, celix_array_list_t *urls) {
+ pubsub_sktHandler_get_connection_urls(sender->socketHandler, urls); // pure delegation: the socket handler receives the caller's urls list
+}
+
+bool pubsub_udpTopicSender_isStatic(pubsub_udp_topic_sender_t *sender) {
return sender->isStatic;
}
-bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+bool pubsub_udpTopicSender_isPassive(pubsub_udp_topic_sender_t *sender) {
return sender->isPassive;
}
-static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+void pubsub_udpTopicSender_connectTo(pubsub_udp_topic_sender_t *sender, const char *url) {
+ if (!url) return;
+ char *connectUrl = celix_utils_strdup(url); // mutable copy: pubsub_utils_url_parse and pubsub_sktHandler_udp_connect take a non-const char*
+ pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(connectUrl);
+ bool is_multicast = pubsub_utils_url_is_multicast(urlInfo->hostname); // NOTE(review): urlInfo is dereferenced unchecked - confirm the parser never returns NULL
+ bool is_broadcast = pubsub_utils_url_is_broadcast(urlInfo->hostname);
+ pubsub_utils_url_free(urlInfo); // only the two flags are used past this point
+ // Only urls that are neither multicast nor broadcast are passed to the socket handler's udp connect.
+ if (!is_multicast && !is_broadcast) {
+ L_DEBUG("[PSA_udp] TopicSender %s/%s connecting to udp url %s",
+ sender->scope == NULL ? "(null)" : sender->scope,
+ sender->topic,
+ url);
+ pubsub_sktHandler_udp_connect(sender->socketHandler, connectUrl); // return value is not inspected here
+ }
+ free(connectUrl); // the copy is released on both the connect and the skip path
+}
+
+// Disconnects the sender's socket handler from url; NULL, multicast and broadcast urls are not passed on.
+void pubsub_udpTopicSender_disconnectFrom(pubsub_udp_topic_sender_t *sender, const char *url) {
+ if (!url) return;
+ char *connectUrl = celix_utils_strdup(url); // mutable copy: the url parser and the handler take a non-const char*
+ pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(connectUrl);
+ bool is_multicast = pubsub_utils_url_is_multicast(urlInfo->hostname); // NOTE(review): urlInfo is dereferenced unchecked - confirm the parser never returns NULL
+ bool is_broadcast = pubsub_utils_url_is_broadcast(urlInfo->hostname);
+ pubsub_utils_url_free(urlInfo);
+ if (!is_multicast && !is_broadcast) {
+ L_DEBUG("[PSA udp] TopicSender %s/%s disconnect from udp url %s",
+ sender->scope == NULL ? "(null)" : sender->scope, sender->topic, url);
+ pubsub_sktHandler_disconnect(sender->socketHandler, connectUrl);
+ }
+ free(connectUrl); // fix: the copy was leaked on every call; pubsub_udpTopicSender_connectTo already frees its copy the same way
+}
+
+static void *psa_udp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
- pubsub_tcp_topic_sender_t *sender = handle;
+ pubsub_udp_topic_sender_t *sender = handle;
long bndId = celix_bundle_getId(requestingBundle);
celixThreadMutex_lock(&sender->boundedServices.mutex);
- psa_tcp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId);
+ psa_udp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId);
if (entry != NULL) {
entry->getCount += 1;
} else {
@@ -328,8 +392,8 @@ static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *req
entry->parent = sender;
entry->bndId = bndId;
entry->service.handle = entry;
- entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType;
- entry->service.send = psa_tcp_topicPublicationSend;
+ entry->service.localMsgTypeIdForMsgType = psa_udp_localMsgTypeIdForMsgType;
+ entry->service.send = psa_udp_topicPublicationSend;
hashMap_put(sender->boundedServices.map, (void *) bndId, entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -337,13 +401,13 @@ static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *req
return &entry->service;
}
-static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
+static void psa_udp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
- pubsub_tcp_topic_sender_t *sender = handle;
+ pubsub_udp_topic_sender_t *sender = handle;
long bndId = celix_bundle_getId(requestingBundle);
celixThreadMutex_lock(&sender->boundedServices.mutex);
- psa_tcp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId);
+ psa_udp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId);
if (entry != NULL) {
entry->getCount -= 1;
}
@@ -357,9 +421,9 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
}
static int
-psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
- psa_tcp_bounded_service_entry_t *bound = handle;
- pubsub_tcp_topic_sender_t *sender = bound->parent;
+psa_udp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
+ psa_udp_bounded_service_entry_t *bound = handle;
+ pubsub_udp_topic_sender_t *sender = bound->parent;
const char* msgFqn;
int majorVersion;
int minorVersion;
@@ -382,7 +446,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
struct iovec *serializedIoVecOutput = NULL;
status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
+ L_WARN("[PSA_UDP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
celix_properties_destroy(metadata);
return status;
@@ -411,7 +475,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
}
bool sendOk = true;
{
- int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
+ int rc = pubsub_sktHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
if (rc < 0) {
status = -1;
sendOk = false;
@@ -424,28 +488,28 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
}
if (!sendOk) {
- L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
+ L_WARN("[PSA_UDP_V2_TS] Error sending msg. %s", strerror(errno));
}
celix_properties_destroy(metadata);
return status;
}
-static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender) {
+static void delay_first_send_for_late_joiners(pubsub_udp_topic_sender_t *sender) {
static bool firstSend = true;
if (firstSend) {
if (sender->send_delay ) {
- L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
+ L_INFO("PSA_UDP_TP: Delaying first send for late joiners...\n");
}
usleep(sender->send_delay * 1000);
firstSend = false;
}
}
-static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
- psa_tcp_bounded_service_entry_t* entry = handle;
+static int psa_udp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
+ psa_udp_bounded_service_entry_t* entry = handle;
uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
*msgTypeId = (unsigned int)msgId;
return 0;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_sender.h
similarity index 54%
copy from bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
copy to bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_sender.h
index 57b13a6..8b142e1 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_sender.h
@@ -17,18 +17,18 @@
* under the License.
*/
-#ifndef CELIX_PUBSUB_TCP_TOPIC_SENDER_H
-#define CELIX_PUBSUB_TCP_TOPIC_SENDER_H
+#ifndef CELIX_PUBSUB_UDP_TOPIC_SENDER_H
+#define CELIX_PUBSUB_UDP_TOPIC_SENDER_H
#include "celix_bundle_context.h"
#include "pubsub_admin_metrics.h"
#include "pubsub_protocol.h"
-#include "pubsub_tcp_common.h"
+#include "pubsub_skt_handler.h"
#include "pubsub_serializer_handler.h"
-typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t;
+typedef struct pubsub_udp_topic_sender pubsub_udp_topic_sender_t;
-pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
+pubsub_udp_topic_sender_t *pubsub_udpTopicSender_create(
celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const char *scope,
@@ -36,17 +36,20 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *handlerStore,
+ pubsub_sktHandler_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *prot);
-void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender);
-const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender);
-const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender);
-const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
-const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender);
-bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
-bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
-long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
+void pubsub_udpTopicSender_destroy(pubsub_udp_topic_sender_t *sender);
+const char *pubsub_udpTopicSender_scope(pubsub_udp_topic_sender_t *sender);
+const char *pubsub_udpTopicSender_topic(pubsub_udp_topic_sender_t *sender);
+const char *pubsub_udpTopicSender_url(pubsub_udp_topic_sender_t *sender);
+const char* pubsub_udpTopicSender_serializerType(pubsub_udp_topic_sender_t *sender);
+bool pubsub_udpTopicSender_isStatic(pubsub_udp_topic_sender_t *sender);
+bool pubsub_udpTopicSender_isPassive(pubsub_udp_topic_sender_t *sender);
+long pubsub_udpTopicSender_protocolSvcId(pubsub_udp_topic_sender_t *sender);
+void pubsub_udpTopicSender_connectTo(pubsub_udp_topic_sender_t *receiver, const char *url);
+void pubsub_udpTopicSender_disconnectFrom(pubsub_udp_topic_sender_t *receiver, const char *url);
+void pubsub_udpTopicSender_listConnections(pubsub_udp_topic_sender_t *sender, celix_array_list_t *urls);
-#endif //CELIX_PUBSUB_TCP_TOPIC_SENDER_H
+#endif //CELIX_PUBSUB_UDP_TOPIC_SENDER_H
diff --git a/bundles/pubsub/pubsub_utils/CMakeLists.txt b/bundles/pubsub/pubsub_utils/CMakeLists.txt
index 0c285aa..967cc52 100644
--- a/bundles/pubsub/pubsub_utils/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_utils/CMakeLists.txt
@@ -19,6 +19,7 @@ add_library(pubsub_utils STATIC
src/pubsub_utils.c
src/pubsub_utils_url.c
src/pubsub_serializer_handler.c
+ src/pubsub_skt_handler.c
src/pubsub_serialization_provider.c
src/pubsub_matching.c
)
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_skt_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_skt_handler.h
new file mode 100644
index 0000000..bfd52ca
--- /dev/null
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_skt_handler.h
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * pubsub_skt_handler.h
+ *
+ * \date July 18, 2016
+ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef _PUBSUB_SKT_BUFFER_HANDLER_H_
+#define _PUBSUB_SKT_BUFFER_HANDLER_H_
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <celix_log_helper.h>
+#include "celix_threads.h"
+#include "pubsub_utils_url.h"
+#include <pubsub_protocol.h>
+
+#ifndef MIN // each argument is expanded twice: do not pass expressions with side effects
+#define MIN(a, b) (((a) < (b)) ? (a) : (b))
+#endif
+
+#ifndef MAX // arguments are parenthesized so operands such as x | y or c ? x : y compare as a whole
+#define MAX(a, b) (((a) > (b)) ? (a) : (b))
+#endif
+
+typedef struct pubsub_sktHandler_endPointStore {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map;
+} pubsub_sktHandler_endPointStore_t;
+
+typedef struct pubsub_sktHandler pubsub_sktHandler_t;
+typedef void(*pubsub_sktHandler_processMessage_callback_t)
+ (void *payload, const pubsub_protocol_message_t *header, bool *release, struct timespec *receiveTime);
+typedef void (*pubsub_sktHandler_receiverConnectMessage_callback_t)(void *payload, const char *url, bool lock);
+typedef void (*pubsub_sktHandler_acceptConnectMessage_callback_t)(void *payload, const char *url);
+
+pubsub_sktHandler_t *pubsub_sktHandler_create(pubsub_protocol_service_t *protocol, celix_log_helper_t *logHelper);
+void pubsub_sktHandler_destroy(pubsub_sktHandler_t *handle);
+int pubsub_sktHandler_open(pubsub_sktHandler_t *handle, int socket_type, char *url);
+int pubsub_sktHandler_bind(pubsub_sktHandler_t *handle, int fd,char *url, unsigned int port_nr);
+int pubsub_sktHandler_close(pubsub_sktHandler_t *handle, int fd);
+int pubsub_sktHandler_tcp_connect(pubsub_sktHandler_t *handle, char *url);
+int pubsub_sktHandler_disconnect(pubsub_sktHandler_t *handle, char *url);
+int pubsub_sktHandler_udp_connect(pubsub_sktHandler_t *handle, char *url);
+int pubsub_sktHandler_udp_bind(pubsub_sktHandler_t *handle, char *url);
+int pubsub_sktHandler_udp_listen(pubsub_sktHandler_t *handle, char *url);
+int pubsub_sktHandler_tcp_listen(pubsub_sktHandler_t *handle, char *url);
+int pubsub_sktHandler_setReceiveBufferSize(pubsub_sktHandler_t *handle, unsigned int size);
+int pubsub_sktHandler_setMaxMsgSize(pubsub_sktHandler_t *handle, unsigned int size);
+void pubsub_sktHandler_setTimeout(pubsub_sktHandler_t *handle, unsigned int timeout);
+void pubsub_sktHandler_setSendRetryCnt(pubsub_sktHandler_t *handle, unsigned int count);
+void pubsub_sktHandler_setReceiveRetryCnt(pubsub_sktHandler_t *handle, unsigned int count);
+void pubsub_sktHandler_setSendTimeOut(pubsub_sktHandler_t *handle, double timeout);
+void pubsub_sktHandler_setReceiveTimeOut(pubsub_sktHandler_t *handle, double timeout);
+void pubsub_sktHandler_enableReceiveEvent(pubsub_sktHandler_t *handle, bool enable);
+
+int pubsub_sktHandler_read(pubsub_sktHandler_t *handle, int fd);
+int pubsub_sktHandler_write(pubsub_sktHandler_t *handle,
+ pubsub_protocol_message_t *message,
+ struct iovec *msg_iovec,
+ size_t msg_iov_len,
+ int flags);
+int pubsub_sktHandler_addMessageHandler(pubsub_sktHandler_t *handle,
+ void *payload,
+ pubsub_sktHandler_processMessage_callback_t processMessageCallback);
+int pubsub_sktHandler_addReceiverConnectionCallback(pubsub_sktHandler_t *handle,
+ void *payload,
+ pubsub_sktHandler_receiverConnectMessage_callback_t connectMessageCallback,
+ pubsub_sktHandler_receiverConnectMessage_callback_t disconnectMessageCallback);
+int pubsub_sktHandler_addAcceptConnectionCallback(pubsub_sktHandler_t *handle,
+ void *payload,
+ pubsub_sktHandler_acceptConnectMessage_callback_t connectMessageCallback,
+ pubsub_sktHandler_acceptConnectMessage_callback_t disconnectMessageCallback);
+char *pubsub_sktHandler_get_interface_url(pubsub_sktHandler_t *handle);
+char *pubsub_sktHandler_get_connection_url(pubsub_sktHandler_t *handle);
+void pubsub_sktHandler_get_connection_urls(pubsub_sktHandler_t *handle, celix_array_list_t *urls);
+bool pubsub_sktHandler_isPassive(const char* buffer);
+void pubsub_sktHandler_setThreadPriority(pubsub_sktHandler_t *handle, long prio, const char *sched);
+void pubsub_sktHandler_setThreadName(pubsub_sktHandler_t *handle, const char *topic, const char *scope);
+
+#endif /* _PUBSUB_SKT_BUFFER_HANDLER_H_ */
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
index b10863c..efea1c8 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
@@ -40,8 +40,9 @@ struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned in
char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char *protocol);
char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol);
bool pubsub_utils_url_is_multicast(char *hostname);
-char *pubsub_utils_url_get_multicast_ip(char *hostname);
-char *pubsub_utils_url_get_ip(char *hostname);
+bool pubsub_utils_url_is_broadcast(char *hostname);
+char *pubsub_utils_url_get_multicast_ip(char *hostname, in_addr_t* intf_addr);
+char *pubsub_utils_url_get_ip(char *hostname, in_addr_t* intf_addr);
void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info);
pubsub_utils_url_t *pubsub_utils_url_parse(char *url);
void pubsub_utils_url_free(pubsub_utils_url_t *url_info);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_skt_handler.c
similarity index 56%
rename from bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
rename to bundles/pubsub/pubsub_utils/src/pubsub_skt_handler.c
index eb02f79..fbc96b8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_skt_handler.c
@@ -17,7 +17,7 @@
* under the License.
*/
/*
- * pubsub_tcp_handler.c
+ * pubsub_skt_handler.c
*
* \date July 18, 2019
* \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
@@ -42,9 +42,10 @@
#include <fcntl.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
+//#include <netinet/udp.h>
#include "hash_map.h"
#include "utils.h"
-#include "pubsub_tcp_handler.h"
+#include "pubsub_skt_handler.h"
#define MAX_EVENTS 64
#define MAX_DEFAULT_BUFFER_SIZE 4u
@@ -67,11 +68,15 @@
//
// Entry administration
//
-typedef struct psa_tcp_connection_entry {
+typedef struct psa_skt_connection_entry {
char *interface_url;
char *url;
int fd;
+ int socket_domain;
+ int socket_type;
+ char* protocol;
struct sockaddr_in addr;
+ struct sockaddr_in dst_addr;
socklen_t len;
bool connected;
bool headerError;
@@ -96,12 +101,13 @@ typedef struct psa_tcp_connection_entry {
unsigned int retryCount;
celix_thread_mutex_t writeMutex;
struct msghdr readMsg;
-} psa_tcp_connection_entry_t;
+ struct sockaddr_in readMsgAddr;
+} psa_skt_connection_entry_t;
//
// Handle administration
//
-struct pubsub_tcpHandler {
+struct pubsub_sktHandler {
celix_thread_rwlock_t dbLock;
unsigned int timeout;
hash_map_t *connection_url_map;
@@ -109,13 +115,14 @@ struct pubsub_tcpHandler {
hash_map_t *interface_url_map;
hash_map_t *interface_fd_map;
int efd;
- pubsub_tcpHandler_receiverConnectMessage_callback_t receiverConnectMessageCallback;
- pubsub_tcpHandler_receiverConnectMessage_callback_t receiverDisconnectMessageCallback;
+ int fd;
+ pubsub_sktHandler_receiverConnectMessage_callback_t receiverConnectMessageCallback;
+ pubsub_sktHandler_receiverConnectMessage_callback_t receiverDisconnectMessageCallback;
void *receiverConnectPayload;
- pubsub_tcpHandler_acceptConnectMessage_callback_t acceptConnectMessageCallback;
- pubsub_tcpHandler_acceptConnectMessage_callback_t acceptDisconnectMessageCallback;
+ pubsub_sktHandler_acceptConnectMessage_callback_t acceptConnectMessageCallback;
+ pubsub_sktHandler_acceptConnectMessage_callback_t acceptDisconnectMessageCallback;
void *acceptConnectPayload;
- pubsub_tcpHandler_processMessage_callback_t processMessageCallback;
+ pubsub_sktHandler_processMessage_callback_t processMessageCallback;
void *processMessagePayload;
celix_log_helper_t *logHelper;
pubsub_protocol_service_t *protocol;
@@ -130,34 +137,36 @@ struct pubsub_tcpHandler {
bool enableReceiveEvent;
};
-static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
-static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *interface_url, struct sockaddr_in *addr);
-static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
-static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-static inline long int pubsub_tcpHandler_getMsgSize(psa_tcp_connection_entry_t *entry);
-static inline void pubsub_tcpHandler_ensureReadBufferCapacity(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-static inline bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry, long int* msgSize);
-static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-static inline long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry);
-static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
-static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-static void *pubsub_tcpHandler_thread(void *data);
+static inline int pubsub_sktHandler_closeConnectionEntry(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, bool lock);
+static inline int pubsub_sktHandler_closeInterfaceEntry(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry);
+static inline int pubsub_sktHandler_makeNonBlocking(pubsub_sktHandler_t *handle, int fd);
+static inline struct sockaddr_in pubsub_sktHandler_getMultiCastAddr(psa_skt_connection_entry_t *entry, struct sockaddr_in* sin, struct sockaddr_in* intf_addr );
+static inline psa_skt_connection_entry_t* pubsub_sktHandler_createEntry(pubsub_sktHandler_t *handle, int fd, char *url, char *interface_url, struct sockaddr_in *addr);
+static inline void pubsub_sktHandler_freeEntry(psa_skt_connection_entry_t *entry);
+static inline void pubsub_sktHandler_releaseEntryBuffer(pubsub_sktHandler_t *handle, int fd, unsigned int index);
+static inline long int pubsub_sktHandler_getMsgSize(psa_skt_connection_entry_t *entry);
+static inline void pubsub_sktHandler_ensureReadBufferCapacity(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry);
+static inline bool pubsub_sktHandler_readHeader(pubsub_sktHandler_t *handle, int fd, psa_skt_connection_entry_t *entry, long int* msgSize);
+static inline void pubsub_sktHandler_decodePayload(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry);
+static inline long int pubsub_sktHandler_readPayload(pubsub_sktHandler_t *handle, int fd, psa_skt_connection_entry_t *entry);
+static inline void pubsub_sktHandler_connectionHandler(pubsub_sktHandler_t *handle, int fd);
+static inline void pubsub_sktHandler_handler(pubsub_sktHandler_t *handle);
+static void *pubsub_sktHandler_thread(void *data);
//
// Create a handle
//
-pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t *protocol, celix_log_helper_t *logHelper) {
- pubsub_tcpHandler_t *handle = calloc(sizeof(*handle), 1);
+pubsub_sktHandler_t *pubsub_sktHandler_create(pubsub_protocol_service_t *protocol, celix_log_helper_t *logHelper) {
+ pubsub_sktHandler_t *handle = calloc(sizeof(*handle), 1);
if (handle != NULL) {
#if defined(__APPLE__)
handle->efd = kqueue();
#else
handle->efd = epoll_create1(0);
#endif
+ handle->fd = -1;
handle->connection_url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
handle->connection_fd_map = hashMap_create(NULL, NULL, NULL, NULL);
handle->interface_url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -168,7 +177,7 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t *protoco
handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
celixThreadRwlock_create(&handle->dbLock, 0);
handle->running = true;
- celixThread_create(&handle->thread, NULL, pubsub_tcpHandler_thread, handle);
+ celixThread_create(&handle->thread, NULL, pubsub_sktHandler_thread, handle);
// signal(SIGPIPE, SIG_IGN);
}
return handle;
@@ -177,7 +186,7 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t *protoco
//
// Destroys the handle
//
-void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) {
+void pubsub_sktHandler_destroy(pubsub_sktHandler_t *handle) {
if (handle != NULL) {
celixThreadRwlock_readLock(&handle->dbLock);
bool running = handle->running;
@@ -189,19 +198,18 @@ void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) {
celixThread_join(handle->thread, NULL);
}
celixThreadRwlock_writeLock(&handle->dbLock);
- hash_map_iterator_t interface_iter = hashMapIterator_construct(handle->interface_url_map);
- while (hashMapIterator_hasNext(&interface_iter)) {
- psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&interface_iter);
+ hash_map_iterator_t connection_iter = hashMapIterator_construct(handle->connection_url_map);
+ while (hashMapIterator_hasNext(&connection_iter)) {
+ psa_skt_connection_entry_t *entry = hashMapIterator_nextValue(&connection_iter);
if (entry != NULL) {
- pubsub_tcpHandler_closeInterfaceEntry(handle, entry);
+ pubsub_sktHandler_closeConnectionEntry(handle, entry, true);
}
}
-
- hash_map_iterator_t connection_iter = hashMapIterator_construct(handle->connection_url_map);
- while (hashMapIterator_hasNext(&connection_iter)) {
- psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&connection_iter);
+ hash_map_iterator_t interface_iter = hashMapIterator_construct(handle->interface_url_map);
+ while (hashMapIterator_hasNext(&interface_iter)) {
+ psa_skt_connection_entry_t *entry = hashMapIterator_nextValue(&interface_iter);
if (entry != NULL) {
- pubsub_tcpHandler_closeConnectionEntry(handle, entry, true);
+ pubsub_sktHandler_closeInterfaceEntry(handle, entry);
}
}
if (handle->efd >= 0) close(handle->efd);
@@ -215,31 +223,45 @@ void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) {
}
}
+
+
//
// Open the socket using an url
//
-int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
+int pubsub_sktHandler_open(pubsub_sktHandler_t *handle, int socket_type, char *url) {
int rc = 0;
celixThreadRwlock_readLock(&handle->dbLock);
pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
- int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (rc >= 0) {
+ int socket_domain = AF_INET;
+ //int socket_type = SOCK_STREAM(tcp); SOCK_DGRAM(udp);
+ if (url_info->protocol) {
+ // IPC is not supported !!!
+ //socket_domain = (!strcmp("ipc", url_info->protocol)) ? AF_LOCAL : AF_INET;
+ int url_socket_type = (!strcmp("udp", url_info->protocol)) ? SOCK_DGRAM : SOCK_STREAM;
+ if (url_socket_type != socket_type) {
+ L_ERROR("[SKT Socket] unexpected url socket type %s != %s \n", url, socket_type==SOCK_STREAM ? "tcp" : "udp");
+ return -1;
+ }
+ }
+ // bool useBind = (socket_type == SOCK_DGRAM) ? false : true;
+ int fd = socket(socket_domain , socket_type, socket_type == SOCK_STREAM ? IPPROTO_TCP : IPPROTO_UDP);
+ if (fd >= 0) {
int setting = 1;
- if (rc == 0) {
- rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &setting, sizeof(setting));
- if (rc != 0) {
- close(fd);
- L_ERROR("[TCP Socket] Error setsockopt(SO_REUSEADDR): %s\n", strerror(errno));
- }
+ rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &setting, sizeof(setting));
+ if (rc != 0) {
+ close(fd);
+ L_ERROR("[SKT Handler] Error setsockopt(SO_REUSEADDR): %s\n", strerror(errno));
}
- if (rc == 0) {
- rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting));
- if (rc != 0) {
- close(fd);
- L_ERROR("[TCP Socket] Error setsockopt(TCP_NODELAY): %s\n", strerror(errno));
+ if (socket_type == SOCK_STREAM) {
+ if (rc == 0) {
+ rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting));
+ if (rc != 0) {
+ close(fd);
+ L_ERROR("[TCP SKT Handler] Error setsockopt(SKT_NODELAY): %s\n", strerror(errno));
+ }
+ } else {
+ L_ERROR("[TCP SKT Handler] Error creating socket: %s\n", strerror(errno));
}
- } else {
- L_ERROR("[TCP Socket] Error creating socket: %s\n", strerror(errno));
}
if (rc == 0 && handle->sendTimeout != 0.0) {
struct timeval tv;
@@ -247,7 +269,7 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
tv.tv_usec = (long int) ((handle->sendTimeout - tv.tv_sec) * 1000000.0);
rc = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
if (rc != 0) {
- L_ERROR("[TCP Socket] Error setsockopt (SO_SNDTIMEO) to set send timeout: %s", strerror(errno));
+ L_ERROR("[SKT Handler] Error setsockopt (SO_SNDTIMEO) to set send timeout: %s", strerror(errno));
}
}
if (rc == 0 && handle->rcvTimeout != 0.0) {
@@ -256,18 +278,11 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
tv.tv_usec = (long int) ((handle->rcvTimeout - tv.tv_sec) * 1000000.0);
rc = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
if (rc != 0) {
- L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno));
+ L_ERROR("[SKT Handler] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno));
}
}
- struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
- if (addr) {
- rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
- if (rc != 0) {
- close(fd);
- L_ERROR("[TCP Socket] Error bind: %s\n", strerror(errno));
- }
- free(addr);
- }
+ } else {
+ L_ERROR("[SKT Handler] Error creating socket: %s\n", strerror(errno));
}
pubsub_utils_url_free(url_info);
celixThreadRwlock_unlock(&handle->dbLock);
@@ -275,23 +290,60 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
}
//
+// Open the socket using an url
+//
+int pubsub_sktHandler_bind(pubsub_sktHandler_t *handle, int fd, char *url, unsigned int port_nr) {
+ int rc = 0;
+ celixThreadRwlock_readLock(&handle->dbLock);
+ struct sockaddr_in *addr = NULL;
+ socklen_t length = sizeof(int);
+ int socket_domain;
+ rc = getsockopt(fd, SOL_SOCKET, SO_DOMAIN, &socket_domain, &length);
+
+ if (url) {
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
+ if (url_info->interface) {
+ addr = pubsub_utils_url_getInAddr(url_info->interface, (!port_nr) ? url_info->interface_port_nr : port_nr);
+ } else {
+ addr = pubsub_utils_url_getInAddr(url_info->hostname, (!port_nr) ? url_info->port_nr : port_nr);
+ }
+ pubsub_utils_url_free(url_info);
+ } else {
+ addr = pubsub_utils_url_getInAddr(NULL, port_nr);
+ }
+ if (addr) {
+ addr->sin_family = socket_domain;
+ rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
+ if (rc != 0) {
+ close(fd);
+ L_ERROR("[SKT Handler] Error bind: %s\n", strerror(errno));
+ fd = -1;
+ }
+ free(addr);
+ }
+ celixThreadRwlock_unlock(&handle->dbLock);
+ return (!rc) ? fd : rc;
+}
+
+
+//
// Closes the descriptor with its connection/interfaces (receiver/sender)
//
-int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, int fd) {
+int pubsub_sktHandler_close(pubsub_sktHandler_t *handle, int fd) {
int rc = 0;
if (handle != NULL) {
- psa_tcp_connection_entry_t *entry = NULL;
+ psa_skt_connection_entry_t *entry = NULL;
celixThreadRwlock_writeLock(&handle->dbLock);
entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
if (entry) {
entry = hashMap_remove(handle->interface_url_map, (void *) (intptr_t) entry->url);
- rc = pubsub_tcpHandler_closeInterfaceEntry(handle, entry);
+ rc = pubsub_sktHandler_closeInterfaceEntry(handle, entry);
entry = NULL;
}
entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
if (entry) {
entry = hashMap_remove(handle->connection_url_map, (void *) (intptr_t) entry->url);
- rc = pubsub_tcpHandler_closeConnectionEntry(handle, entry, false);
+ rc = pubsub_sktHandler_closeConnectionEntry(handle, entry, false);
entry = NULL;
}
celixThreadRwlock_unlock(&handle->dbLock);
@@ -302,12 +354,11 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, int fd) {
//
// Create connection/interface entry
//
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *interface_url,
- struct sockaddr_in *addr) {
- psa_tcp_connection_entry_t *entry = NULL;
+static inline psa_skt_connection_entry_t *
+pubsub_sktHandler_createEntry(pubsub_sktHandler_t *handle, int fd, char *url, char *interface_url, struct sockaddr_in *addr) {
+ psa_skt_connection_entry_t *entry = NULL;
if (fd >= 0) {
- entry = calloc(sizeof(psa_tcp_connection_entry_t), 1);
+ entry = calloc(sizeof(psa_skt_connection_entry_t), 1);
entry->fd = fd;
celixThreadMutex_create(&entry->writeMutex, NULL);
if (url) {
@@ -315,17 +366,18 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
}
if (interface_url) {
entry->interface_url = strndup(interface_url, 1024 * 1024);
- } else {
- if (url) {
- entry->interface_url = strndup(url, 1024 * 1024);
- }
- }
- if (addr) {
- entry->addr = *addr;
}
entry->len = sizeof(struct sockaddr_in);
size_t headerSize = 0;
size_t footerSize = 0;
+ socklen_t length = sizeof(int);
+ getsockopt(fd, SOL_SOCKET, SO_DOMAIN, &entry->socket_domain, &length);
+ getsockopt(fd, SOL_SOCKET, SO_TYPE, &entry->socket_type, &length);
+ if (addr) {
+ entry->addr = *addr;
+ entry->addr.sin_family = entry->socket_domain;
+ }
+ entry->protocol = strndup((entry->socket_type == SOCK_STREAM) ? "tcp" : "udp",10);
handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
entry->readHeaderBufferSize = headerSize;
@@ -337,13 +389,13 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
entry->connected = false;
unsigned minimalMsgSize = entry->writeHeaderBufferSize + entry->writeFooterBufferSize;
if ((minimalMsgSize > handle->maxMsgSize) && (handle->maxMsgSize)) {
- L_ERROR("[TCP Socket] maxMsgSize (%d) < headerSize + FooterSize (%d): %s\n", handle->maxMsgSize, minimalMsgSize);
+ L_ERROR("[SKT Handler] maxMsgSize (%d) < headerSize + FooterSize (%d): %s\n", handle->maxMsgSize, minimalMsgSize);
} else {
entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : LONG_MAX;
}
entry->readHeaderBuffer = calloc(sizeof(char), headerSize);
entry->writeHeaderBuffer = calloc(sizeof(char), headerSize);
- if (entry->readFooterBufferSize ) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterBufferSize );
+ if (entry->readFooterBufferSize ) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterBufferSize);
if (entry->writeFooterBufferSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterBufferSize);
if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
memset(&entry->readMsg, 0x00, sizeof(struct msghdr));
@@ -356,12 +408,12 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
// Free connection/interface entry
//
static inline void
-pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
+pubsub_sktHandler_freeEntry(psa_skt_connection_entry_t *entry) {
if (entry) {
free(entry->url);
free(entry->interface_url);
- if (entry->fd >= 0) close(entry->fd);
free(entry->buffer);
+ free(entry->protocol);
free(entry->readHeaderBuffer);
free(entry->writeHeaderBuffer);
free(entry->readFooterBuffer);
@@ -378,23 +430,94 @@ pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
// Releases the Buffer
//
static inline void
-pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index __attribute__((unused))) {
- psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
+pubsub_sktHandler_releaseEntryBuffer(pubsub_sktHandler_t *handle, int fd, unsigned int index __attribute__((unused))) {
+ psa_skt_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
if (entry != NULL) {
entry->buffer = NULL;
entry->bufferSize = 0;
}
}
+static
+int pubsub_sktHandler_add_fd_event(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, bool useInputEvent)
+{
+ int rc = 0;
+ if ((handle->efd >= 0) && entry) {
+#if defined(__APPLE__)
+ struct kevent ev;
+ EV_SET (&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); // EVFILT_READ | EVFILT_WRITE
+ rc = kevent(handle->efd, &ev, 1, NULL, 0, NULL);
+#else
+ struct epoll_event event;
+ bzero(&event, sizeof(event)); // zero the struct
+ event.events = EPOLLRDHUP | EPOLLERR;
+ if (useInputEvent) {
+ event.events |= EPOLLIN;
+ }
+ event.data.fd = entry->fd;
+ rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
+#endif
+
+ if (rc < 0) {
+ L_ERROR("[ %s SKT Handler] Cannot create poll: %s\n", entry->protocol, strerror(errno));
+ errno = 0;
+ }
+ }
+ return rc;
+};
+
+
+
+
+//
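+// Configure a UDP connection entry (receiver): join the multicast group, enable broadcast, or store the unicast destination address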
+//
+static
+int pubsub_sktHandler_config_udp_connect(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, pubsub_utils_url_t *url_info) {
+ int rc = 0;
+ struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
+ if (!addr) return -1;
+ bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname);
+ if (is_multicast) {
+ struct ip_mreq mc_addr;
+ bzero(&mc_addr, sizeof(struct ip_mreq));
+ mc_addr.imr_multiaddr.s_addr = addr->sin_addr.s_addr;
+ mc_addr.imr_interface.s_addr = entry->addr.sin_addr.s_addr;
+ if (rc == 0) {
+ rc = setsockopt(entry->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr));
+ }
+ if (rc != 0) {
+ L_ERROR("[UDP SKT Handler] Error setsockopt (IP_ADD_MEMBERSHIP): %s", strerror(errno));
+ }
+ } else if (is_broadcast) {
+ int setting = 1;
+ rc = setsockopt(entry->fd, SOL_SOCKET, SO_BROADCAST, &setting, sizeof(setting));
+ if (rc != 0) {
+ L_ERROR("[UDP SKT Handler] Error setsockopt(SO_BROADCAST): %s", strerror(errno));
+ }
+ } else {
+ entry->dst_addr = *addr;
+ }
+
+ if (rc != 0) {
+ L_ERROR("[UDP SKT Handler] Cannot connect %s\n", strerror(errno));
+ }
+
+ free(addr);
+ return rc;
+}
+
//
// Connect to url (receiver)
//
-int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
+int pubsub_sktHandler_tcp_connect(pubsub_sktHandler_t *handle, char *url) {
int rc = 0;
- psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
+ psa_skt_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
if (entry == NULL) {
pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
- int fd = pubsub_tcpHandler_open(handle, url_info->interface_url);
+ int fd = pubsub_sktHandler_open(handle, SOCK_STREAM, url_info->interface_url);
+ fd = pubsub_sktHandler_bind(handle, fd, url_info->interface_url, 0);
rc = fd;
// Connect to sender
struct sockaddr_in sin;
@@ -405,41 +528,98 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
if ((rc >= 0) && addr) {
rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
if (rc < 0 && errno != EINPROGRESS) {
- L_ERROR("[TCP Socket] Cannot connect to %s:%d: using %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno));
+ L_ERROR("[TCP SKT Handler] Cannot connect to %s:%d: using %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno));
close(fd);
} else {
- entry = pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &sin);
+ entry = pubsub_sktHandler_createEntry(handle, fd, url, interface_url, &sin);
}
free(addr);
}
free(interface_url);
- // Subscribe File Descriptor to epoll
+ if (rc >= 0) {
+ rc = pubsub_sktHandler_add_fd_event(handle, entry, true);
+ }
+
+ if (rc < 0) {
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+
if ((rc >= 0) && (entry)) {
-#if defined(__APPLE__)
- struct kevent ev;
- EV_SET (&ev, entry->fd, EVFILT_READ | EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, 0);
- rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL);
-#else
- struct epoll_event event;
- bzero(&event, sizeof(struct epoll_event)); // zero the struct
- event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
- event.data.fd = entry->fd;
- rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
-#endif
- if (rc < 0) {
- pubsub_tcpHandler_freeEntry(entry);
- L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno));
- entry = NULL;
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ hashMap_put(handle->connection_url_map, entry->url, entry);
+ hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry);
+ celixThreadRwlock_unlock(&handle->dbLock);
+ pubsub_sktHandler_connectionHandler(handle, fd);
+ if (entry->interface_url) {
+ L_INFO("[TCP SKT Handler] Connect to %s using: %s\n", entry->url, entry->interface_url);
+ } else {
+ L_INFO("[TCP SKT Handler] Connect to %s\n", entry->url);
}
}
- if ((rc >= 0) && (entry)) {
+ pubsub_utils_url_free(url_info);
+ }
+ return rc;
+}
+
+//
+// Connect to url (receiver)
+//
+int pubsub_sktHandler_udp_connect(pubsub_sktHandler_t *handle, char *url) {
+ int rc = 0;
+ psa_skt_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
+ if (entry == NULL) {
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
+ bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname);
+ int fd = pubsub_sktHandler_open(handle, SOCK_DGRAM, url_info->interface_url);
+ if (is_multicast || is_broadcast) {
+ fd = pubsub_sktHandler_bind(handle, fd, NULL, url_info->port_nr);
+ } else {
+ fd = pubsub_sktHandler_bind(handle, fd, url_info->interface_url, 0);
+ }
+ rc = fd;
+ char *pUrl = NULL;
+ struct sockaddr_in *sin = NULL;
+ sin = pubsub_utils_url_from_fd(fd);
+ // check if socket is bind
+ if (sin->sin_port) {
+ pUrl = pubsub_utils_url_get_url(sin, "udp");
+ }
+ // Make handler fd entry
+ if (fd >= 0) {
+ entry = pubsub_sktHandler_createEntry(handle, fd, url_info->url, pUrl, sin);
+ rc = pubsub_sktHandler_config_udp_connect(handle, entry, url_info);
+ }
+
+ if (rc >= 0) {
+ rc = pubsub_sktHandler_add_fd_event(handle, entry, true);
+ }
+
+ if (rc < 0) {
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+
+ if ((rc>=0) && entry) {
+ L_INFO("[%s SKT Handler] Using %s for service annunciation", entry->protocol, entry->url);
celixThreadRwlock_writeLock(&handle->dbLock);
hashMap_put(handle->connection_url_map, entry->url, entry);
hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry);
+ psa_skt_connection_entry_t *interface_entry = pubsub_sktHandler_createEntry(handle, entry->fd, entry->url, entry->interface_url, &entry->addr);
+ hashMap_put(handle->interface_fd_map, (void *) (intptr_t) interface_entry->fd, interface_entry);
+ hashMap_put(handle->interface_url_map, entry->interface_url ? interface_entry->interface_url : interface_entry->url, interface_entry);
celixThreadRwlock_unlock(&handle->dbLock);
- pubsub_tcpHandler_connectionHandler(handle, fd);
- L_INFO("[TCP Socket] Connect to %s using: %s\n", entry->url, entry->interface_url);
+ pubsub_sktHandler_connectionHandler(handle, entry->fd);
+ __atomic_store_n(&interface_entry->connected, true, __ATOMIC_RELEASE);
+ __atomic_store_n(&entry->connected, true, __ATOMIC_RELEASE);
+ if (!entry->interface_url) {
+ L_INFO("[%s SKT Handler] Connect to %s", entry->protocol, entry->url);
+ } else {
+ L_INFO("[%s SKT Handler] Connect to %s using: %s", entry->protocol, entry->url, entry->interface_url);
+ }
}
+ free(sin);
pubsub_utils_url_free(url_info);
}
return rc;
@@ -448,14 +628,14 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
//
// Disconnect from url
//
-int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url) {
+int pubsub_sktHandler_disconnect(pubsub_sktHandler_t *handle, char *url) {
int rc = 0;
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
- psa_tcp_connection_entry_t *entry = NULL;
+ psa_skt_connection_entry_t *entry = NULL;
entry = hashMap_remove(handle->connection_url_map, url);
if (entry) {
- pubsub_tcpHandler_closeConnectionEntry(handle, entry, false);
+ pubsub_sktHandler_closeConnectionEntry(handle, entry, false);
}
celixThreadRwlock_unlock(&handle->dbLock);
}
@@ -464,24 +644,27 @@ int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url) {
// Closes the connection entry (of receiver)
//
-static inline int pubsub_tcpHandler_closeConnectionEntry(
- pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock) {
+static inline int pubsub_sktHandler_closeConnectionEntry(
+ pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, bool lock) {
int rc = 0;
if (handle != NULL && entry != NULL) {
- fprintf(stdout, "[TCP Socket] Close connection to url: %s: \n", entry->url);
+ L_INFO("[%s SKT Handler] Close connection to url: %s: ", entry->protocol, entry->url);
hashMap_remove(handle->connection_fd_map, (void *) (intptr_t) entry->fd);
if ((handle->efd >= 0)) {
+ // For TCP remove the connection socket
+ if (entry->socket_type == SOCK_STREAM) {
#if defined(__APPLE__)
- struct kevent ev;
- EV_SET (&ev, entry->fd, EVFILT_READ, EV_DELETE , 0, 0, 0);
- rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL);
+ struct kevent ev;
+ EV_SET (&ev, entry->fd, EVFILT_READ, EV_DELETE , 0, 0, 0);
+ rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL);
#else
- struct epoll_event event;
- bzero(&event, sizeof(struct epoll_event)); // zero the struct
- rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
+ struct epoll_event event;
+ bzero(&event, sizeof(struct epoll_event)); // zero the struct
+ rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
#endif
- if (rc < 0) {
- L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
+ if (rc < 0) {
+ L_ERROR("[SKT Handler] Error disconnecting %s", strerror(errno));
+ }
}
}
if (entry->fd >= 0) {
@@ -489,7 +672,10 @@ static inline int pubsub_tcpHandler_closeConnectionEntry(
handle->receiverDisconnectMessageCallback(handle->receiverConnectPayload, entry->url, lock);
if (handle->acceptConnectMessageCallback)
handle->acceptConnectMessageCallback(handle->acceptConnectPayload, entry->url);
- pubsub_tcpHandler_freeEntry(entry);
+ if (entry->socket_type == SOCK_STREAM) {
+ close(entry->fd);
+ }
+ pubsub_sktHandler_freeEntry(entry);
entry = NULL;
}
}
@@ -500,11 +686,11 @@ static inline int pubsub_tcpHandler_closeConnectionEntry(
// Closes the interface entry (of sender)
//
static inline int
-pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
- psa_tcp_connection_entry_t *entry) {
+pubsub_sktHandler_closeInterfaceEntry(pubsub_sktHandler_t *handle,
+ psa_skt_connection_entry_t *entry) {
int rc = 0;
if (handle != NULL && entry != NULL) {
- L_INFO("[TCP Socket] Close interface url: %s: \n", entry->url);
+ L_INFO("[%s SKT Handler] Close interface url: %s: ", entry->protocol ,entry->interface_url ? entry->interface_url : entry->url);
hashMap_remove(handle->interface_fd_map, (void *) (intptr_t) entry->fd);
if ((handle->efd >= 0)) {
#if defined(__APPLE__)
@@ -517,20 +703,37 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
#endif
if (rc < 0) {
- L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
+ L_ERROR("[SKT Handler] Error disconnecting %s", strerror(errno));
}
}
if (entry->fd >= 0) {
- pubsub_tcpHandler_freeEntry(entry);
+ close(entry->fd);
+ pubsub_sktHandler_freeEntry(entry);
}
}
return rc;
}
+static inline
+struct sockaddr_in pubsub_sktHandler_getMultiCastAddr(psa_skt_connection_entry_t *entry, struct sockaddr_in* sin, struct sockaddr_in* intf_addr ) {
+ pubsub_utils_url_t* multiCastUrl = calloc(1, sizeof(pubsub_utils_url_t));
+ pubsub_utils_url_parse_url(entry->url, multiCastUrl);
+ char* hostname = NULL;
+ if (multiCastUrl->hostname) hostname = strchr(multiCastUrl->hostname, '/');
+ if (intf_addr->sin_addr.s_addr && hostname) {
+ in_addr_t listDigit = inet_lnaof(sin->sin_addr);
+ in_addr_t listDigitIntf = inet_lnaof(intf_addr->sin_addr);
+ uint32_t s_addr = ntohl(sin->sin_addr.s_addr);
+ sin->sin_addr.s_addr = htonl(s_addr - listDigit + listDigitIntf);
+ }
+ pubsub_utils_url_free(multiCastUrl);
+ return *sin;
+}
+
//
// Make accept file descriptor non blocking
//
-static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) {
+static inline int pubsub_sktHandler_makeNonBlocking(pubsub_sktHandler_t *handle, int fd) {
int rc = 0;
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
@@ -538,7 +741,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
else {
rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (rc < 0) {
- L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
+ L_ERROR("[SKT Handler] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
}
}
return rc;
@@ -547,20 +750,20 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
//
// setup listening to interface (sender) using an url
//
-int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
+int pubsub_sktHandler_tcp_listen(pubsub_sktHandler_t *handle, char *url) {
int rc = 0;
celixThreadRwlock_readLock(&handle->dbLock);
- psa_tcp_connection_entry_t *entry =
+ psa_skt_connection_entry_t *entry =
hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
celixThreadRwlock_unlock(&handle->dbLock);
if (entry == NULL) {
- char protocol[] = "tcp";
- int fd = pubsub_tcpHandler_open(handle, url);
+ int fd = pubsub_sktHandler_open(handle, SOCK_STREAM, url);
+ fd = pubsub_sktHandler_bind(handle, fd, url, 0);
rc = fd;
struct sockaddr_in *sin = pubsub_utils_url_from_fd(fd);
// Make handler fd entry
- char *pUrl = pubsub_utils_url_get_url(sin, protocol);
- entry = pubsub_tcpHandler_createEntry(handle, fd, pUrl, NULL, sin);
+ char *pUrl = pubsub_utils_url_get_url(sin, "tcp");
+ entry = pubsub_sktHandler_createEntry(handle, fd, pUrl, NULL, sin);
if (entry != NULL) {
__atomic_store_n(&entry->connected, true, __ATOMIC_RELEASE);
free(pUrl);
@@ -569,46 +772,190 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
if (rc >= 0) {
rc = listen(fd, SOMAXCONN);
if (rc != 0) {
- L_ERROR("[TCP Socket] Error listen: %s\n", strerror(errno));
- pubsub_tcpHandler_freeEntry(entry);
+ L_ERROR("[TCP SKT Handler] Error listen: %s\n", strerror(errno));
+ pubsub_sktHandler_freeEntry(entry);
entry = NULL;
}
}
if (rc >= 0) {
- rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
+ rc = pubsub_sktHandler_makeNonBlocking(handle, fd);
if (rc < 0) {
- pubsub_tcpHandler_freeEntry(entry);
+ pubsub_sktHandler_freeEntry(entry);
entry = NULL;
}
}
- if ((rc >= 0) && (handle->efd >= 0)) {
-#if defined(__APPLE__)
- struct kevent ev;
- EV_SET (&ev, fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
- rc = kevent(handle->efd, &ev, 1, NULL, 0, NULL);
-#else
- struct epoll_event event;
- bzero(&event, sizeof(event)); // zero the struct
- event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
- event.data.fd = fd;
- rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, fd, &event);
-#endif
- if (rc < 0) {
- L_ERROR("[TCP Socket] Cannot create poll: %s\n", strerror(errno));
- errno = 0;
- pubsub_tcpHandler_freeEntry(entry);
- entry = NULL;
- }
- if (entry) {
- L_INFO("[TCP Socket] Using %s for service annunciation", entry->url);
- hashMap_put(handle->interface_fd_map, (void *) (intptr_t) entry->fd, entry);
- hashMap_put(handle->interface_url_map, entry->url, entry);
+ if (rc >= 0) {
+ rc = pubsub_sktHandler_add_fd_event(handle, entry, true);
+ }
+
+ if (rc < 0) {
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+
+ if ((rc>=0) && entry) {
+ if (entry->interface_url) {
+ L_INFO("[TCP SKT Handler] Using %s:%s for service annunciation", entry->protocol, entry->url, entry->interface_url);
+ } else {
+ L_INFO("[TCP SKT Handler] Using %s for service annunciation", entry->protocol, entry->url);
}
+ hashMap_put(handle->interface_fd_map, (void *) (intptr_t) entry->fd, entry);
+ hashMap_put(handle->interface_url_map, entry->url, entry);
}
celixThreadRwlock_unlock(&handle->dbLock);
} else {
- L_ERROR("[TCP Socket] Error listen socket cannot bind to %s: %s\n", url ? url : "", strerror(errno));
+ L_ERROR("[TCP SKT Handler] Error listen socket cannot bind to %s: %s\n", url ? url : "", strerror(errno));
+ }
+ }
+ return rc;
+}
+
+//
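+// Configure a UDP interface entry (sender): apply multicast/broadcast socket options, create the matching connection entry and register the socket for polling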
+//
+static
+int pubsub_sktHandler_config_udp_bind(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, pubsub_utils_url_t *url_info) {
+ /** Check UDP type*/
+ int rc = 0;
+ bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname);
+ if (is_multicast) {
+ char loop = 1;
+ char ttl = 1;
+ struct sockaddr_in *intf_addr = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr);
+ if (!intf_addr) return -1;
+ rc = setsockopt(entry->fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
+ if (rc != 0) {
+ L_ERROR("[UDP SKT Handler] Error setsockopt (IP_MULTICAST_LOOP): %s", strerror(errno));
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+ rc = setsockopt(entry->fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
+ if (rc != 0) {
+ L_ERROR("[UDP SKT Handler] Error setsockopt (IP_MULTICAST_LOOP): %s", strerror(errno));
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+ if (!rc) {
+ rc = setsockopt(entry->fd, IPPROTO_IP, IP_MULTICAST_IF, &intf_addr->sin_addr, sizeof(struct in_addr));
+ if (rc != 0) {
+ L_ERROR("[UDP SKT Handler] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno));
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+ }
+ // bind multi cast address
+ struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
+ if (!rc && addr) {
+ rc = bind(entry->fd, (struct sockaddr *) addr, sizeof(*addr));
+ if (rc != 0) {
+ L_ERROR("[UDP SKT Handler] Cannot bind to multicast %s:%d: err(%d): %s\n", url_info->url, url_info->port_nr, errno, strerror(errno));
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ } else {
+ struct sockaddr_in *sin = pubsub_utils_url_from_fd(entry->fd);
+ entry->dst_addr = pubsub_sktHandler_getMultiCastAddr(entry, sin, &entry->addr);
+ free(sin);
+ }
+ }
+ free(addr);
+ free(intf_addr);
+ } else if (is_broadcast) {
+ int setting = 1;
+ if (!rc) {
+ rc = setsockopt(entry->fd, SOL_SOCKET, SO_BROADCAST, &setting, sizeof(setting));
+ }
+ if (!entry->dst_addr.sin_port) entry->dst_addr.sin_port = entry->addr.sin_port;
+ if (rc != 0) {
+ L_ERROR("[UDP SKT Handler] Error setsockopt(SO_BROADCAST): %s", strerror(errno));
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+ }
+
+ // Store connection);
+ if (!rc && entry) {
+ if (is_multicast || is_broadcast) {
+ free(entry->url);
+ free(entry->interface_url);
+ entry->url = pubsub_utils_url_get_url(&entry->dst_addr, url_info->protocol ? url_info->protocol : entry->protocol);
+ entry->interface_url = pubsub_utils_url_get_url(&entry->addr, url_info->protocol ? url_info->protocol : entry->protocol);
+ }
+ psa_skt_connection_entry_t *connection_entry = pubsub_sktHandler_createEntry(handle, entry->fd, entry->url, entry->interface_url, &entry->addr);
+ connection_entry->dst_addr = entry->dst_addr;
+ __atomic_store_n(&entry->connected, true, __ATOMIC_RELEASE);
+ __atomic_store_n(&connection_entry->connected, true, __ATOMIC_RELEASE);
+ hashMap_put(handle->connection_fd_map, (void *) (intptr_t) connection_entry->fd, connection_entry);
+ hashMap_put(handle->connection_url_map, connection_entry->url, connection_entry);
+ }
+
+ // Remove not connected interface
+ if (!entry->connected) {
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+ if (!rc && entry) {
+ rc = pubsub_sktHandler_add_fd_event(handle, entry, (!is_multicast && !is_broadcast));
+ }
+ if (rc < 0) {
+ pubsub_sktHandler_freeEntry(entry);
+ entry = NULL;
+ }
+
+ if ((rc>=0) && entry) {
+ if (entry->interface_url) {
+ L_INFO("[UDP SKT Handler] Using %s:%s for service annunciation", entry->protocol, entry->url, entry->interface_url);
+ } else {
+ L_INFO("[UDP SKT Handler] Using %s for service annunciation", entry->protocol, entry->url);
}
+ hashMap_put(handle->interface_fd_map, (void *) (intptr_t) entry->fd, entry);
+ hashMap_put(handle->interface_url_map, entry->url, entry);
+ }
+ return rc;
+}
+
+//
+// Set up a UDP interface (sender) for an url: open the datagram socket, bind it and configure it
+//
+int pubsub_sktHandler_udp_bind(pubsub_sktHandler_t *handle, char *url) {
+ int rc = 0;
+ celixThreadRwlock_readLock(&handle->dbLock);
+ psa_skt_connection_entry_t *entry = hashMap_get(handle->interface_url_map, (void *) (intptr_t) url);
+ celixThreadRwlock_unlock(&handle->dbLock);
+ if (entry == NULL) {
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
+ bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname);
+ bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname);
+ int fd = pubsub_sktHandler_open(handle, SOCK_DGRAM, url);
+ char *pUrl = NULL;
+ struct sockaddr_in *sin = NULL;
+ if (!is_multicast) {
+ // Make handler fd entry
+ if (is_broadcast) {
+ fd = pubsub_sktHandler_bind(handle, fd, url_info->interface_url ? url_info->interface_url : NULL, url_info->port_nr);
+ } else {
+ fd = pubsub_sktHandler_bind(handle, fd, url, 0);
+ }
+ sin = pubsub_utils_url_from_fd(fd);
+ pUrl = pubsub_utils_url_get_url(sin, "udp");
+ }
+ rc = fd;
+ if (is_multicast || is_broadcast) {
+ // Create entry for multicast / broadcast
+ entry = pubsub_sktHandler_createEntry(handle, fd, url_info->url, pUrl, sin);
+ } else {
+ // Create entry for unicast
+ entry = pubsub_sktHandler_createEntry(handle, fd, pUrl, NULL, sin);
+ }
+ if (entry != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ rc = pubsub_sktHandler_config_udp_bind(handle, entry, url_info);
+ celixThreadRwlock_unlock(&handle->dbLock);
+ } else {
+ L_ERROR("[UDP SKT Socket] Error publish socket cannot bind to %s: %s\n", url ? url : "", strerror(errno));
+ }
+ free(sin);
+ pubsub_utils_url_free(url_info);
}
return rc;
}
@@ -618,7 +965,7 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
// This size is used to allocate the initial read buffer, to avoid receive buffer reallocation.
// The default receive buffer is allocated in createEntry when the connection is established
//
-int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+int pubsub_sktHandler_setReceiveBufferSize(pubsub_sktHandler_t *handle, unsigned int size) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
handle->bufferSize = size;
@@ -630,7 +977,7 @@ int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned
//
// Set Maximum message size
//
-int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+int pubsub_sktHandler_setMaxMsgSize(pubsub_sktHandler_t *handle, unsigned int size) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
handle->maxMsgSize = size;
@@ -642,7 +989,7 @@ int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int si
//
// Setup thread timeout
//
-void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle,
+void pubsub_sktHandler_setTimeout(pubsub_sktHandler_t *handle,
unsigned int timeout) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
@@ -654,14 +1001,14 @@ void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle,
//
// Setup thread name
//
-void pubsub_tcpHandler_setThreadName(pubsub_tcpHandler_t *handle,
+void pubsub_sktHandler_setThreadName(pubsub_sktHandler_t *handle,
const char *topic, const char *scope) {
if ((handle != NULL) && (topic)) {
char *thread_name = NULL;
if ((scope) && (topic))
- asprintf(&thread_name, "TCP TS %s/%s", scope, topic);
+ asprintf(&thread_name, "SKT TS %s/%s", scope, topic);
else
- asprintf(&thread_name, "TCP TS %s", topic);
+ asprintf(&thread_name, "SKT TS %s", topic);
celixThreadRwlock_writeLock(&handle->dbLock);
celixThread_setName(&handle->thread, thread_name);
celixThreadRwlock_unlock(&handle->dbLock);
@@ -672,18 +1019,11 @@ void pubsub_tcpHandler_setThreadName(pubsub_tcpHandler_t *handle,
//
// Setup thread priorities
//
-void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio,
+void pubsub_sktHandler_setThreadPriority(pubsub_sktHandler_t *handle, long prio,
const char *sched) {
if (handle == NULL)
return;
- // NOTE. Function will abort when performing a sched_setscheduler without
- // permission. As result permission has to be checked first.
- // TODO update this to use cap_get_pid and cap-get_flag instead of check user
- // is root (note adds dep to -lcap)
- bool gotPermission = false;
- if (getuid() == 0) {
- gotPermission = true;
- }
+
if (sched != NULL) {
int policy = SCHED_OTHER;
if (strncmp("SCHED_OTHER", sched, 16) == 0) {
@@ -699,24 +1039,23 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio,
} else if (strncmp("SCHED_RR", sched, 16) == 0) {
policy = SCHED_RR;
}
- if (gotPermission) {
- celixThreadRwlock_writeLock(&handle->dbLock);
- if (prio > 0 && prio < 100) {
- struct sched_param sch;
- bzero(&sch, sizeof(struct sched_param));
- sch.sched_priority = (int)prio;
- pthread_setschedparam(handle->thread.thread, policy, &sch);
- } else {
- L_INFO("Skipping configuration of thread prio to %i and thread "
- "scheduling to %s. No permission\n",
- (int) prio, sched);
- }
- celixThreadRwlock_unlock(&handle->dbLock);
+
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ if (prio > 0 && prio < 100) {
+ struct sched_param sch;
+ bzero(&sch, sizeof(struct sched_param));
+ sch.sched_priority = (int)prio;
+ pthread_setschedparam(handle->thread.thread, policy, &sch);
+ } else {
+ L_INFO("Skipping configuration of thread prio to %i and thread "
+ "scheduling to %s. No permission\n",
+ (int) prio, sched);
}
+ celixThreadRwlock_unlock(&handle->dbLock);
}
}
-void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count) {
+void pubsub_sktHandler_setSendRetryCnt(pubsub_sktHandler_t *handle, unsigned int count) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
handle->maxSendRetryCount = count;
@@ -724,7 +1063,7 @@ void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int
}
}
-void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count) {
+void pubsub_sktHandler_setReceiveRetryCnt(pubsub_sktHandler_t *handle, unsigned int count) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
handle->maxRcvRetryCount = count;
@@ -732,7 +1071,7 @@ void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned
}
}
-void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout) {
+void pubsub_sktHandler_setSendTimeOut(pubsub_sktHandler_t *handle, double timeout) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
handle->sendTimeout = timeout;
@@ -740,7 +1079,7 @@ void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeou
}
}
-void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout) {
+void pubsub_sktHandler_setReceiveTimeOut(pubsub_sktHandler_t *handle, double timeout) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
handle->rcvTimeout = timeout;
@@ -748,7 +1087,7 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
}
}
-void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle,bool enable) {
+void pubsub_sktHandler_enableReceiveEvent(pubsub_sktHandler_t *handle,bool enable) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
handle->enableReceiveEvent = enable;
@@ -756,13 +1095,31 @@ void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle,bool enabl
}
}
-static inline long int pubsub_tcpHandler_getMsgSize(psa_tcp_connection_entry_t *entry) {
+
+// Interprets a textual property value as the "passive" flag.
+// Returns true only when the value handed back by utils_stringTrim starts with
+// "true" (case-insensitive); NULL and every other value yield false.
+bool pubsub_sktHandler_isPassive(const char* buffer) {
+    if (buffer == NULL) {
+        return false;
+    }
+    // Work on a bounded local copy; snprintf truncates longer input to fit.
+    char copy[32];
+    snprintf(copy, 32, "%s", buffer);
+    const char *value = utils_stringTrim(copy);
+    return strncasecmp("true", value, strlen("true")) == 0;
+}
+
+
+static inline long int pubsub_sktHandler_getMsgSize(psa_skt_connection_entry_t *entry) {
// Note header message is already read
return (long int)entry->header.header.payloadPartSize + (long int)entry->header.header.metadataSize + (long int)entry->readFooterSize;
}
static inline
-bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry, long int* msgSize) {
+bool pubsub_sktHandler_readHeader(pubsub_sktHandler_t *handle, int fd, psa_skt_connection_entry_t *entry, long int* msgSize) {
bool result = false;
size_t syncSize = 0;
size_t protocolHeaderBufferSize = 0;
@@ -774,8 +1131,10 @@ bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_c
handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
// Ensure capacity in header buffer
- pubsub_tcpHandler_ensureReadBufferCapacity(handle, entry);
+ pubsub_sktHandler_ensureReadBufferCapacity(handle, entry);
+ entry->readMsg.msg_name = &entry->readMsgAddr;
+ entry->readMsg.msg_namelen = entry->len;
entry->readMsg.msg_iovlen = 0;
entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readHeaderBuffer;
entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->readHeaderBufferSize;
@@ -784,7 +1143,8 @@ bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_c
// Read the message
long int nbytes = 0;
// Use peek flag to find sync word or when header is part of the payload
- unsigned int flag = (entry->headerError || (!protocolHeaderBufferSize)) ? MSG_PEEK : 0;
+ bool isUdp = (entry->socket_type == SOCK_DGRAM) ? true : false;
+ unsigned int flag = (entry->headerError || (!protocolHeaderBufferSize) || isUdp) ? MSG_PEEK : 0;
if (entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL | flag);
if (nbytes >= entry->readHeaderSize) {
if (handle->protocol->decodeHeader(handle->protocol->handle,
@@ -799,7 +1159,7 @@ bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_c
// Did not receive correct header
// skip sync word and try to read next header
if (!entry->headerError) {
- L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+ L_WARN("[SKT Handler] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
}
entry->headerError = true;
entry->readMsg.msg_iovlen = 0;
@@ -814,7 +1174,7 @@ bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_c
}
-static inline void pubsub_tcpHandler_ensureReadBufferCapacity(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+static inline void pubsub_sktHandler_ensureReadBufferCapacity(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry) {
if (entry->readHeaderSize > entry->readHeaderBufferSize) {
free(entry->readHeaderBuffer);
entry->readHeaderBuffer = malloc((size_t) entry->readHeaderSize);
@@ -841,7 +1201,7 @@ static inline void pubsub_tcpHandler_ensureReadBufferCapacity(pubsub_tcpHandler_
}
static inline
-void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+void pubsub_sktHandler_decodePayload(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry) {
if (entry->header.header.payloadSize > 0) {
handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
@@ -856,7 +1216,7 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
bool releaseEntryBuffer = false;
handle->processMessageCallback(handle->processMessagePayload, &entry->header, &releaseEntryBuffer, &receiveTime);
if (releaseEntryBuffer) {
- pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+ pubsub_sktHandler_releaseEntryBuffer(handle, entry->fd, 0);
}
}
celix_properties_destroy(entry->header.metadata.metadata);
@@ -864,13 +1224,19 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
}
static inline
-long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry) {
+long int pubsub_sktHandler_readPayload(pubsub_sktHandler_t *handle, int fd, psa_skt_connection_entry_t *entry) {
entry->readMsg.msg_iovlen = 0;
handle->protocol->getFooterSize(handle->protocol->handle, &entry->readFooterSize);
// from the header can be determined how large buffers should be. Even before receiving all data these buffers can be allocated
- pubsub_tcpHandler_ensureReadBufferCapacity(handle, entry);
+ pubsub_sktHandler_ensureReadBufferCapacity(handle, entry);
+ // Read UDP packet in one message
+ if (entry->readHeaderSize && (entry->socket_type == SOCK_DGRAM)) {
+ entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readHeaderBuffer;
+ entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->readHeaderBufferSize;
+ entry->readMsg.msg_iovlen++;
+ }
if (entry->header.header.payloadPartSize) {
char* buffer = entry->buffer;
entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = &buffer[entry->header.header.payloadOffset];
@@ -890,12 +1256,12 @@ long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_
}
long int nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);
- if (nbytes >= pubsub_tcpHandler_getMsgSize(entry)) {
+ if (nbytes >= pubsub_sktHandler_getMsgSize(entry)) {
bool valid = true;
if (entry->readFooterSize) {
if (handle->protocol->decodeFooter(handle->protocol->handle, entry->readFooterBuffer, entry->readFooterBufferSize, &entry->header) != CELIX_SUCCESS) {
// Did not receive correct footer
- L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+ L_ERROR("[SKT Handler] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
valid = false;
}
}
@@ -904,9 +1270,17 @@ long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_
valid = false;
}
+ if (entry->socket_type == SOCK_DGRAM && entry->readMsg.msg_name && !entry->dst_addr.sin_port) {
+ entry->dst_addr = entry->readMsgAddr;
+ psa_skt_connection_entry_t *connection_entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
+ if (connection_entry != NULL) {
+ connection_entry->dst_addr = entry->readMsgAddr;;
+ }
+ }
+
if (valid) {
// Complete message is received
- pubsub_tcpHandler_decodePayload(handle, entry);
+ pubsub_sktHandler_decodePayload(handle, entry);
}
}
return nbytes;
@@ -916,9 +1290,9 @@ long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_
// Reads data from the file descriptor which has data (determined by epoll()) and stores it in the internal structure
// If the message is completely reassembled true is returned and the index and size have valid values
//
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
+int pubsub_sktHandler_read(pubsub_sktHandler_t *handle, int fd) {
celixThreadRwlock_readLock(&handle->dbLock);
- psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
+ psa_skt_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
if (entry == NULL) {
entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
}
@@ -934,8 +1308,8 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
}
long int nbytes = 0;
// if not yet enough bytes are received the header can not be read
- if (pubsub_tcpHandler_readHeader(handle, fd, entry, &nbytes)) {
- nbytes = pubsub_tcpHandler_readPayload(handle, fd, entry);
+ if (pubsub_sktHandler_readHeader(handle, fd, entry, &nbytes)) {
+ nbytes = pubsub_sktHandler_readPayload(handle, fd, entry);
}
if (nbytes > 0) {
entry->retryCount = 0;
@@ -946,10 +1320,10 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
} else if (entry->retryCount < handle->maxRcvRetryCount) {
entry->retryCount++;
L_WARN(
- "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.",
+ "[SKT Handler] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.",
entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
} else {
- L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
+ L_ERROR("[SKT Handler] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
entry->fd, handle->maxRcvRetryCount, strerror(errno));
nbytes = 0; //Return 0 as indicator to close the connection
}
@@ -958,8 +1332,8 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
return (int)nbytes;
}
-int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload,
- pubsub_tcpHandler_processMessage_callback_t processMessageCallback) {
+int pubsub_sktHandler_addMessageHandler(pubsub_sktHandler_t *handle, void *payload,
+ pubsub_sktHandler_processMessage_callback_t processMessageCallback) {
int result = 0;
celixThreadRwlock_writeLock(&handle->dbLock);
handle->processMessageCallback = processMessageCallback;
@@ -968,9 +1342,9 @@ int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *paylo
return result;
}
-int pubsub_tcpHandler_addReceiverConnectionCallback(pubsub_tcpHandler_t *handle, void *payload,
- pubsub_tcpHandler_receiverConnectMessage_callback_t connectMessageCallback,
- pubsub_tcpHandler_receiverConnectMessage_callback_t disconnectMessageCallback) {
+int pubsub_sktHandler_addReceiverConnectionCallback(pubsub_sktHandler_t *handle, void *payload,
+ pubsub_sktHandler_receiverConnectMessage_callback_t connectMessageCallback,
+ pubsub_sktHandler_receiverConnectMessage_callback_t disconnectMessageCallback) {
int result = 0;
celixThreadRwlock_writeLock(&handle->dbLock);
handle->receiverConnectMessageCallback = connectMessageCallback;
@@ -981,9 +1355,9 @@ int pubsub_tcpHandler_addReceiverConnectionCallback(pubsub_tcpHandler_t *handle,
}
//
-// Write large data to TCP. .
+// Write large data to socket .
//
-int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec,
+int pubsub_sktHandler_write(pubsub_sktHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec,
size_t msg_iov_len, int flags) {
int result = 0;
if (handle == NULL) {
@@ -994,9 +1368,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
if (handle) {
celixThreadRwlock_readLock(&handle->dbLock);
hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
- size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ psa_skt_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!__atomic_load_n(&entry->connected, __ATOMIC_ACQUIRE)) {
continue;
}
@@ -1016,11 +1389,21 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
}
}
+ size_t protocolHeaderBufferSize = 0;
+ // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+ handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+ size_t footerSize = 0;
+ // Get size of the Protocol Footer
+ handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+ size_t max_msg_iov_len = IOV_MAX; // header , footer, padding
+ max_msg_iov_len -= (protocolHeaderBufferSize) ? 1 : 0;
+ max_msg_iov_len -= (footerSize) ? 1 : 0;
+
// check if message is not too large
bool isMessageSegmentationSupported = false;
handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported);
if (!isMessageSegmentationSupported && (msg_iov_len > max_msg_iov_len || payloadSize > entry->maxMsgSize)) {
- L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n", entry->fd);
+ L_WARN("[SKT Handler] Failed to send message (fd: %d), Message segmentation is not supported\n", entry->fd);
celixThreadMutex_unlock(&entry->writeMutex);
continue;
}
@@ -1034,15 +1417,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
void *metadataData = NULL;
size_t metadataSize = 0;
if (message->metadata.metadata) {
- metadataSize = entry->writeMetaBufferSize;
metadataData = entry->writeMetaBuffer;
- // When maxMsgSize is smaller then meta data is disabled
+ metadataSize = entry->writeMetaBufferSize;
+ // When maxMsgSize is smaller than meta data is disabled
if (metadataSize > entry->maxMsgSize) {
metadataSize = 0;
}
handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize);
}
-
message->header.metadataSize = metadataSize;
size_t totalMsgSize = payloadSize + metadataSize;
@@ -1055,7 +1437,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
struct msghdr msg;
struct iovec msg_iov[IOV_MAX];
memset(&msg, 0x00, sizeof(struct msghdr));
- msg.msg_name = &entry->addr;
+ msg.msg_name = &entry->dst_addr;
msg.msg_namelen = entry->len;
msg.msg_flags = flags;
msg.msg_iov = msg_iov;
@@ -1065,13 +1447,6 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
message->header.payloadOffset = 0;
message->header.metadataSize = 0;
message->header.isLastSegment = 0;
-
- size_t protocolHeaderBufferSize = 0;
- // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
- handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
- size_t footerSize = 0;
- // Get size of the Protocol Footer
- handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
size_t maxMsgSize = entry->maxMsgSize - protocolHeaderBufferSize - footerSize;
// reserve space for the header if required, header is added later when size of message is known (message can split in parts)
@@ -1094,6 +1469,10 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
if ((msgPartSize + msgIoVec[i].iov_len) > maxMsgSize) {
break;
}
+ // When iov_len is zero, skip item and send next item
+ if (!msgIoVec[i].iov_len) {
+ msgIovOffset = ++i;
+ }
msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
@@ -1102,7 +1481,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
// if no entry could be added
if (i == msgIovOffset) {
// TODO element can be split in parts?
- L_ERROR("[TCP Socket] vector io element is larger than max msg size");
+ L_ERROR("[SKT Handler] vector io element is larger than max msg size");
break;
}
msgIovOffset = i;
@@ -1150,7 +1529,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
msg.msg_iov[0].iov_len = headerSize;
msgPartSize += msg.msg_iov[0].iov_len;
} else {
- L_ERROR("[TCP Socket] No header buffer is generated");
+ L_ERROR("[SKT Handler] No header buffer is generated");
break;
}
}
@@ -1181,27 +1560,24 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
if (entry->retryCount < handle->maxSendRetryCount) {
entry->retryCount++;
L_ERROR(
- "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
+ "[SKT Handler] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno));
} else {
L_ERROR(
- "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+ "[SKT Handler] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
connFdCloseQueue[nofConnToClose++] = entry->fd;
}
result = -1; //At least one connection failed sending
} else if (msgPartSize) {
entry->retryCount = 0;
if (nbytes != msgPartSize) {
- L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgPartSize, nbytes, strerror(errno));
+ L_ERROR("[SKT Handler] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgPartSize, nbytes, strerror(errno));
}
}
// Note: serialized Payload is deleted by serializer
if (payloadData && (payloadData != message->payload.payload)) {
free(payloadData);
}
- if (metadataData && metadataSize > 0) {
- free(metadataData);
- }
}
celixThreadMutex_unlock(&entry->writeMutex);
}
@@ -1209,7 +1585,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
}
//Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
for (int i = 0; i < nofConnToClose; i++) {
- pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);
+ pubsub_sktHandler_close(handle, connFdCloseQueue[i]);
}
return result;
}
@@ -1217,12 +1593,12 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
//
// get interface URL
//
-char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
+char *pubsub_sktHandler_get_interface_url(pubsub_sktHandler_t *handle) {
hash_map_iterator_t iter =
hashMapIterator_construct(handle->interface_url_map);
char *url = NULL;
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_connection_entry_t *entry =
+ psa_skt_connection_entry_t *entry =
hashMapIterator_nextValue(&iter);
if (entry && entry->url) {
if (!url) {
@@ -1236,16 +1612,17 @@ char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
}
return url;
}
+
//
// get connection URL
//
-char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle) {
+char *pubsub_sktHandler_get_connection_url(pubsub_sktHandler_t *handle) {
celixThreadRwlock_writeLock(&handle->dbLock);
hash_map_iterator_t iter =
hashMapIterator_construct(handle->connection_url_map);
char *url = NULL;
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_connection_entry_t *entry =
+ psa_skt_connection_entry_t *entry =
hashMapIterator_nextValue(&iter);
if (entry && entry->url) {
if (!url) {
@@ -1266,10 +1643,33 @@ char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle) {
}
//
+// get connection URLs
+//   handle: socket handler to query; the call is a no-op when NULL.
+//   urls:   list that receives one heap-allocated copy of the URL of every
+//           connection entry that has one; the call is a no-op when NULL.
+// Ownership of every added string passes to the caller, who must free() each
+// element. The copies are deliberately not freed here: the list keeps the
+// pointer that is added to it, so freeing would leave dangling entries.
+//
+void pubsub_sktHandler_get_connection_urls(pubsub_sktHandler_t *handle, celix_array_list_t *urls) {
+    if ((handle == NULL) || (urls == NULL)) {
+        return;
+    }
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_url_map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_skt_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if ((entry == NULL) || (entry->url == NULL)) {
+            continue;
+        }
+        char *url = NULL;
+        // asprintf leaves url undefined on failure, so only hand it over on success
+        if (asprintf(&url, "%s", entry->url) >= 0) {
+            celix_arrayList_add(urls, url);
+        }
+    }
+    celixThreadRwlock_unlock(&handle->dbLock);
+}
+
+
+
+//
// Handle non-blocking accept (sender)
//
static inline
-int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *pendingConnectionEntry) {
+int pubsub_sktHandler_acceptHandler(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *pendingConnectionEntry) {
celixThreadRwlock_writeLock(&handle->dbLock);
// new connection available
struct sockaddr_in their_addr;
@@ -1277,7 +1677,7 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
int fd = accept(pendingConnectionEntry->fd, (struct sockaddr*)&their_addr, &len);
int rc = fd;
if (rc == -1) {
- L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
+ L_ERROR("[TCP SKT Handler] accept failed: %s\n", strerror(errno));
}
if (rc >= 0) {
// handle new connection:
@@ -1285,7 +1685,7 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
getsockname(pendingConnectionEntry->fd, (struct sockaddr *) &sin, &len);
char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
char *url = pubsub_utils_url_get_url(&their_addr, NULL);
- psa_tcp_connection_entry_t *entry = pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &their_addr);
+ psa_skt_connection_entry_t *entry = pubsub_sktHandler_createEntry(handle, fd, url, interface_url, &their_addr);
#if defined(__APPLE__)
struct kevent ev;
EV_SET (&ev, entry->fd, EVFILT_READ, EV_ADD | EV_ENABLE , 0, 0, 0);
@@ -1300,16 +1700,16 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
#endif
if (rc < 0) {
- pubsub_tcpHandler_freeEntry(entry);
+ pubsub_sktHandler_freeEntry(entry);
free(entry);
- L_ERROR("[TCP Socket] Cannot create epoll\n");
+ L_ERROR("[TCP SKT Handler] Cannot create epoll\n");
} else {
// Call Accept Connection callback
if (handle->acceptConnectMessageCallback)
handle->acceptConnectMessageCallback(handle->acceptConnectPayload, url);
hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry);
hashMap_put(handle->connection_url_map, entry->url, entry);
- L_INFO("[TCP Socket] New connection to url: %s: \n", url);
+ L_INFO("[TCP SKT Handler] New connection to url: %s: \n", url);
}
free(url);
free(interface_url);
@@ -1322,9 +1722,9 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
// Handle sockets connection (sender)
//
static inline
-void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) {
+void pubsub_sktHandler_connectionHandler(pubsub_sktHandler_t *handle, int fd) {
celixThreadRwlock_readLock(&handle->dbLock);
- psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
+ psa_skt_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
if (entry)
if (!__atomic_exchange_n(&entry->connected, true, __ATOMIC_ACQ_REL)) {
// tell sender that an receiver is connected
@@ -1339,7 +1739,7 @@ void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) {
// The main socket event loop
//
static inline
-void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
+void pubsub_sktHandler_handler(pubsub_sktHandler_t *handle) {
int rc = 0;
if (handle->efd >= 0) {
int nof_events = 0;
@@ -1350,34 +1750,34 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
if (nof_events < 0) {
if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
} else
- L_ERROR("[TCP Socket] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno));
+ L_ERROR("[SKT Handler] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno));
}
for (int i = 0; i < nof_events; i++) {
hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map);
- psa_tcp_connection_entry_t *pendingConnectionEntry = NULL;
+ psa_skt_connection_entry_t *pendingConnectionEntry = NULL;
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ psa_skt_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (events[i].ident == entry->fd)
pendingConnectionEntry = entry;
}
if (pendingConnectionEntry) {
- int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
- pubsub_tcpHandler_connectionHandler(handle, fd);
+ int fd = pubsub_sktHandler_acceptHandler(handle, pendingConnectionEntry);
+ pubsub_sktHandler_connectionHandler(handle, fd);
} else if (events[i].filter & EVFILT_READ) {
- int rc = pubsub_tcpHandler_read(handle, events[i].ident);
- if (rc == 0) pubsub_tcpHandler_close(handle, events[i].ident);
+ int rc = pubsub_sktHandler_read(handle, events[i].ident);
+ if (rc == 0) pubsub_sktHandler_close(handle, events[i].ident);
} else if (events[i].flags & EV_EOF) {
int err = 0;
socklen_t len = sizeof(int);
rc = getsockopt(events[i].ident, SOL_SOCKET, SO_ERROR, &err, &len);
if (rc != 0) {
- L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno));
+ L_ERROR("[SKT Handler]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno));
continue;
}
- pubsub_tcpHandler_close(handle, events[i].ident);
+ pubsub_sktHandler_close(handle, events[i].ident);
} else if (events[i].flags & EV_ERROR) {
- L_ERROR("[TCP Socket]:EPOLLERR ERROR read from socket %s\n", strerror(errno));
- pubsub_tcpHandler_close(handle, events[i].ident);
+ L_ERROR("[SKT Handler]:EPOLLERR ERROR read from socket %s\n", strerror(errno));
+ pubsub_sktHandler_close(handle, events[i].ident);
continue;
}
}
@@ -1391,43 +1791,34 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
// The main socket event loop
//
static inline
-void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
+void pubsub_sktHandler_handler(pubsub_sktHandler_t *handle) {
int rc = 0;
if (handle->efd >= 0) {
int nof_events = 0;
struct epoll_event events[MAX_EVENTS];
nof_events = epoll_wait(handle->efd, events, MAX_EVENTS, (int)handle->timeout);
- if (nof_events < 0) {
- if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
- } else
- L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
- }
+ if ((nof_events < 0) && (!((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))))
+ L_ERROR("[SKT Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
for (int i = 0; i < nof_events; i++) {
- hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map);
- psa_tcp_connection_entry_t *pendingConnectionEntry = NULL;
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (events[i].data.fd == entry->fd)
- pendingConnectionEntry = entry;
- }
- if (pendingConnectionEntry) {
- int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
- pubsub_tcpHandler_connectionHandler(handle, fd);
+ psa_skt_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) events[i].data.fd );
+ if (entry && (entry->socket_type == SOCK_STREAM)) {
+ int fd = pubsub_sktHandler_acceptHandler(handle, entry);
+ pubsub_sktHandler_connectionHandler(handle, fd);
} else if (events[i].events & EPOLLIN) {
- rc = pubsub_tcpHandler_read(handle, events[i].data.fd);
- if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd);
+ rc = pubsub_sktHandler_read(handle, events[i].data.fd);
+ if (rc == 0) pubsub_sktHandler_close(handle, events[i].data.fd);
} else if (events[i].events & EPOLLRDHUP) {
int err = 0;
socklen_t len = sizeof(int);
rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
if (rc != 0) {
- L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno));
+ L_ERROR("[SKT Handler]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno));
continue;
}
- pubsub_tcpHandler_close(handle, events[i].data.fd);
+ pubsub_sktHandler_close(handle, events[i].data.fd);
} else if (events[i].events & EPOLLERR) {
- L_ERROR("[TCP Socket]:EPOLLERR ERROR read from socket %s\n", strerror(errno));
- pubsub_tcpHandler_close(handle, events[i].data.fd);
+ L_ERROR("[SKT Handler]:EPOLLERR ERROR read from socket %s\n", strerror(errno));
+ pubsub_sktHandler_close(handle, events[i].data.fd);
continue;
}
}
@@ -1438,14 +1829,14 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
//
// The socket thread
//
-static void *pubsub_tcpHandler_thread(void *data) {
- pubsub_tcpHandler_t *handle = data;
+static void *pubsub_sktHandler_thread(void *data) {
+ pubsub_sktHandler_t *handle = data;
celixThreadRwlock_readLock(&handle->dbLock);
bool running = handle->running;
celixThreadRwlock_unlock(&handle->dbLock);
while (running) {
- pubsub_tcpHandler_handler(handle);
+ pubsub_sktHandler_handler(handle);
celixThreadRwlock_readLock(&handle->dbLock);
running = handle->running;
celixThreadRwlock_unlock(&handle->dbLock);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
index 65a1ff2..c4cec1a 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
@@ -115,14 +115,19 @@ bool pubsub_utils_url_is_multicast(char *hostname) {
return (ntohl(inet_addr(hostname)) >= ntohl(inet_addr("224.0.0.0"))) ? true : false;
}
+bool pubsub_utils_url_is_broadcast(char *hostname) { /* Returns true when the last octet of the IPv4 dotted-quad string `hostname` is 255. */
+ return (((inet_addr("0.0.0.255") & inet_addr(hostname)) == inet_addr("0.0.0.255"))) ? true : false; /* NOTE(review): inet_addr() returns INADDR_NONE (all bits set) for unparsable input, which also passes this mask test -- confirm callers only pass valid IPv4 literals. */
+}
+
/** Finds an IP of the available network interfaces of the machine by specifying an CIDR subnet.
*
- * @param ipWithPrefix IP with prefix, e.g. 192.168.1.0/24
+ * @param hostname IP with prefix, e.g. 200.100.0.0/24
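+ * @param intf_addr optional interface address in network byte order; when non-NULL and non-zero, its host bits are combined with the subnet start instead of choosing a random address in the range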
* @return ip In case a matching interface could be found, an allocated string containing the IP of the
- * interface will be returned, e.g. 192.168.1.16. Memory for the new string can be freed with free().
+ * interface will be returned, e.g. 200.100.0.16. Memory for the new string can be freed with free().
* When no matching interface is found NULL will be returned.
*/
-char *pubsub_utils_url_get_multicast_ip(char *hostname) {
+char* pubsub_utils_url_get_multicast_ip(char *hostname, in_addr_t* intf_addr) {
char *ip = NULL;
if (hostname) {
char *subNet = strchr(hostname, '/');
@@ -144,8 +149,14 @@ char *pubsub_utils_url_get_multicast_ip(char *hostname) {
unsigned int ipAsUint = ntohl(address.sin_addr.s_addr);
unsigned int bitmask = ipUtils_prefixToBitmask(inputPrefix);
unsigned int ipRangeStart = ipAsUint & bitmask;
- unsigned int ipRangeStop = ipAsUint | ~bitmask;
- unsigned int addr = pubsub_utils_url_rand_range(ipRangeStart, ipRangeStop);
+ unsigned int addr = 0;
+ if (intf_addr && *intf_addr) {
+ unsigned int intfIpAsUint = ntohl(*intf_addr);
+ addr = ipRangeStart + (intfIpAsUint & ~bitmask);
+ } else {
+ unsigned int ipRangeStop = ipAsUint | ~bitmask;
+ addr = pubsub_utils_url_rand_range(ipRangeStart, ipRangeStop);
+ }
address.sin_addr.s_addr = htonl(addr);
ip = pubsub_utils_url_get_url(&address, NULL);
}
@@ -153,7 +164,7 @@ char *pubsub_utils_url_get_multicast_ip(char *hostname) {
return ip;
}
-char *pubsub_utils_url_get_ip(char *_hostname) {
+char *pubsub_utils_url_get_ip(char *_hostname, in_addr_t* intf_addr) {
char *ip = NULL;
if (_hostname) {
char *subNet = strstr(_hostname, "/");
@@ -163,7 +174,7 @@ char *pubsub_utils_url_get_ip(char *_hostname) {
if ((length > 1) && isdigit(subNet[1])) {
bool is_multicast = pubsub_utils_url_is_multicast(hostname);
if (is_multicast)
- ip = pubsub_utils_url_get_multicast_ip(_hostname);
+ ip = pubsub_utils_url_get_multicast_ip(_hostname, intf_addr);
else
ip = ipUtils_findIpBySubnet(_hostname);
if (ip == NULL)
@@ -284,7 +295,7 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
if (url_info->interface) {
pubsub_utils_url_t interface_url_info;
bzero(&interface_url_info, sizeof(pubsub_utils_url_t));
- char *ip = pubsub_utils_url_get_ip(url_info->interface);
+ char *ip = pubsub_utils_url_get_ip(url_info->interface, NULL);
if (ip != NULL) {
free(url_info->interface);
url_info->interface = ip;
@@ -299,9 +310,10 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
}
if (url_info->hostname) {
- if (url_info->url)
- free(url_info->url);
- char *ip = pubsub_utils_url_get_ip(url_info->hostname);
+ if (url_info->url) free(url_info->url);
+ struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr);
+ char *ip = pubsub_utils_url_get_ip(url_info->hostname, &m_sin->sin_addr.s_addr);
+ free(m_sin);
if (ip != NULL) {
free(url_info->hostname);
url_info->hostname = ip;