You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by er...@apache.org on 2018/02/06 20:12:23 UTC

celix git commit: Added shell command to pubsub to get overview of pubs and subs, fixed some code warnings

Repository: celix
Updated Branches:
  refs/heads/develop 6cd9df7ec -> 4b8222dc1


Added a shell command (ps_info) to pubsub that prints an overview of publications and subscriptions; also fixed some code warnings


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/4b8222dc
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/4b8222dc
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/4b8222dc

Branch: refs/heads/develop
Commit: 4b8222dc11e7ae6aa75d5648bfd940e72518f755
Parents: 6cd9df7
Author: Erjan Altena <er...@gmail.com>
Authored: Tue Feb 6 21:11:07 2018 +0100
Committer: Erjan Altena <er...@gmail.com>
Committed: Tue Feb 6 21:11:50 2018 +0100

----------------------------------------------------------------------
 pubsub/pubsub_topology_manager/CMakeLists.txt   |  2 +-
 .../src/pubsub_topology_manager.c               | 96 ++++++++++++++------
 .../src/pubsub_topology_manager.h               |  5 +
 3 files changed, 72 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt b/pubsub/pubsub_topology_manager/CMakeLists.txt
index 7ae00f6..9cb452b 100644
--- a/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -23,7 +23,7 @@ add_celix_bundle(celix_pubsub_topology_manager
     	src/pubsub_topology_manager.c
     	src/pubsub_topology_manager.h
 )
-target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi)
+target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi Celix::shell_api)
 get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR)
 celix_bundle_files(celix_pubsub_topology_manager
 	${DESC}

http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index a63b275..71a9ad9 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -32,22 +32,53 @@
 #include "array_list.h"
 #include "bundle_context.h"
 #include "constants.h"
-#include "module.h"
-#include "bundle.h"
-#include "filter.h"
 #include "listener_hook_service.h"
 #include "utils.h"
-#include "service_reference.h"
-#include "service_registration.h"
 #include "log_service.h"
 #include "log_helper.h"
 
 #include "publisher_endpoint_announce.h"
 #include "pubsub_topology_manager.h"
-#include "pubsub_endpoint.h"
 #include "pubsub_admin.h"
-#include "pubsub_utils.h"
 
+/* Writes every entry of `endpoints` (key -> list of pubsub endpoints) to outStream: the key, then each endpoint's index and properties. */
+static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
+	for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); hashMapIterator_hasNext(&iter);) {
+		const char* key = (const char*)hashMapIterator_nextKey(&iter);
+		fprintf(outStream, "    Topic=%s\n", key);
+		array_list_pt ep_list = hashMap_get(endpoints, key); /* list of endpoints stored under this key */
+		for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) {
+			pubsub_endpoint_pt ep = arrayList_get(ep_list, i);
+			fprintf(outStream, "        Endpoint %u\n", i); /* %u: the index is unsigned */
+			fprintf(outStream, "            Endpoint properties\n"); /* header is printed even when endpoint_props is NULL */
+			const char *propKey;
+			if(ep->endpoint_props) {
+				PROPERTIES_FOR_EACH(ep->endpoint_props, propKey) {
+					fprintf(outStream, "                %s => %s\n", propKey, properties_get(ep->endpoint_props, propKey));
+				}
+			}
+			if(ep->topic_props) { /* topic properties are optional; the whole section is skipped when absent */
+				fprintf(outStream, "            Topic properties\n");
+				PROPERTIES_FOR_EACH(ep->topic_props, propKey) {
+					fprintf(outStream, "                %s => %s\n", propKey, properties_get(ep->topic_props, propKey));
+				}
+			}
+		}
+	}
+}
+
+static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) { /* Shell command callback: writes an overview of publications and subscriptions to outStream. */
+	pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt) handle; /* handle carries the topology manager */
+	if (manager->publications && !hashMap_isEmpty(manager->publications)) { /* section is omitted when the map is absent or empty */
+		fprintf(outStream, "Publications:\n");
+		print_endpoint_info(manager->publications, outStream); /* NOTE(review): map is read without publicationsLock held - confirm this is safe */
+	}
+	if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) { /* same rule as for publications */
+		fprintf(outStream, "Subscriptions:\n");
+		print_endpoint_info(manager->subscriptions, outStream); /* NOTE(review): map is read without subscriptionsLock held - confirm this is safe */
+	}
+	return CELIX_SUCCESS; /* always reports success; commandLine and errorStream are not used */
+}
 
 celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
 	celix_status_t status = CELIX_SUCCESS;
@@ -62,12 +93,12 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
 	celix_thread_mutexattr_t psaAttr;
 	celixThreadMutexAttr_create(&psaAttr);
 	celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-	status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
+	status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
 	celixThreadMutexAttr_destroy(&psaAttr);
 
-	status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
-	status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
-	status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
+	status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
+	status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
+	status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
 
 	arrayList_create(&(*manager)->psaList);
 
