You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by pn...@apache.org on 2018/11/06 11:44:45 UTC
[3/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is
no longer part of the current API.
CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.
Multipart support can come back, but then as a separate service (i.e. pubsub_multipart_subscriber / pubsub_multipart_publisher).
Also
- remove multipart examples
- Add optional init function to the subscriber API, making it possible for a subscriber to tweak the receiver thread
- Initial support for static ZMQ bind, discover and connect URLs, for use in (among other things) testing
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b2548c84
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b2548c84
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b2548c84
Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: b2548c84502e389b4e1565b9895e16459e9ef886
Parents: 68f69f8
Author: Pepijn Noltes <pe...@gmail.com>
Authored: Tue Nov 6 12:41:44 2018 +0100
Committer: Pepijn Noltes <pe...@gmail.com>
Committed: Tue Nov 6 12:41:44 2018 +0100
----------------------------------------------------------------------
bundles/pubsub/examples/CMakeLists.txt | 40 -
.../pubsub/examples/mp_pubsub/CMakeLists.txt | 23 -
.../examples/mp_pubsub/common/include/ew.h | 53 -
.../examples/mp_pubsub/common/include/ide.h | 49 -
.../mp_pubsub/common/include/kinematics.h | 55 -
.../mp_pubsub/msg_descriptors/msg_ew.descriptor | 9 -
.../msg_descriptors/msg_ide.descriptor | 9 -
.../msg_descriptors/msg_kinematics.descriptor | 10 -
.../examples/mp_pubsub/publisher/CMakeLists.txt | 43 -
.../private/include/mp_publisher_private.h | 58 -
.../publisher/private/src/mp_pub_activator.c | 144 --
.../publisher/private/src/mp_publisher.c | 160 --
.../mp_pubsub/subscriber/CMakeLists.txt | 43 -
.../private/include/mp_subscriber_private.h | 50 -
.../subscriber/private/src/mp_sub_activator.c | 108 --
.../subscriber/private/src/mp_subscriber.c | 119 --
.../private/include/pubsub_subscriber_private.h | 2 +-
.../subscriber/private/src/pubsub_subscriber.c | 2 +-
bundles/pubsub/mock/src/publisher_mock.cc | 14 -
.../pubsub_admin_udp_mc/src/psa_activator.c | 6 +-
.../src/pubsub_udpmc_admin.c | 12 +-
.../src/pubsub_udpmc_admin.h | 8 +-
.../src/pubsub_udpmc_topic_receiver.c | 7 +-
.../src/pubsub_udpmc_topic_sender.c | 10 +-
.../pubsub/pubsub_admin_zmq/src/psa_activator.c | 6 +-
.../pubsub_admin_zmq/src/pubsub_zmq_admin.c | 38 +-
.../pubsub_admin_zmq/src/pubsub_zmq_admin.h | 36 +-
.../src/pubsub_zmq_topic_receiver.c | 121 +-
.../src/pubsub_zmq_topic_receiver.h | 1 +
.../src/pubsub_zmq_topic_sender.c | 49 +-
.../src/pubsub_zmq_topic_sender.h | 1 +
.../pubsub_api/include/pubsub/publisher.h | 21 +-
.../pubsub_api/include/pubsub/subscriber.h | 20 +-
.../pubsub/pubsub_discovery/src/psd_activator.c | 2 +-
.../src/pubsub_discovery_impl.c | 2 +-
.../src/pubsub_discovery_impl.h | 2 +-
.../pubsub/pubsub_spi/include/pubsub_admin.h | 18 +-
.../pubsub_spi/include/pubsub_listeners.h | 7 +-
.../pubsub/pubsub_spi/include/pubsub_utils.h | 2 +
.../pubsub/pubsub_spi/src/pubsub_utils_match.c | 12 +-
.../src/pubsub_topology_manager.c | 1407 ++++++++++--------
.../src/pubsub_topology_manager.h | 2 +-
libs/utils/src/properties.c | 5 +-
43 files changed, 1009 insertions(+), 1777 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index e8113b9..2e43e6d 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -16,7 +16,6 @@
# under the License.
add_subdirectory(pubsub)
-add_subdirectory(mp_pubsub)
find_program(ETCD_CMD NAMES etcd)
find_program(XTERM_CMD NAMES xterm)
@@ -198,33 +197,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
)
target_link_libraries(pubsub_subscriber2_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
- # ZMQ Multipart
- add_celix_container("pubsub_mp_subscriber_zmq"
- GROUP "pubsub"
- BUNDLES
- Celix::shell
- Celix::shell_tui
- Celix::pubsub_serializer_json
- Celix::pubsub_discovery_etcd
- Celix::pubsub_topology_manager
- Celix::pubsub_admin_zmq
- org.apache.celix.pubsub_subscriber.MpSubscriber
- )
- target_link_libraries(pubsub_mp_subscriber_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
-
- add_celix_container("pubsub_mp_publisher_zmq"
- GROUP "pubsub"
- BUNDLES
- Celix::shell
- Celix::shell_tui
- Celix::pubsub_serializer_json
- Celix::pubsub_discovery_etcd
- Celix::pubsub_topology_manager
- Celix::pubsub_admin_zmq
- org.apache.celix.pubsub_publisher.MpPublisher
- )
- target_link_libraries(pubsub_mp_publisher_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
-
if (ETCD_CMD AND XTERM_CMD)
#Runtime starting two bundles using both zmq and upd mc pubsub
add_celix_runtime(pubsub_rt_zmq_udpmc_combi
@@ -251,18 +223,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
etcd
USE_TERM
)
-
- #Runtime starting a multipart (multiple message in one send) publish and subscriber for zmq
- add_celix_runtime(pubsub_rt_multipart_zmq
- NAME zmq_multipart
- GROUP pubsub
- CONTAINERS
- pubsub_mp_subscriber_zmq
- pubsub_mp_publisher_zmq
- COMMANDS
- etcd
- USE_TERM
- )
endif ()
endif()
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
deleted file mode 100644
index c828832..0000000
--- a/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-include_directories("common/include")
-
-add_subdirectory(publisher)
-add_subdirectory(subscriber)
-
-
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/ew.h b/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
deleted file mode 100644
index 81ca5f3..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ew.h
- *
- * \date Jan 15, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef EW_H_
-#define EW_H_
-
-#define MIN_AREA 50.0F
-#define MAX_AREA 15000.0F
-
-#define MSG_EW_NAME "ew" //Has to match the message name in the msg descriptor!
-
-typedef enum color{
- GREEN,
- BLUE,
- RED,
- BLACK,
- WHITE,
- LAST_COLOR
-} color_e;
-
-const char* color_tostring[] = {"GREEN","BLUE","RED","BLACK","WHITE"};
-
-struct ew_data{
- double area;
- color_e color;
-};
-
-typedef struct ew_data* ew_data_pt;
-
-#endif /* EW_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/ide.h b/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
deleted file mode 100644
index 2b9588d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ide.h
- *
- * \date Jan 15, 2016
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef IDE_H_
-#define IDE_H_
-
-#define MSG_IDE_NAME "ide" //Has to match the message name in the msg descriptor!
-
-typedef enum shape{
- SQUARE,
- CIRCLE,
- TRIANGLE,
- RECTANGLE,
- HEXAGON,
- LAST_SHAPE
-} shape_e;
-
-const char* shape_tostring[] = {"SQUARE","CIRCLE","TRIANGLE","RECTANGLE","HEXAGON"};
-
-struct ide{
- shape_e shape;
-};
-
-typedef struct ide* ide_pt;
-
-#endif /* IDE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h b/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
deleted file mode 100644
index 5601509..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * kinematics.h
- *
- * \date Nov 12, 2015
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef KINEMATICS_H_
-#define KINEMATICS_H_
-
-#define MIN_LAT -90.0F
-#define MAX_LAT 90.0F
-#define MIN_LON -180.0F
-#define MAX_LON 180.0F
-
-#define MIN_OCCUR 1
-#define MAX_OCCUR 5
-
-#define MSG_KINEMATICS_NAME "kinematics" //Has to match the message name in the msg descriptor!
-
-struct position{
- double lat;
- double lon;
-};
-
-typedef struct position position_t;
-
-struct kinematics{
- position_t position;
- int occurrences;
-};
-
-typedef struct kinematics* kinematics_pt;
-
-
-#endif /* KINEMATICS_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
deleted file mode 100644
index 7eb8c29..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
+++ /dev/null
@@ -1,9 +0,0 @@
-:header
-type=message
-name=ew
-version=1.0.0
-:annotations
-classname=org.example.Ew
-:types
-:message
-{Di area color}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
deleted file mode 100644
index f26286d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
+++ /dev/null
@@ -1,9 +0,0 @@
-:header
-type=message
-name=ide
-version=1.0.0
-:annotations
-classname=org.example.Ide
-:types
-:message
-{i shape}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
deleted file mode 100644
index 447b645..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
+++ /dev/null
@@ -1,10 +0,0 @@
-:header
-type=message
-name=kinematics
-version=1.0.0
-:annotations
-classname=org.example.Kinematics
-:types
-position={DD lat long}
-:message
-{lposition;N position occurrences}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
deleted file mode 100644
index 76c9bce..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-add_celix_bundle(org.apache.celix.pubsub_publisher.MpPublisher
- SYMBOLIC_NAME "apache_celix_pubsub_mp_publisher"
- VERSION "1.0.0"
- SOURCES
- private/src/mp_pub_activator.c
- private/src/mp_publisher.c
-)
-target_link_libraries(org.apache.celix.pubsub_publisher.MpPublisher PRIVATE Celix::framework Celix::pubsub_api)
-target_include_directories(org.apache.celix.pubsub_publisher.MpPublisher PRIVATE private/include)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
- DESTINATION "META-INF/descriptors"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
- DESTINATION "META-INF/keys"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
- DESTINATION "META-INF/keys/subscriber"
-)
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h b/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
deleted file mode 100644
index 9c227fd..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_publisher_private.h
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef MP_PUBLISHER_PRIVATE_H_
-#define MP_PUBLISHER_PRIVATE_H_
-
-#include <celixbool.h>
-
-#include "pubsub/publisher.h"
-
-struct pubsub_sender {
- array_list_pt trackers;
- const char *ident;
- long bundleId;
-};
-
-typedef struct pubsub_sender * pubsub_sender_pt;
-
-typedef struct send_thread_struct{
- pubsub_publisher_pt service;
- pubsub_sender_pt publisher;
-} *send_thread_struct_pt;
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId);
-
-void publisher_start(pubsub_sender_pt client);
-void publisher_stop(pubsub_sender_pt client);
-
-void publisher_destroy(pubsub_sender_pt client);
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service);
-
-
-#endif /* MP_PUBLISHER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c b/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
deleted file mode 100644
index 0b6041d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_pub_activator.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <sys/cdefs.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "constants.h"
-
-#include "mp_publisher_private.h"
-
-static const char * PUB_TOPICS[] = {
- "multipart",
- NULL
-};
-
-
-struct publisherActivator {
- pubsub_sender_pt client;
- array_list_pt trackerList;//List<service_tracker_pt>
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- celix_status_t status = CELIX_SUCCESS;
-
- struct publisherActivator * act = malloc(sizeof(*act));
-
- const char* fwUUID = NULL;
-
- bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- if(fwUUID == NULL){
- printf("MP_PUBLISHER: Cannot retrieve fwUUID.\n");
- status = CELIX_INVALID_BUNDLE_CONTEXT;
- }
-
- if (status == CELIX_SUCCESS){
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- arrayList_create(&(act->trackerList));
- act->client = publisher_create(act->trackerList,fwUUID,bundleId);
- *userData = act;
- } else {
- free(act);
- }
-
- return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-
- struct publisherActivator * act = (struct publisherActivator *) userData;
-
- int i;
-
- char filter[128];
- for(i=0; PUB_TOPICS[i] != NULL; i++){
- const char* topic = PUB_TOPICS[i];
-
- bundle_pt bundle = NULL;
- long bundleId = 0;
- bundleContext_getBundle(context,&bundle);
- bundle_getBundleId(bundle,&bundleId);
-
- service_tracker_pt tracker = NULL;
- memset(filter,0,128);
-
- snprintf(filter, 128, "(&(%s=%s)(%s=%s))", (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC,topic);
-
- service_tracker_customizer_pt customizer = NULL;
-
- serviceTrackerCustomizer_create(act->client,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer);
- serviceTracker_createWithFilter(context, filter, customizer, &tracker);
-
- arrayList_add(act->trackerList,tracker);
- }
-
- publisher_start(act->client);
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_open(tracker);
- }
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) {
- struct publisherActivator * act = (struct publisherActivator *) userData;
- int i;
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_close(tracker);
- }
-
- publisher_stop(act->client);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt __attribute__((unused)) context) {
- struct publisherActivator * act = (struct publisherActivator *) userData;
- int i;
-
- for(i=0;i<arrayList_size(act->trackerList);i++){
- service_tracker_pt tracker = arrayList_get(act->trackerList,i);
- serviceTracker_destroy(tracker);
- }
-
- publisher_destroy(act->client);
- arrayList_destroy(act->trackerList);
-
- free(act);
-
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c b/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
deleted file mode 100644
index f47be29..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_publisher.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <unistd.h>
-
-#include "service_tracker.h"
-#include "celix_threads.h"
-
-#include "ew.h"
-#include "ide.h"
-#include "kinematics.h"
-#include "mp_publisher_private.h"
-
-
-static bool stop=false;
-static celix_thread_t tid;
-
-static double randDouble(double min, double max){
- double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min))) ;
- return ret;
-}
-
-static unsigned int randInt(unsigned int min, unsigned int max){
- double scaled = ((double)random())/((double)RAND_MAX);
- return (max - min +1)*scaled + min;
-}
-
-static void* send_thread(void* arg){
-
- send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
-
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
-
- unsigned int kin_msg_id = 0;
- unsigned int ide_msg_id = 0;
- unsigned int ew_msg_id = 0;
-
- if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_KINEMATICS_NAME,&kin_msg_id) != 0 ){
- printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_KINEMATICS_NAME);
- return NULL;
- }
-
- if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_IDE_NAME,&ide_msg_id) != 0 ){
- printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_IDE_NAME);
- return NULL;
- }
-
- if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_EW_NAME,&ew_msg_id) != 0 ){
- printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_EW_NAME);
- return NULL;
- }
-
- kinematics_pt kin_data = calloc(1,sizeof(*kin_data));
- ide_pt ide_data = calloc(1,sizeof(*ide_data));
- ew_data_pt ew_data = calloc(1,sizeof(*ew_data));
-
- unsigned int counter = 1;
-
- while(stop==false){
- kin_data->position.lat = randDouble(MIN_LAT,MAX_LAT);
- kin_data->position.lon = randDouble(MIN_LON,MAX_LON);
- kin_data->occurrences = randInt(MIN_OCCUR,MAX_OCCUR);
- publish_svc->sendMultipart(publish_svc->handle,kin_msg_id,kin_data, PUBSUB_PUBLISHER_FIRST_MSG);
- printf("Track#%u kin_data: pos=[%f, %f] occurrences=%d\n",counter,kin_data->position.lat,kin_data->position.lon, kin_data->occurrences);
-
- ide_data->shape = (shape_e)randInt(0,LAST_SHAPE-1);
- publish_svc->sendMultipart(publish_svc->handle,ide_msg_id,ide_data, PUBSUB_PUBLISHER_PART_MSG);
- printf("Track#%u ide_data: shape=%s\n",counter,shape_tostring[(int)ide_data->shape]);
-
- ew_data->area = randDouble(MIN_AREA,MAX_AREA);
- ew_data->color = (color_e)randInt(0,LAST_COLOR-1);
- publish_svc->sendMultipart(publish_svc->handle,ew_msg_id,ew_data, PUBSUB_PUBLISHER_LAST_MSG);
- printf("Track#%u ew_data: area=%f color=%s\n",counter,ew_data->area,color_tostring[(int)ew_data->color]);
-
- printf("\n");
- sleep(2);
- counter++;
- }
-
- free(ew_data);
- free(ide_data);
- free(kin_data);
- free(st_struct);
-
- return NULL;
-
-}
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
- pubsub_sender_pt publisher = malloc(sizeof(*publisher));
-
- publisher->trackers = trackers;
- publisher->ident = ident;
- publisher->bundleId = bundleId;
-
- return publisher;
-}
-
-void publisher_start(pubsub_sender_pt client) {
- printf("MP_PUBLISHER: starting up...\n");
-}
-
-void publisher_stop(pubsub_sender_pt client) {
- printf("MP_PUBLISHER: stopping...\n");
-}
-
-void publisher_destroy(pubsub_sender_pt client) {
- client->trackers = NULL;
- client->ident = NULL;
- free(client);
-}
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service;
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-
- printf("MP_PUBLISHER: new publish service exported (%s).\n",manager->ident);
-
- send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
- data->service = publish_svc;
- data->publisher = manager;
-
- celixThread_create(&tid,NULL,send_thread,(void*)data);
- return CELIX_SUCCESS;
-}
-
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
- //publish_service_pt publish_svc = (publish_service_pt)service;
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
- printf("MP_PUBLISHER: publish service unexported (%s)!\n",manager->ident);
- stop=true;
- celixThread_join(tid,NULL);
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
deleted file mode 100644
index 444764e..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-add_celix_bundle(org.apache.celix.pubsub_subscriber.MpSubscriber
- SYMBOLIC_NAME "apache_celix_pubsub_mp_subscriber"
- VERSION "1.0.0"
- SOURCES
- private/src/mp_sub_activator.c
- private/src/mp_subscriber.c
-)
-target_link_libraries(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE Celix::framework Celix::pubsub_api)
-target_include_directories(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE private/include)
-
-celix_bundle_files( org.apache.celix.pubsub_subscriber.MpSubscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
- DESTINATION "META-INF/descriptors"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber
- DESTINATION "META-INF/keys"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
- ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher/public
- DESTINATION "META-INF/keys/publisher"
-)
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h b/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
deleted file mode 100644
index 2d582b3..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_subscriber_private.h
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_SUBSCRIBER_PRIVATE_H_
-#define PUBSUB_SUBSCRIBER_PRIVATE_H_
-
-
-#include <string.h>
-
-#include "celixbool.h"
-
-#include "pubsub/subscriber.h"
-
-struct pubsub_receiver {
- char * name;
-};
-
-typedef struct pubsub_receiver* pubsub_receiver_pt;
-
-pubsub_receiver_pt subscriber_create(char* topics);
-void subscriber_start(pubsub_receiver_pt client);
-void subscriber_stop(pubsub_receiver_pt client);
-void subscriber_destroy(pubsub_receiver_pt client);
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
-
-#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c b/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
deleted file mode 100644
index 591a395..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_sub_activator.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "pubsub/subscriber.h"
-#include "bundle_activator.h"
-
-#include "mp_subscriber_private.h"
-
-#define SUB_NAME "multipart"
-static const char * SUB_TOPICS[] = {
- "multipart",
- NULL
-};
-
-struct subscriberActivator {
- array_list_pt registrationList; //List<service_registration_pt>
- pubsub_subscriber_t* subsvc;
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
- struct subscriberActivator * act = calloc(1,sizeof(struct subscriberActivator));
- *userData = act;
- arrayList_create(&(act->registrationList));
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-
- pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc));
- pubsub_receiver_pt sub = subscriber_create(SUB_NAME);
- subsvc->handle = sub;
- subsvc->receive = pubsub_subscriber_recv;
-
- act->subsvc = subsvc;
-
- int i;
- for(i=0; SUB_TOPICS[i] != NULL; i++){
- const char* topic = SUB_TOPICS[i];
-
- properties_pt props = properties_create();
- properties_set(props, PUBSUB_SUBSCRIBER_TOPIC,topic);
- service_registration_pt reg = NULL;
- bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, subsvc, props, &reg);
- arrayList_add(act->registrationList,reg);
- }
-
- subscriber_start((pubsub_receiver_pt)act->subsvc->handle);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
- int i;
- for(i=0; i<arrayList_size(act->registrationList);i++){
- service_registration_pt reg = arrayList_get(act->registrationList,i);
- serviceRegistration_unregister(reg);
-
- }
-
- subscriber_stop((pubsub_receiver_pt)act->subsvc->handle);
-
- return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
-
- struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
- act->subsvc->receive = NULL;
- subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle);
- act->subsvc->handle = NULL;
- free(act->subsvc);
- act->subsvc = NULL;
-
- arrayList_destroy(act->registrationList);
- free(act);
-
- return CELIX_SUCCESS;
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c b/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
deleted file mode 100644
index a5ad03a..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_subscriber.c
- *
- * \date Sep 21, 2010
- * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- * \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "ew.h"
-#include "ide.h"
-#include "kinematics.h"
-#include "mp_subscriber_private.h"
-
-pubsub_receiver_pt subscriber_create(char* topics) {
- pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
- sub->name = strdup(topics);
- return sub;
-}
-
-
-void subscriber_start(pubsub_receiver_pt subscriber){
- printf("MP_SUBSCRIBER: starting up...\n");
-}
-
-void subscriber_stop(pubsub_receiver_pt subscriber){
- printf("MP_SUBSCRIBER: stopping...\n");
-}
-
-void subscriber_destroy(pubsub_receiver_pt subscriber){
- if(subscriber->name!=NULL){
- free(subscriber->name);
- }
- subscriber->name=NULL;
- free(subscriber);
-}
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
-
- unsigned int kin_msg_id = 0;
- unsigned int ide_msg_id = 0;
- unsigned int ew_msg_id = 0;
-
- if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_KINEMATICS_NAME,&kin_msg_id) != 0 ){
- printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_KINEMATICS_NAME);
- return -1;
- }
-
- if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_IDE_NAME,&ide_msg_id) != 0 ){
- printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_IDE_NAME);
- return -1;
- }
-
- if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_EW_NAME,&ew_msg_id) != 0 ){
- printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_EW_NAME);
- return -1;
- }
-
- if(msgTypeId!=kin_msg_id){
- printf("MP_SUBSCRIBER: Multipart Message started with wrong message (expected %u, got %u)\n",msgTypeId,kin_msg_id);
- return -1;
- }
-
- kinematics_pt kin_data = (kinematics_pt)msg;
-
- void* ide_msg = NULL;
- callbacks->getMultipart(callbacks->handle,ide_msg_id,false,&ide_msg);
-
- void* ew_msg = NULL;
- callbacks->getMultipart(callbacks->handle,ew_msg_id,false,&ew_msg);
-
- if(kin_data==NULL){
- printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_KINEMATICS_NAME);
- }
- else{
- printf("kin_data: pos=[%f, %f] occurrences=%d\n",kin_data->position.lat,kin_data->position.lon, kin_data->occurrences);
- }
-
- if(ide_msg==NULL){
- printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_IDE_NAME);
- }
- else{
- ide_pt ide_data = (ide_pt)ide_msg;
- printf("ide_data: shape=%s\n",shape_tostring[(int)ide_data->shape]);
- }
-
- if(ew_msg==NULL){
- printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_EW_NAME);
- }
- else{
- ew_data_pt ew_data = (ew_data_pt)ew_msg;
- printf("ew_data: area=%f color=%s\n",ew_data->area,color_tostring[(int)ew_data->color]);
- }
-
- printf("\n");
-
- return 0;
-
-}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 00ca9b4..291a6aa 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -45,7 +45,7 @@ void subscriber_start(pubsub_receiver_pt client);
void subscriber_stop(pubsub_receiver_pt client);
void subscriber_destroy(pubsub_receiver_pt client);
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index a137253..7cfbedb 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -53,7 +53,7 @@ void subscriber_destroy(pubsub_receiver_pt subscriber){
free(subscriber);
}
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release){
location_t place = (location_t)msg;
int nrchars = 25;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/mock/src/publisher_mock.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/mock/src/publisher_mock.cc b/bundles/pubsub/mock/src/publisher_mock.cc
index e8902a8..1bdae98 100644
--- a/bundles/pubsub/mock/src/publisher_mock.cc
+++ b/bundles/pubsub/mock/src/publisher_mock.cc
@@ -45,24 +45,10 @@ static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, cons
}
/*============================================================================
- MOCK - mock function for pubsub_publisher->sendMultipart
- ============================================================================*/
-static int pubsub__publisherMock_sendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags) {
- return mock(PUBSUB_PUBLISHERMOCK_SCOPE)
- .actualCall(PUBSUB_PUBLISHERMOCK_SEND_MULTIPART_METHOD)
- .withPointerParameter("handle", handle)
- .withParameter("msgTypeId", msgTypeId)
- .withPointerParameter("msg", (void*)msg)
- .withParameter("flags", flags)
- .returnIntValue();
-}
-
-/*============================================================================
MOCK - mock setup for publisher service
============================================================================*/
void pubsub_publisherMock_init(pubsub_publisher_t* srv, void* handle) {
srv->handle = handle;
srv->localMsgTypeIdForMsgType = pubsub__publisherMock_localMsgTypeIdForMsgType;
srv->send = pubsub__publisherMock_send;
- srv->sendMultipart = pubsub__publisherMock_sendMultipart;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index c838899..63de809 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -71,13 +71,13 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
psaSvc->handle = act->admin;
psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher;
psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber;
- psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint;
+ psaSvc->matchDiscoveredEndpoint = pubsub_udpmcAdmin_matchEndpoint;
psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
psaSvc->setupTopicReceiver = pubsub_udpmcAdmin_setupTopicReceiver;
psaSvc->teardownTopicReceiver = pubsub_udpmcAdmin_teardownTopicReceiver;
- psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint;
- psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint;
+ psaSvc->addDiscoveredEndpoint = pubsub_udpmcAdmin_addEndpoint;
+ psaSvc->removeDiscoveredEndpoint = pubsub_udpmcAdmin_removeEndpoint;
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 6d06ee6..73fab20 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -244,23 +244,23 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
free(psa);
}
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
pubsub_udpmc_admin_t *psa = handle;
L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
pubsub_udpmc_admin_t *psa = handle;
L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
if (outScore != NULL) {
*outScore = score;
}
@@ -278,7 +278,7 @@ celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_propert
return status;
}
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_udpmc_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
@@ -358,7 +358,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
return status;
}
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_udpmc_admin_t *psa = handle;
celix_properties_t *newEndpoint = NULL;
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 02ebd44..17b8957 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -40,14 +40,14 @@ typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index efebf7c..a2f37d4 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -396,13 +396,8 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
if (status == CELIX_SUCCESS) {
bool release = true;
- pubsub_multipart_callbacks_t mp_callbacks;
- mp_callbacks.handle = receiver;
- mp_callbacks.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
- mp_callbacks.getMultipart = NULL;
-
pubsub_subscriber_t *svc = entry->svc;
- svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+ svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
if(release){
msgSer->freeMsg(msgSer,msgInst);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 59c809c..ddc6172 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -79,7 +79,7 @@ typedef struct pubsub_msg{
static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
-static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback);
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg);
static unsigned int rand_range(unsigned int min, unsigned int max);
pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
@@ -205,7 +205,6 @@ static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *r
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
entry->service.send = psa_udpmc_topicPublicationSend;
- entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
svc = &entry->service;
} else {
@@ -277,7 +276,7 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
msg->payloadSize = serializedOutputLen;
- if (psa_udpmc_sendMsg(entry, msg, true, NULL) == false) {
+ if (psa_udpmc_sendMsg(entry, msg) == false) {
status = -1;
}
free(msg);
@@ -306,7 +305,7 @@ static void delay_first_send_for_late_joiners(){
}
}
-static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback) {
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg) {
const int iovec_len = 3; // header + size + payload
bool ret = true;
@@ -325,9 +324,6 @@ static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_m
ret = false;
}
- if(releaseCallback) {
- releaseCallback->release(msg->payload, entry);
- }
return ret;
}
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index 01547cc..58f093a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -70,13 +70,13 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
psaSvc->handle = act->admin;
psaSvc->matchPublisher = pubsub_zmqAdmin_matchPublisher;
psaSvc->matchSubscriber = pubsub_zmqAdmin_matchSubscriber;
- psaSvc->matchEndpoint = pubsub_zmqAdmin_matchEndpoint;
+ psaSvc->matchDiscoveredEndpoint = pubsub_zmqAdmin_matchDiscoveredEndpoint;
psaSvc->setupTopicSender = pubsub_zmqAdmin_setupTopicSender;
psaSvc->teardownTopicSender = pubsub_zmqAdmin_teardownTopicSender;
psaSvc->setupTopicReceiver = pubsub_zmqAdmin_setupTopicReceiver;
psaSvc->teardownTopicReceiver = pubsub_zmqAdmin_teardownTopicReceiver;
- psaSvc->addEndpoint = pubsub_zmqAdmin_addEndpoint;
- psaSvc->removeEndpoint = pubsub_zmqAdmin_removeEndpoint;
+ psaSvc->addDiscoveredEndpoint = pubsub_zmqAdmin_addDiscoveredEndpoint;
+ psaSvc->removeDiscoveredEndpoint = pubsub_zmqAdmin_removeDiscoveredEndpoint;
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index a13cb26..0192bb4 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -308,30 +308,30 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
celixThreadMutex_unlock(&psa->serializers.mutex);
}
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_ZMQ_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
*outScore = score;
return status;
}
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_ZMQ_ADMIN_TYPE,
- psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+ psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
if (outScore != NULL) {
*outScore = score;
}
return status;
}
-celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
pubsub_zmq_admin_t *psa = handle;
L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
@@ -342,7 +342,7 @@ celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_propertie
return status;
}
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
@@ -353,6 +353,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
celix_properties_t *newEndpoint = NULL;
+ const char * staticBindUrl = topicProperties != NULL ?
+ celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_BIND_URL, NULL) : NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->serializers.mutex);
@@ -361,8 +363,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
if (sender == NULL) {
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
if (serEntry != NULL) {
- sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc, psa->ipAddress,
- psa->basePort, psa->maxPort);
+ sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
+ psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
}
if (sender != NULL) {
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
@@ -370,6 +372,15 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
serType, NULL);
celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
+
+ //if configured use a static discover url
+ const char *staticDiscUrl = topicProperties != NULL ?
+ celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL) : NULL;
+ if (staticDiscUrl != NULL) {
+ celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl);
+ }
+ celix_properties_setBool(newEndpoint, PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL);
+
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -423,11 +434,14 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
return status;
}
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_properties_t *newEndpoint = NULL;
+ const char *staticConnectUrls = topicProperties != NULL ?
+ celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL) : NULL;
+
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -435,7 +449,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
if (receiver == NULL) {
psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
if (serEntry != NULL) {
- receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc);
+ receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, staticConnectUrls, serializerSvcId, serEntry->svc);
} else {
L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
}
@@ -530,7 +544,7 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
return status;
}
-celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_zmq_admin_t *psa = handle;
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
@@ -581,7 +595,7 @@ static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_
return status;
}
-celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
pubsub_zmq_admin_t *psa = handle;
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index 180a9db..304ad11 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -35,23 +35,45 @@
#define PUBSUB_ZMQ_DEFAULT_IP "127.0.0.1"
+/**
+ * Can be set in the topic properties to fix a static bind url
+ */
+#define PUBSUB_ZMQ_STATIC_BIND_URL "zmq.static.bind.url"
+
+/**
+ * Can be set in the topic properties to fix a static url used for discovery (own key, so it can differ from the bind url)
+ */
+#define PUBSUB_ZMQ_STATIC_DISCOVER_URL "zmq.static.discover.url"
+
+/**
+ * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_ZMQ_STATIC_CONFIGURED "zmq.static.configured"
+
+/**
+ * The static urls which a subscriber should try to connect to.
+ * The urls are space separated
+ */
+#define PUBSUB_ZMQ_STATIC_CONNECT_URLS "zmq.static.connect.url"
+
+
typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
-celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index b348fdc..bb190cc 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -66,38 +66,46 @@ struct pubsub_zmq_topic_receiver {
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+ bool allConnected; //true if all requested connections are connected; while false the receive thread retries connecting
} requestedConnections;
long subscriberTrackerId;
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+ bool allInitialized; //true if all subscriber entries are initialized; while false the receive thread retries the init calls
} subscribers;
};
typedef struct psa_zmq_requested_connection_entry {
char *url; //copy of the zmq url to connect to; also used as key for this entry in requestedConnections.map
bool connected; //true once zsock_connect returned 0 for url
+ bool statically; //true if the connection is statically configured through the topic properties.
} psa_zmq_requested_connection_entry_t;
typedef struct psa_zmq_subscriber_entry {
int usageCount; //set to 1 when the entry is created
hash_map_t *msgTypes; //map from serializer svc (filled through createSerializerMap)
pubsub_subscriber_t *svc; //subscriber service; its optional init function is called before the entry counts as initialized
+ bool initialized; //true once svc->init returned 0 (or no init is provided); the init call is made from the receive thread
} psa_zmq_subscriber_entry_t;
static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void* psa_zmq_recvThread(void * data);
+static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
+static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
+
pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
log_helper_t *logHelper,
const char *scope,
- const char *topic,
- long serializerSvcId,
- pubsub_serializer_service_t *serializer) {
+ const char *topic,
+ const char *staticConnectUrls,
+ long serializerSvcId,
+ pubsub_serializer_service_t *serializer) {
pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
@@ -185,6 +193,22 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
}
+ if (receiver->zmqSocket != NULL && staticConnectUrls != NULL) {
+ char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
+ char* url;
+ char* save = urlsCopy;
+
+ while ((url = strtok_r(save, " ", &save))) {
+ psa_zmq_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+ entry->statically = true;
+ entry->connected = false;
+ entry->url = strndup(url, 1024*1024);
+ hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+ receiver->requestedConnections.allConnected = false;
+ }
+ free(urlsCopy);
+ }
+
//track subscribers
if (receiver->zmqSocket != NULL ) {
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -280,10 +304,12 @@ void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiv
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ char *url = NULL;
+ asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
if (entry->connected) {
- celix_arrayList_add(connectedUrls, strndup(entry->url, 1024));
+ celix_arrayList_add(connectedUrls, url);
} else {
- celix_arrayList_add(unconnectedUrls, strndup(entry->url, 1024));
+ celix_arrayList_add(unconnectedUrls, url);
}
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
@@ -301,16 +327,13 @@ void pubsub_zmqTopicReceiver_connectTo(
entry = calloc(1, sizeof(*entry));
entry->url = strndup(url, 1024*1024);
entry->connected = false;
+ entry->statically = false;
hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
- }
- if (!entry->connected) {
- if (zsock_connect(receiver->zmqSocket, "%s", url) == 0) {
- entry->connected = true;
- } else {
- L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", url, strerror(errno));
- }
+ receiver->requestedConnections.allConnected = false;
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ psa_zmq_connectToAllRequestedConnections(receiver);
}
void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url) {
@@ -351,6 +374,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
entry = calloc(1, sizeof(*entry));
entry->usageCount = 1;
entry->svc = svc;
+ entry->initialized = false;
int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
if (rc == 0) {
@@ -396,7 +420,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
if(status == CELIX_SUCCESS) {
bool release = true;
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
if (release) {
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
@@ -428,7 +452,23 @@ static void* psa_zmq_recvThread(void * data) {
bool running = receiver->recvThread.running;
celixThreadMutex_unlock(&receiver->recvThread.mutex);
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ bool allConnected = receiver->requestedConnections.allConnected;
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ bool allInitialized = receiver->subscribers.allInitialized;
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+
while (running) {
+ if (!allConnected) {
+ psa_zmq_connectToAllRequestedConnections(receiver);
+ }
+ if (!allInitialized) {
+ psa_zmq_initializeAllSubscribers(receiver);
+ }
+
zmsg_t *zmsg = zmsg_recv(receiver->zmqSocket);
if (zmsg != NULL) {
if (zmsg_size(zmsg) != 3) {
@@ -458,7 +498,62 @@ static void* psa_zmq_recvThread(void * data) {
celixThreadMutex_lock(&receiver->recvThread.mutex);
running = receiver->recvThread.running;
celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ allConnected = receiver->requestedConnections.allConnected;
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ allInitialized = receiver->subscribers.allInitialized;
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
} // while
return NULL;
}
+
+
+/**
+ * Tries to connect the zmq socket to every requested connection that is not connected yet.
+ * Does nothing when allConnected is already true. Afterwards allConnected is true only if
+ * no connect attempt failed. Runs with the requestedConnections mutex held.
+ */
+static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (!receiver->requestedConnections.allConnected) {
+        bool failureSeen = false;
+        hash_map_iterator_t it = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&it)) {
+            psa_zmq_requested_connection_entry_t *conn = hashMapIterator_nextValue(&it);
+            if (conn->connected) {
+                continue; //already connected, nothing to do for this url
+            }
+            if (zsock_connect(receiver->zmqSocket, "%s", conn->url) != 0) {
+                L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", conn->url, strerror(errno));
+                failureSeen = true;
+            } else {
+                conn->connected = true;
+            }
+        }
+        //stays false after a failure, so a later call retries the failed urls
+        receiver->requestedConnections.allConnected = !failureSeen;
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+/**
+ * Calls the optional init function of every subscriber entry that is not initialized yet.
+ * Entries without a service or without an init function count as initialized right away.
+ * Afterwards allInitialized is true only if no init call returned a non-zero code.
+ * Runs with the subscribers mutex held.
+ */
+static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    if (!receiver->subscribers.allInitialized) {
+        bool failureSeen = false;
+        hash_map_iterator_t it = hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&it)) {
+            psa_zmq_subscriber_entry_t *sub = hashMapIterator_nextValue(&it);
+            if (sub->initialized) {
+                continue; //init already done for this subscriber
+            }
+            const bool hasInit = sub->svc != NULL && sub->svc->init != NULL;
+            int rc = hasInit ? sub->svc->init(sub->svc->handle) : 0;
+            if (rc != 0) {
+                L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                failureSeen = true;
+            } else {
+                sub->initialized = true;
+            }
+        }
+        //stays false after a failure, so a later call retries the failed subscribers
+        receiver->subscribers.allInitialized = !failureSeen;
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index 9049153..d093bfd 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -27,6 +27,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
log_helper_t *logHelper,
const char *scope,
const char *topic,
+ const char *staticConnectUrls,
long serializerSvcId,
pubsub_serializer_service_t *serializer);
void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index f6221a2..9ad7783 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -95,6 +95,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
long serializerSvcId,
pubsub_serializer_service_t *ser,
const char *bindIP,
+ const char *staticBindUrl,
unsigned int basePort,
unsigned int maxPort) {
pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
@@ -172,29 +173,38 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
}
#endif
- int rv = -1, retry=0;
- while(rv==-1 && retry < ZMQ_BIND_MAX_RETRY ) {
- /* Randomized part due to same bundle publishing on different topics */
- unsigned int port = rand_range(basePort,maxPort);
+ if (staticBindUrl != NULL) {
+ int rv = zsock_bind (socket, "%s", staticBindUrl);
+ if (rv == -1) {
+ L_WARN("Error for zmq_bind using static bind url '%s'. %s", staticBindUrl, strerror(errno));
+ } else {
+ sender->url = strndup(staticBindUrl, 1024*1024);
+ sender->zmq.socket = socket; //store the bound socket on the sender, as the dynamic bind branch does on success
+ }
+ } else {
- size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
- char *url = calloc(len, sizeof(char*));
- snprintf(url, len, "tcp://%s:%u", bindIP, port);
+ int retry = 0;
+ while (sender->url == NULL && retry < ZMQ_BIND_MAX_RETRY) {
+ /* Randomized part due to same bundle publishing on different topics */
+ unsigned int port = rand_range(basePort, maxPort);
- len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
- char *bindUrl = calloc(len, sizeof(char));
- snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
+ char *url = NULL;
+ asprintf(&url, "tcp://%s:%u", bindIP, port);
- rv = zsock_bind (socket, "%s", bindUrl);
- if (rv == -1) {
- perror("Error for zmq_bind");
- free(url);
- } else {
- sender->url = url;
- sender->zmq.socket = socket;
+ char *bindUrl = NULL;
+ asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
+
+
+ int rv = zsock_bind(socket, "%s", bindUrl);
+ if (rv == -1) {
+ L_WARN("Error for zmq_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno));
+ free(url);
+ } else {
+ sender->url = url;
+ sender->zmq.socket = socket;
+ }
+ retry++;
+ free(bindUrl);
}
- retry++;
- free(bindUrl);
}
}
@@ -305,7 +315,6 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
entry->service.send = psa_zmq_topicPublicationSend;
- entry->service.sendMultipart = NULL; //not supported TODO remove
hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
} else {
L_ERROR("Error creating serializer map for ZMQ TopicSender %s/%s", sender->scope, sender->topic);
http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
index e537edd..af63870 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -31,6 +31,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
long serializerSvcId,
pubsub_serializer_service_t *ser,
const char *bindIP,
+ const char *staticBindUrl,
unsigned int basePort,
unsigned int maxPort);
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);