You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/02/06 20:12:23 UTC
celix git commit: Added shell command to pubsub to get overview of
pubs and subs, fixed some code warnings
Repository: celix
Updated Branches:
refs/heads/develop 6cd9df7ec -> 4b8222dc1
Added shell command to pubsub to get overview of pubs and subs, fixed some code warnings
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/4b8222dc
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/4b8222dc
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/4b8222dc
Branch: refs/heads/develop
Commit: 4b8222dc11e7ae6aa75d5648bfd940e72518f755
Parents: 6cd9df7
Author: Erjan Altena <er...@gmail.com>
Authored: Tue Feb 6 21:11:07 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Tue Feb 6 21:11:50 2018 +0100
----------------------------------------------------------------------
pubsub/pubsub_topology_manager/CMakeLists.txt | 2 +-
.../src/pubsub_topology_manager.c | 96 ++++++++++++++------
.../src/pubsub_topology_manager.h | 5 +
3 files changed, 72 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt b/pubsub/pubsub_topology_manager/CMakeLists.txt
index 7ae00f6..9cb452b 100644
--- a/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -23,7 +23,7 @@ add_celix_bundle(celix_pubsub_topology_manager
src/pubsub_topology_manager.c
src/pubsub_topology_manager.h
)
-target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi)
+target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi Celix::shell_api)
get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR)
celix_bundle_files(celix_pubsub_topology_manager
${DESC}
http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index a63b275..71a9ad9 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -32,22 +32,53 @@
#include "array_list.h"
#include "bundle_context.h"
#include "constants.h"
-#include "module.h"
-#include "bundle.h"
-#include "filter.h"
#include "listener_hook_service.h"
#include "utils.h"
-#include "service_reference.h"
-#include "service_registration.h"
#include "log_service.h"
#include "log_helper.h"
#include "publisher_endpoint_announce.h"
#include "pubsub_topology_manager.h"
-#include "pubsub_endpoint.h"
#include "pubsub_admin.h"
-#include "pubsub_utils.h"
+/**
+ * Writes a per-topic overview of the endpoints stored in `endpoints` to `outStream`.
+ *
+ * Every map key is printed as the topic; the value mapped to it is read as an
+ * array list of pubsub endpoints. For each endpoint the endpoint properties and
+ * the topic properties are listed as "key => value" lines when they are set.
+ *
+ * @param endpoints  Map from topic key (string) to an array list of pubsub_endpoint_pt.
+ * @param outStream  Stream that receives the overview.
+ */
+static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
+    for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); hashMapIterator_hasNext(&iter);) {
+        const char* key = (const char*)hashMapIterator_nextKey(&iter);
+        fprintf(outStream, " Topic=%s\n", key);
+        array_list_pt ep_list = hashMap_get(endpoints, key);
+        for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) {
+            pubsub_endpoint_pt ep = arrayList_get(ep_list, i);
+            const char *propKey;
+            /* The index is unsigned, so it has to be printed with %u instead of %d. */
+            fprintf(outStream, " Endpoint %u\n", i);
+            if(ep->endpoint_props) {
+                /* Only print the header when there is something to list, same as for the topic properties. */
+                fprintf(outStream, " Endpoint properties\n");
+                PROPERTIES_FOR_EACH(ep->endpoint_props, propKey) {
+                    fprintf(outStream, " %s => %s\n", propKey, properties_get(ep->endpoint_props, propKey));
+                }
+            }
+            if(ep->topic_props) {
+                fprintf(outStream, " Topic properties\n");
+                PROPERTIES_FOR_EACH(ep->topic_props, propKey) {
+                    fprintf(outStream, " %s => %s\n", propKey, properties_get(ep->topic_props, propKey));
+                }
+            }
+        }
+    }
+}
+
+/**
+ * Shell command callback that prints an overview of the publications and
+ * subscriptions held by the topology manager.
+ *
+ * Each map is read while holding its own lock (publicationsLock resp.
+ * subscriptionsLock), so the overview is not produced while the map is being
+ * modified by code that takes the same lock. Only one lock is held at a time.
+ *
+ * @param handle       The pubsub_topology_manager_pt this command was registered with.
+ * @param commandLine  Unused; the command takes no arguments.
+ * @param outStream    Stream that receives the overview.
+ * @param errorStream  Unused; nothing is written to it.
+ * @return Always CELIX_SUCCESS.
+ */
+static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
+    pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt) handle;
+
+    /* Explicitly mark the parameters this command does not use. */
+    (void) commandLine;
+    (void) errorStream;
+
+    celixThreadMutex_lock(&manager->publicationsLock);
+    if (manager->publications && !hashMap_isEmpty(manager->publications)) {
+        fprintf(outStream, "Publications:\n");
+        print_endpoint_info(manager->publications, outStream);
+    }
+    celixThreadMutex_unlock(&manager->publicationsLock);
+
+    celixThreadMutex_lock(&manager->subscriptionsLock);
+    if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) {
+        fprintf(outStream, "Subscriptions:\n");
+        print_endpoint_info(manager->subscriptions, outStream);
+    }
+    celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+    return CELIX_SUCCESS;
+}
celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
celix_status_t status = CELIX_SUCCESS;
@@ -62,12 +93,12 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
celix_thread_mutexattr_t psaAttr;
celixThreadMutexAttr_create(&psaAttr);
celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
- status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
+ status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
celixThreadMutexAttr_destroy(&psaAttr);
- status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
- status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
- status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
+ status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
+ status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
+ status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
arrayList_create(&(*manager)->psaList);
@@ -76,7 +107,14 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
(*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
(*manager)->loghelper = logHelper;
-
+ (*manager)->shellCmdService.handle = *manager;
+ (*manager)->shellCmdService.executeCommand = shellCommand;
+
+ properties_pt shellProps = properties_create();
+ properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
+ properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
+ properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: Overview of PubSub");
+ bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg));
return status;
}
@@ -97,7 +135,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
while(hashMapIterator_hasNext(pubit)){
array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit);
- int i;
+ unsigned int i;
for(i=0;i<arrayList_size(l);i++){
pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
}
@@ -112,7 +150,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
while(hashMapIterator_hasNext(subit)){
array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit);
- int i;
+ unsigned int i;
for(i=0;i<arrayList_size(l);i++){
pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
}
@@ -122,7 +160,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
hashMap_destroy(manager->subscriptions, true, false);
celixThreadMutex_unlock(&manager->subscriptionsLock);
celixThreadMutex_destroy(&manager->subscriptionsLock);
-
+ serviceRegistration_unregister(manager->shellCmdReg);
free(manager);
return status;
@@ -131,7 +169,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
celix_status_t status = CELIX_SUCCESS;
pubsub_topology_manager_pt manager = handle;
- int i;
+ unsigned int i;
pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
@@ -211,7 +249,7 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
const char* fwUUID = NULL;
bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
- int i;
+ unsigned int i;
for(i=0;i<arrayList_size(pubEP_list);i++){
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
@@ -273,7 +311,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
celixThreadMutex_unlock(&manager->subscriptionsLock);
- int j;
+ unsigned int j;
double score = 0;
double best_score = 0;
pubsub_admin_service_pt best_psa = NULL;
@@ -328,7 +366,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
pubsub_endpoint_pt subcmp = NULL;
if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){
- int j,k;
+ unsigned int j,k;
// Inform discoveries that we not interested in the topic any more
celixThreadMutex_lock(&manager->discoveryListLock);
@@ -411,7 +449,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
while(hashMapIterator_hasNext(iter)){
array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
- for(int i = 0; i < arrayList_size(pubEP_list); i++) {
+ for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
if( (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
status += disc->announcePublisher(disc->handle,pubEP);
@@ -427,7 +465,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
while(hashMapIterator_hasNext(iter)) {
array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
- int i;
+ unsigned int i;
for(i=0;i<arrayList_size(l);i++){
pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
@@ -441,9 +479,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
}
celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) {
- celix_status_t status = CELIX_SUCCESS;
-
- status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
+ celix_status_t status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
if (status == CELIX_SUCCESS) {
status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service);
}
@@ -474,7 +510,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
celix_status_t status = CELIX_SUCCESS;
pubsub_topology_manager_pt manager = handle;
- int l_index;
+ unsigned int l_index;
for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
@@ -495,7 +531,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
celixThreadMutex_unlock(&manager->publicationsLock);
- int j;
+ unsigned int j;
double score = 0;
double best_score = 0;
pubsub_admin_service_pt best_psa = NULL;
@@ -542,7 +578,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
celix_status_t status = CELIX_SUCCESS;
pubsub_topology_manager_pt manager = handle;
- int l_index;
+ unsigned int l_index;
for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
@@ -552,7 +588,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){
- int j,k;
+ unsigned int j,k;
celixThreadMutex_lock(&manager->psaListLock);
celixThreadMutex_lock(&manager->publicationsLock);
@@ -638,7 +674,7 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
pubsubEndpoint_clone(pubEP, &p);
arrayList_add(pub_list_by_topic,p);
- int j;
+ unsigned int j;
double score = 0;
double best_score = 0;
pubsub_admin_service_pt best_psa = NULL;
@@ -675,7 +711,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
pubsub_topology_manager_pt manager = handle;
celixThreadMutex_lock(&manager->psaListLock);
celixThreadMutex_lock(&manager->publicationsLock);
- int i;
+ unsigned int i;
char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 0074a75..cdcc651 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -30,6 +30,7 @@
#include "service_reference.h"
#include "bundle_context.h"
#include "log_helper.h"
+#include "command.h"
#include "pubsub_common.h"
#include "pubsub_endpoint.h"
@@ -52,6 +53,10 @@ struct pubsub_topology_manager {
celix_thread_mutex_t subscriptionsLock;
hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
+ command_service_t shellCmdService;
+ service_registration_pt shellCmdReg;
+
+
log_helper_pt loghelper;
};