@@ -76,7 +107,14 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
 	(*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
 	(*manager)->loghelper = logHelper;
-
+	(*manager)->shellCmdService.handle = *manager;
+	(*manager)->shellCmdService.executeCommand = shellCommand;
+
+	properties_pt shellProps = properties_create();
+	properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
+	properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
+	properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: Overview of PubSub");
+	bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg));
 	return status;
 }
 
@@ -97,7 +135,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
 	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
 	while(hashMapIterator_hasNext(pubit)){
 		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit);
-		int i;
+		unsigned int i;
 		for(i=0;i<arrayList_size(l);i++){
 			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
 		}
@@ -112,7 +150,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
 	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
 	while(hashMapIterator_hasNext(subit)){
 		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit);
-		int i;
+		unsigned int i;
 		for(i=0;i<arrayList_size(l);i++){
 			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
 		}
@@ -122,7 +160,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
 	hashMap_destroy(manager->subscriptions, true, false);
 	celixThreadMutex_unlock(&manager->subscriptionsLock);
 	celixThreadMutex_destroy(&manager->subscriptionsLock);
-
+	serviceRegistration_unregister(manager->shellCmdReg);
 	free(manager);
 
 	return status;
@@ -131,7 +169,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
 celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
 	celix_status_t status = CELIX_SUCCESS;
 	pubsub_topology_manager_pt manager = handle;
-	int i;
+	unsigned int i;
 
 	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
 	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
@@ -211,7 +249,7 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
 				bundleContext_getService(manager->context, disc_sr, (void**) &disc);
 				const char* fwUUID = NULL;
 				bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-				int i;
+				unsigned int i;
 				for(i=0;i<arrayList_size(pubEP_list);i++){
 					pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
 					if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
@@ -273,7 +311,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
 
 		celixThreadMutex_unlock(&manager->subscriptionsLock);
 
-		int j;
+		unsigned int j;
 		double score = 0;
 		double best_score = 0;
 		pubsub_admin_service_pt best_psa = NULL;
@@ -328,7 +366,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
 	pubsub_endpoint_pt subcmp = NULL;
 	if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){
 
-		int j,k;
+		unsigned int j,k;
 
 		// Inform discoveries that we not interested in the topic any more
 		celixThreadMutex_lock(&manager->discoveryListLock);
@@ -411,7 +449,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
 	hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
 	while(hashMapIterator_hasNext(iter)){
 		array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
-		for(int i = 0; i < arrayList_size(pubEP_list); i++) {
+		for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
 			pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
 			if( (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
 				status += disc->announcePublisher(disc->handle,pubEP);
@@ -427,7 +465,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
 
 	while(hashMapIterator_hasNext(iter)) {
 		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
-		int i;
+		unsigned int i;
 		for(i=0;i<arrayList_size(l);i++){
 			pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
 
@@ -441,9 +479,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
 }
 
 celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
+	celix_status_t status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
 	if (status == CELIX_SUCCESS) {
 		status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service);
 	}
@@ -474,7 +510,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
 	celix_status_t status = CELIX_SUCCESS;
 	pubsub_topology_manager_pt manager = handle;
 
-	int l_index;
+	unsigned int l_index;
 
 	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
 
@@ -495,7 +531,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
 
 			celixThreadMutex_unlock(&manager->publicationsLock);
 
-			int j;
+			unsigned int j;
 			double score = 0;
 			double best_score = 0;
 			pubsub_admin_service_pt best_psa = NULL;
@@ -542,7 +578,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 	celix_status_t status = CELIX_SUCCESS;
 	pubsub_topology_manager_pt manager = handle;
 
-	int l_index;
+	unsigned int l_index;
 
 	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
 
@@ -552,7 +588,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 		if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){
 
 
-			int j,k;
+			unsigned int j,k;
 			celixThreadMutex_lock(&manager->psaListLock);
 			celixThreadMutex_lock(&manager->publicationsLock);
 
@@ -638,7 +674,7 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
 	pubsubEndpoint_clone(pubEP, &p);
 	arrayList_add(pub_list_by_topic,p);
 
-	int j;
+	unsigned int j;
 	double score = 0;
 	double best_score = 0;
 	pubsub_admin_service_pt best_psa = NULL;
@@ -675,7 +711,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
 	pubsub_topology_manager_pt manager = handle;
 	celixThreadMutex_lock(&manager->psaListLock);
 	celixThreadMutex_lock(&manager->publicationsLock);
-	int i;
+	unsigned int i;
 
 	char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);

http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 0074a75..cdcc651 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -30,6 +30,7 @@
 #include "service_reference.h"
 #include "bundle_context.h"
 #include "log_helper.h"
+#include "command.h"
 
 #include "pubsub_common.h"
 #include "pubsub_endpoint.h"
@@ -52,6 +53,10 @@ struct pubsub_topology_manager {
 	celix_thread_mutex_t subscriptionsLock;
 	hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
 
+	command_service_t shellCmdService;
+	service_registration_pt  shellCmdReg;
+
+
 	log_helper_pt loghelper;
 };