You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by ab...@apache.org on 2014/07/25 14:06:45 UTC
svn commit: r1613403 - in /celix/trunk/remote_services/discovery_configured:
./ private/ private/include/ private/src/
Author: abroekhuis
Date: Fri Jul 25 12:06:44 2014
New Revision: 1613403
URL: http://svn.apache.org/r1613403
Log:
CELIX-130: Added missing files.
Added:
celix/trunk/remote_services/discovery_configured/
celix/trunk/remote_services/discovery_configured/CMakeLists.txt
celix/trunk/remote_services/discovery_configured/private/
celix/trunk/remote_services/discovery_configured/private/include/
celix/trunk/remote_services/discovery_configured/private/include/discovery.h
celix/trunk/remote_services/discovery_configured/private/include/endpoint_discovery_poller.h
celix/trunk/remote_services/discovery_configured/private/src/
celix/trunk/remote_services/discovery_configured/private/src/desc.xml
celix/trunk/remote_services/discovery_configured/private/src/discovery.c
celix/trunk/remote_services/discovery_configured/private/src/discovery_activator.c
celix/trunk/remote_services/discovery_configured/private/src/endpoint_description_reader.c
celix/trunk/remote_services/discovery_configured/private/src/endpoint_discovery_poller.c
Added: celix/trunk/remote_services/discovery_configured/CMakeLists.txt
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/CMakeLists.txt?rev=1613403&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/CMakeLists.txt (added)
+++ celix/trunk/remote_services/discovery_configured/CMakeLists.txt Fri Jul 25 12:06:44 2014
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#TODO find_package(DNS-SD REQUIRED)
+
+find_package(CURL REQUIRED)
+find_package(LibXml2 REQUIRED)
+include_directories(${LIBXML2_INCLUDE_DIR})
+
+
+include_directories("/usr/include") #TODO check if this has impact on the generated project indexer paths
+include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/remote_services/discovery_configured/private/include")
+include_directories("${PROJECT_SOURCE_DIR}/remote_services/endpoint_listener/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/public/include")
+include_directories(private/include)
+
+include_directories("${CURL_INCLUDE_DIR}")
+
+SET_HEADER(BUNDLE_SYMBOLICNAME "apache_celix_rsa_discovery_configured")
+SET_HEADERS("Bundle-Name: Apache Celix RSA Configured Discovery")
+
+bundle(discovery_configured SOURCES
+ private/src/discovery.c
+ private/src/discovery_activator.c
+ private/src/endpoint_discovery_poller.c
+)
+
+install_bundle(discovery_configured)
+
+target_link_libraries(discovery_configured celix_framework ${CURL_LIBRARIES} ${APRUTIL_LIBRARY})
+
+add_executable(descparser private/src/endpoint_description_reader.c)
+
+target_link_libraries(descparser ${LIBXML2_LIBRARIES} celix_utils)
+
Added: celix/trunk/remote_services/discovery_configured/private/include/discovery.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/include/discovery.h?rev=1613403&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/include/discovery.h (added)
+++ celix/trunk/remote_services/discovery_configured/private/include/discovery.h Fri Jul 25 12:06:44 2014
@@ -0,0 +1,53 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topology_manager.h
+ *
+ * \date Sep 29, 2011
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef DISCOVERY_H_
+#define DISCOVERY_H_
+
+#include <apr_general.h>
+
+#include "bundle_context.h"
+#include "service_reference.h"
+
+#include "endpoint_listener.h"
+
+typedef struct discovery *discovery_pt;
+
+celix_status_t discovery_create(apr_pool_t *pool, bundle_context_pt context, discovery_pt *discovery);
+celix_status_t discovery_stop(discovery_pt discovery);
+
+celix_status_t discovery_endpointAdded(void *handle, endpoint_description_pt endpoint, char *machtedFilter);
+celix_status_t discovery_endpointRemoved(void *handle, endpoint_description_pt endpoint, char *machtedFilter);
+
+celix_status_t discovery_endpointListenerAdding(void * handle, service_reference_pt reference, void **service);
+celix_status_t discovery_endpointListenerAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t discovery_endpointListenerModified(void * handle, service_reference_pt reference, void * service);
+celix_status_t discovery_endpointListenerRemoved(void * handle, service_reference_pt reference, void * service);
+
+celix_status_t discovery_updateEndpointListener(discovery_pt discovery, service_reference_pt reference, endpoint_listener_pt service);
+
+
+#endif /* DISCOVERY_H_ */
Added: celix/trunk/remote_services/discovery_configured/private/include/endpoint_discovery_poller.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/include/endpoint_discovery_poller.h?rev=1613403&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/include/endpoint_discovery_poller.h (added)
+++ celix/trunk/remote_services/discovery_configured/private/include/endpoint_discovery_poller.h Fri Jul 25 12:06:44 2014
@@ -0,0 +1,41 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_discovery_poller.h
+ *
+ * \date 3 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef ENDPOINT_DISCOVERY_POLLER_H_
+#define ENDPOINT_DISCOVERY_POLLER_H_
+
+#include "celix_errno.h"
+#include "discovery.h"
+
+typedef struct endpoint_discovery_poller *endpoint_discovery_poller_pt;
+
+celix_status_t endpointDiscoveryPoller_create(discovery_pt discovery, endpoint_discovery_poller_pt *poller);
+celix_status_t endpointDiscoveryPoller_addDiscoveryEndpoint(endpoint_discovery_poller_pt poller, char *url);
+celix_status_t endpointDiscoveryPoller_removeDiscoveryEndpoint(endpoint_discovery_poller_pt poller, char *url);
+
+
+
+#endif /* ENDPOINT_DISCOVERY_POLLER_H_ */
Added: celix/trunk/remote_services/discovery_configured/private/src/desc.xml
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/src/desc.xml?rev=1613403&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/src/desc.xml (added)
+++ celix/trunk/remote_services/discovery_configured/private/src/desc.xml Fri Jul 25 12:06:44 2014
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<endpoint-descriptions xmlns="http://www.osgi.org/xmlns/rsa/v1.0.0">
+ <endpoint-description>
+ <property name="service.intents">
+ <list>
+ <value>SOAP</value>
+ <value>HTTP</value>
+ </list>
+ </property>
+ <property name="endpoint.id" value="http://ws.acme.com:9000/hello" />
+ <property name="objectClass" value="com.acme.Foo" />
+ <property name="endpoint.package.version.com.acme" value="4.2" />
+ <property name="service.imported.configs" value="com.acme" />
+ <property name="com.acme.ws.xml">
+ <xml>
+ <config xmlns="http://acme.com/defs">
+ <port>1029</port>
+ <host>www.acme.com</host>
+ </config>
+ </xml>
+ </property>
+ </endpoint-description>
+</endpoint-descriptions>
\ No newline at end of file
Added: celix/trunk/remote_services/discovery_configured/private/src/discovery.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/src/discovery.c?rev=1613403&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/src/discovery.c (added)
+++ celix/trunk/remote_services/discovery_configured/private/src/discovery.c Fri Jul 25 12:06:44 2014
@@ -0,0 +1,516 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * discovery.c
+ *
+ * \date Sep 1, 2013
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <apr_thread_proc.h>
+#include <apr_strings.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+#include <dns_sd.h>
+
+#include "constants.h"
+#include "bundle_context.h"
+#include "array_list.h"
+#include "utils.h"
+#include "celix_errno.h"
+#include "filter.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "remote_constants.h"
+
+#include "discovery.h"
+#include "endpoint_discovery_poller.h"
+
+static const char * const OSGI_DISCOVERY_TYPE = "_osgid._udp";
+static const char * const OSGI_DISCOVERY_NAME = "Amdatu Remote Service Endpoint (Bonjour)";
+static void *APR_THREAD_FUNC discovery_pollDiscovery(apr_thread_t *thd, void *data);
+
+static celix_status_t discovery_start(discovery_pt discovery);
+static void discovery_browseDiscoveryCallback(DNSServiceRef sdRef, DNSServiceFlags flags,
+ uint32_t interfaceIndex, DNSServiceErrorType errorCode,
+ const char *serviceName, const char *regtype, const char *replyDomain,
+ void *context);
+
+
+static void discovery_browseCallback(DNSServiceRef sdRef, DNSServiceFlags flags,
+ uint32_t interfaceIndex, DNSServiceErrorType errorCode,
+ const char *serviceName, const char *regtype, const char *replyDomain,
+ void *context);
+static void discovery_resolveAddCallback(DNSServiceRef sdRef,
+ DNSServiceFlags flags, uint32_t interfaceIndex,
+ DNSServiceErrorType errorCode, const char *fullname,
+ const char *hosttarget, uint16_t port, /* In network byte order */
+ uint16_t txtLen, const unsigned char *txtRecord, void *context);
+static void discovery_resolveRemoveCallback(DNSServiceRef sdRef,
+ DNSServiceFlags flags, uint32_t interfaceIndex,
+ DNSServiceErrorType errorCode, const char *fullname,
+ const char *hosttarget, uint16_t port, /* In network byte order */
+ uint16_t txtLen, const unsigned char *txtRecord, void *context);
+static celix_status_t discovery_informEndpointListeners(discovery_pt discovery, endpoint_description_pt endpoint, bool addingService);
+
+static const char * const DEFAULT_DISCOVERY_PORT = "8889";
+static const char * const OSGI_SERVICE_TYPE = "_osgi._udp";
+
+typedef struct discovered_endpoint_entry {
+ apr_pool_t *pool;
+ endpoint_description_pt endpointDescription;
+} * discovered_endpoint_entry_pt;
+
+typedef struct disclosed_endpoint_entry {
+ apr_pool_t *pool;
+ endpoint_description_pt endpointDescription;
+ TXTRecordRef *txtRecord;
+ DNSServiceRef dnsServiceRef;
+} * disclosed_endpoint_entry_pt;
+
+
+struct discovery {
+ bundle_context_pt context;
+ apr_pool_t *pool;
+
+
+ apr_thread_mutex_t *listenerReferencesMutex;
+ apr_thread_mutex_t *discoveredServicesMutex;
+ apr_thread_mutex_t *disclosedServicesMutex;
+
+ hash_map_pt listenerReferences; //key=serviceReference, value=?? TODO
+ hash_map_pt discoveredServices; //key=endpointId (string), value=discovered_endpoint_entry_pt;
+ hash_map_pt disclosedServices; //key=endpointId (string), value=disclosed_endpoint_entry_pt;
+
+ volatile bool running;
+ apr_thread_t *poll;
+ apr_thread_t *pollDiscovery;
+ DNSServiceRef browseRef;
+ DNSServiceRef browseDiscoveryRef;
+ DNSServiceRef discoveryRef;
+
+ char *discoveryPort;
+ char *frameworkUuid;
+ endpoint_discovery_poller_pt poller;
+};
+
+celix_status_t discovery_create(apr_pool_t *pool, bundle_context_pt context, discovery_pt *discovery) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *discovery = apr_palloc(pool, sizeof(**discovery));
+ if (!*discovery) {
+ status = CELIX_ENOMEM;
+ } else {
+ (*discovery)->context = context;
+ (*discovery)->pool = pool;
+ (*discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+ (*discovery)->discoveredServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*discovery)->disclosedServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ (*discovery)->running = true;
+ (*discovery)->browseRef = NULL;
+ (*discovery)->discoveryPort = NULL;
+ (*discovery)->listenerReferencesMutex = NULL;
+ (*discovery)->discoveredServicesMutex = NULL;
+ (*discovery)->disclosedServicesMutex = NULL;
+ (*discovery)->frameworkUuid = NULL;
+ (*discovery)->poller = NULL;
+
+ bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &(*discovery)->frameworkUuid);
+
+ CELIX_DO_IF(status, status = apr_thread_mutex_create(&(*discovery)->listenerReferencesMutex, APR_THREAD_MUTEX_DEFAULT, pool));
+ CELIX_DO_IF(status, status = apr_thread_mutex_create(&(*discovery)->discoveredServicesMutex, APR_THREAD_MUTEX_DEFAULT, pool));
+ CELIX_DO_IF(status, status = apr_thread_mutex_create(&(*discovery)->disclosedServicesMutex, APR_THREAD_MUTEX_DEFAULT, pool));
+
+ char *port = NULL;
+ bundleContext_getProperty(context, "DISCOVERY_PORT", &port);
+ if (port == NULL) {
+ (*discovery)->discoveryPort = (char *) DEFAULT_DISCOVERY_PORT;
+ } else {
+ (*discovery)->discoveryPort = apr_pstrdup(pool, port);
+ }
+
+ discovery_start(*discovery);
+ }
+
+ return status;
+}
+
+static celix_status_t discovery_start(discovery_pt discovery) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ CELIX_DO_IF(status, status = endpointDiscoveryPoller_create(discovery, &discovery->poller));
+
+ char *path = NULL;
+ bundleContext_getProperty(discovery->context, "DISCOVERY_PATH", &path);
+ if (path == NULL) {
+ path = "path";
+ }
+
+ TXTRecordRef txtRecord;
+
+ TXTRecordCreate(&txtRecord, 512, NULL);
+ TXTRecordSetValue(&txtRecord, "path", strlen(path), path);
+
+ int port = atoi(discovery->discoveryPort);
+ int portInNetworkByteOrder = ((port << 8) & 0xFF00) | ((port >> 8) & 0xFF); //FIXME assuming little endian
+
+ DNSServiceErrorType error = DNSServiceRegister(&discovery->discoveryRef, 0, 0, OSGI_DISCOVERY_NAME, OSGI_DISCOVERY_TYPE, NULL, NULL, portInNetworkByteOrder,
+ TXTRecordGetLength(&txtRecord), TXTRecordGetBytesPtr(&txtRecord), NULL, NULL);
+ if (error != kDNSServiceErr_NoError) {
+ status = CELIX_ILLEGAL_STATE;
+ printf("============= 11 ERROR %d\n", error);
+ }
+
+ error = DNSServiceBrowse(&discovery->browseDiscoveryRef, 0, 0, OSGI_DISCOVERY_TYPE, NULL, discovery_browseDiscoveryCallback, discovery);
+ if (error != kDNSServiceErr_NoError) {
+ status = CELIX_ILLEGAL_STATE;
+ printf("============= 22 ERROR %d\n", error);
+ }
+ status = CELIX_DO_IF(status, apr_thread_create(&discovery->pollDiscovery, NULL, discovery_pollDiscovery, discovery, discovery->pool));
+
+ return status;
+}
+
+static void *APR_THREAD_FUNC discovery_pollDiscovery(apr_thread_t *thd, void *data) {
+ discovery_pt discovery = data;
+
+ while (discovery->running) {
+ DNSServiceProcessResult(discovery->browseDiscoveryRef);
+ }
+ apr_thread_exit(thd, APR_SUCCESS);
+
+ return NULL;
+}
+
+static void discovery_browseDiscoveryCallback(DNSServiceRef sdRef, DNSServiceFlags flags,
+ uint32_t interfaceIndex, DNSServiceErrorType errorCode,
+ const char *serviceName, const char *regtype, const char *replyDomain,
+ void *context) {
+ discovery_pt discovery = context;
+ if (flags & kDNSServiceFlagsAdd) {
+ printf("Added discovery with %s %s %s\n", serviceName, regtype, replyDomain);
+ DNSServiceRef resolveRef = NULL;
+ DNSServiceErrorType resolveError = DNSServiceResolve(&resolveRef, 0, 0, serviceName, regtype, replyDomain, discovery_resolveAddCallback, context);
+ printf("Resolve return with error %i\n", resolveError);
+ if (resolveError == kDNSServiceErr_NoError) {
+ DNSServiceProcessResult(resolveRef);
+ } else {
+ //TODO print error / handle error?
+ }
+ } else {
+ printf("Removed discovery with %s %s %s\n", serviceName, regtype,
+ replyDomain);
+// DNSServiceRef resolveRef = NULL;
+// DNSServiceErrorType resolveError = DNSServiceResolve(&resolveRef, 0, 0,
+// serviceName, regtype, replyDomain, discovery_resolveRemoveCallback,
+// context);
+// if (resolveError == kDNSServiceErr_NoError) {
+// DNSServiceProcessResult(resolveRef);
+// } else {
+// //TODO print error / handle error?
+// }
+ }
+}
+
+static void discovery_resolveAddCallback(DNSServiceRef sdRef, DNSServiceFlags flags, uint32_t interfaceIndex, DNSServiceErrorType errorCode, const char *fullname,
+ const char *hosttarget, uint16_t port, uint16_t txtLen, const unsigned char *txtRecord, void *context) {
+ discovery_pt discovery = context;
+
+ printf("Added discovery with %s %s %s\n", fullname, hosttarget, txtRecord);
+
+ uint8_t valueLen;
+ char *path = (char *) TXTRecordGetValuePtr(txtLen, txtRecord, "path", &valueLen);
+ char *host = strdup(gethostbyname(hosttarget)->h_name);
+ uint16_t hPort = ntohs(port);
+
+ printf("Path: %s, Host: %s\n", path, host);
+
+ char url[1024];
+ snprintf(url, sizeof(url), "%s:%d/%s", host, hPort, path);
+
+ printf("Discovery URL: %s\n", url);
+}
+
+celix_status_t discovery_stop(discovery_pt discovery) {
+ celix_status_t status;
+
+ apr_status_t tstat;
+ discovery->running = false;
+ DNSServiceRefDeallocate(discovery->browseRef);
+ apr_status_t stat = apr_thread_join(&tstat, discovery->poll);
+ if (stat != APR_SUCCESS && tstat != APR_SUCCESS) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+
+ apr_thread_mutex_lock(discovery->disclosedServicesMutex);
+ hash_map_iterator_pt iter = hashMapIterator_create(discovery->disclosedServices);
+ while (hashMapIterator_hasNext(iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ disclosed_endpoint_entry_pt endpointEntry = hashMapEntry_getValue(entry);
+ DNSServiceRefDeallocate(endpointEntry->dnsServiceRef);
+ }
+ hashMapIterator_destroy(iter);
+
+ iter = hashMapIterator_create(discovery->discoveredServices);
+ while (hashMapIterator_hasNext(iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ discovered_endpoint_entry_pt endpointEntry = hashMapEntry_getValue(entry);
+ discovery_informEndpointListeners(discovery, endpointEntry->endpointDescription, false);
+ }
+ hashMapIterator_destroy(iter);
+
+ hashMap_destroy(discovery->disclosedServices, false, false);
+
+ discovery->disclosedServices = NULL;
+ apr_thread_mutex_unlock(discovery->disclosedServicesMutex);
+
+ apr_thread_mutex_lock(discovery->discoveredServicesMutex);
+ hashMap_destroy(discovery->discoveredServices, false, false);
+ discovery->discoveredServices = NULL;
+ apr_thread_mutex_unlock(discovery->discoveredServicesMutex);
+
+ apr_thread_mutex_lock(discovery->listenerReferencesMutex);
+ hashMap_destroy(discovery->listenerReferences, false, false);
+ discovery->listenerReferences = NULL;
+ apr_thread_mutex_unlock(discovery->listenerReferencesMutex);
+
+ return status;
+}
+
+celix_status_t discovery_endpointAdded(void *handle, endpoint_description_pt endpoint, char *machtedFilter) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ printf("DISCOVERY: Endpoint for %s, with filter \"%s\" added\n", endpoint->service, machtedFilter);
+ disclosed_endpoint_entry_pt entry = NULL;
+ apr_pool_t *childPool = NULL;
+ status = apr_pool_create(&childPool, discovery->pool);
+
+ if (status == CELIX_SUCCESS) {
+ entry = apr_palloc(childPool, sizeof(*entry));
+ if (entry == NULL) {
+ status = CELIX_ENOMEM;
+ apr_pool_destroy(childPool);
+ } else {
+ entry->pool = childPool;
+ entry->endpointDescription = endpoint;
+ }
+ }
+
+ if (status == CELIX_SUCCESS) {
+ DNSServiceRef sdRef = NULL;
+ DNSServiceErrorType error;
+ TXTRecordRef txtRecord;
+
+ TXTRecordCreate(&txtRecord, 256, NULL ); //TODO search for correct default record size
+ char serviceId[16];
+ sprintf(serviceId, "%li", endpoint->serviceId);
+
+ TXTRecordSetValue(&txtRecord, "service", strlen(endpoint->service),
+ endpoint->service);
+ TXTRecordSetValue(&txtRecord, "service.id", strlen(serviceId),
+ serviceId);
+ TXTRecordSetValue(&txtRecord, "endpoint.id", strlen(endpoint->id),
+ endpoint->id);
+ TXTRecordSetValue(&txtRecord, "framework.uuid", strlen(discovery->frameworkUuid), discovery->frameworkUuid);
+
+ hash_map_iterator_pt iter = hashMapIterator_create(
+ endpoint->properties);
+ while (hashMapIterator_hasNext(iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ char *key = hashMapEntry_getKey(entry);
+ char *value = hashMapEntry_getValue(entry);
+ TXTRecordSetValue(&txtRecord, key, strlen(value), value);
+ }
+ hashMapIterator_destroy(iter);
+
+ int port = atoi(discovery->discoveryPort);
+ int portInNetworkByteOrder = ((port << 8) & 0xFF00)
+ | ((port >> 8) & 0xFF); //FIXME assuming little endian
+
+ error = DNSServiceRegister(&sdRef, 0, 0, endpoint->service,
+ OSGI_SERVICE_TYPE, NULL,
+ NULL, portInNetworkByteOrder, /* In network byte order */
+ TXTRecordGetLength(&txtRecord), TXTRecordGetBytesPtr(&txtRecord),
+ NULL, NULL );
+
+ if (error != kDNSServiceErr_NoError) {
+ status = CELIX_ILLEGAL_STATE;
+// printf("Registered record in dns-sd got error code %i\n", error);
+ } else {
+ //entry->txtRecord=txtRecord; TODO
+ entry->dnsServiceRef = sdRef;
+ apr_thread_mutex_lock(discovery->disclosedServicesMutex);
+ if (discovery->disclosedServices != NULL) {
+ hashMap_put(discovery->disclosedServices, endpoint->id, entry);
+ }
+ apr_thread_mutex_unlock(discovery->disclosedServicesMutex);
+ }
+ }
+
+
+
+ return status;
+}
+
+celix_status_t discovery_endpointRemoved(void *handle, endpoint_description_pt endpoint, char *machtedFilter) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ disclosed_endpoint_entry_pt entry = NULL;
+ apr_thread_mutex_lock(discovery->disclosedServicesMutex);
+ if (discovery->disclosedServices != NULL) {
+ entry = hashMap_remove(discovery->disclosedServices, endpoint->id);
+ }
+ if (entry != NULL) {
+ DNSServiceRefDeallocate(entry->dnsServiceRef);
+ apr_pool_destroy(entry->pool);
+ } else {
+ status = CELIX_ILLEGAL_STATE;
+ }
+ apr_thread_mutex_unlock(discovery->disclosedServicesMutex);
+
+
+ return status;
+}
+
+celix_status_t discovery_endpointListenerAdding(void * handle, service_reference_pt reference, void **service) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ bundleContext_getService(discovery->context, reference, service);
+
+ return status;
+}
+
+celix_status_t discovery_endpointListenerAdded(void * handle, service_reference_pt reference, void * service) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ service_registration_pt registration = NULL;
+ serviceReference_getServiceRegistration(reference, ®istration);
+ properties_pt serviceProperties = NULL;
+ serviceRegistration_getProperties(registration, &serviceProperties);
+ char *discoveryListener = properties_get(serviceProperties, "DISCOVERY");
+
+ if (discoveryListener != NULL && strcmp(discoveryListener, "true") == 0) {
+ printf("DISCOVERY: EndpointListener Ignored - Discovery listener\n");
+ } else {
+ printf("DISCOVERY: EndpointListener Added - Add Scope\n");
+
+ apr_thread_mutex_lock(discovery->discoveredServicesMutex);
+ if (discovery->discoveredServices != NULL) {
+ hash_map_iterator_pt iter = hashMapIterator_create(discovery->discoveredServices);
+ while (hashMapIterator_hasNext(iter)) {
+ endpoint_description_pt endpoint = hashMapIterator_nextKey(iter);
+ endpoint_listener_pt listener = service;
+
+ char *scope = properties_get(serviceProperties,
+ (char *) OSGI_ENDPOINT_LISTENER_SCOPE);
+ filter_pt filter = filter_create(scope); //FIXME memory leak
+ bool matchResult = false;
+ filter_match(filter, endpoint->properties, &matchResult);
+ if (matchResult) {
+ listener->endpointAdded(listener, endpoint, NULL);
+ }
+ }
+ hashMapIterator_destroy(iter);
+ }
+ apr_thread_mutex_unlock(discovery->discoveredServicesMutex);
+
+ apr_thread_mutex_lock(discovery->listenerReferencesMutex);
+ if (discovery->listenerReferences != NULL) {
+ hashMap_put(discovery->listenerReferences, reference, NULL /*TODO is the scope value needed?*/);
+ }
+ apr_thread_mutex_unlock(discovery->listenerReferencesMutex);
+ }
+
+ return status;
+}
+
+celix_status_t discovery_endpointListenerModified(void * handle, service_reference_pt reference, void * service) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+// printf("DISCOVERY: EndpointListener Modified - Update Scope TODO\n");
+
+ return status;
+}
+
+
+
+celix_status_t discovery_endpointListenerRemoved(void * handle, service_reference_pt reference, void * service) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ printf("DISCOVERY: EndpointListener Removed\n");
+ apr_thread_mutex_lock(discovery->listenerReferencesMutex);
+ if (discovery->listenerReferences != NULL) {
+ hashMap_remove(discovery->listenerReferences, reference);
+ }
+ apr_thread_mutex_unlock(discovery->listenerReferencesMutex);
+
+ return status;
+}
+
+
+static celix_status_t discovery_informEndpointListeners(discovery_pt discovery, endpoint_description_pt endpoint, bool endpointAdded) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ // Inform listeners of new endpoint
+ apr_thread_mutex_lock(discovery->listenerReferencesMutex);
+ if (discovery->listenerReferences != NULL) {
+ hash_map_iterator_pt iter = hashMapIterator_create(discovery->listenerReferences);
+ while (hashMapIterator_hasNext(iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+ service_reference_pt reference = hashMapEntry_getKey(entry);
+ endpoint_listener_pt listener = NULL;
+
+ service_registration_pt registration = NULL;
+ serviceReference_getServiceRegistration(reference, ®istration);
+ properties_pt serviceProperties = NULL;
+ serviceRegistration_getProperties(registration, &serviceProperties);
+ char *scope = properties_get(serviceProperties,
+ (char *) OSGI_ENDPOINT_LISTENER_SCOPE);
+ filter_pt filter = filter_create(scope);
+ bool matchResult = false;
+ filter_match(filter, endpoint->properties, &matchResult);
+ if (matchResult) {
+ printf("DISCOVERY: Add service (%s)\n", endpoint->service);
+ bundleContext_getService(discovery->context, reference,
+ (void**) &listener);
+ if (endpointAdded) {
+ listener->endpointAdded(listener->handle, endpoint, NULL );
+ } else {
+ listener->endpointRemoved(listener->handle, endpoint, NULL );
+ }
+
+ }
+ }
+ hashMapIterator_destroy(iter);
+ }
+ apr_thread_mutex_unlock(discovery->listenerReferencesMutex);
+
+ return status;
+}
Added: celix/trunk/remote_services/discovery_configured/private/src/discovery_activator.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/src/discovery_activator.c?rev=1613403&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/src/discovery_activator.c (added)
+++ celix/trunk/remote_services/discovery_configured/private/src/discovery_activator.c Fri Jul 25 12:06:44 2014
@@ -0,0 +1,146 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * dependency_activator.c
+ *
+ * \date Sep 29, 2011
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <apr_strings.h>
+#include <apr_uuid.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+#include "service_registration.h"
+#include "constants.h"
+
+#include "discovery.h"
+#include "endpoint_listener.h"
+#include "remote_constants.h"
+
+struct activator {
+ apr_pool_t *pool;
+ bundle_context_pt context;
+
+ discovery_pt discovery;
+
+ service_tracker_pt endpointListenerTracker;
+ service_registration_pt endpointListenerService;
+};
+
+celix_status_t discoveryActivator_createEPLTracker(struct activator *activator,
+ service_tracker_pt *tracker);
+
+celix_status_t bundleActivator_create(bundle_context_pt context,
+ void **userData) {
+ celix_status_t status = CELIX_SUCCESS;
+ apr_pool_t *parentPool = NULL;
+ apr_pool_t *pool = NULL;
+ struct activator *activator = NULL;
+
+ bundleContext_getMemoryPool(context, &parentPool);
+ apr_pool_create(&pool, parentPool);
+ activator = apr_palloc(pool, sizeof(*activator));
+ if (!activator) {
+ status = CELIX_ENOMEM;
+ } else {
+ activator->pool = pool;
+ activator->context = context;
+ activator->endpointListenerTracker = NULL;
+ activator->endpointListenerService = NULL;
+
+ discovery_create(pool, context, &activator->discovery);
+
+ discoveryActivator_createEPLTracker(activator,
+ &activator->endpointListenerTracker);
+
+ *userData = activator;
+ }
+
+ return status;
+}
+
+celix_status_t discoveryActivator_createEPLTracker(struct activator *activator,
+ service_tracker_pt *tracker) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ service_tracker_customizer_pt customizer = NULL;
+
+ status = serviceTrackerCustomizer_create(
+ activator->discovery, discovery_endpointListenerAdding,
+ discovery_endpointListenerAdded, discovery_endpointListenerModified,
+ discovery_endpointListenerRemoved, &customizer);
+
+ if (status == CELIX_SUCCESS) {
+ status = serviceTracker_create(activator->context,
+ (char *) OSGI_ENDPOINT_LISTENER_SERVICE, customizer, tracker);
+
+ serviceTracker_open(activator->endpointListenerTracker);
+ }
+
+ return status;
+}
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator = userData;
+ apr_pool_t *pool = NULL;
+ apr_pool_create(&pool, activator->pool);
+
+ endpoint_listener_pt endpointListener = apr_palloc(pool,
+ sizeof(*endpointListener));
+ endpointListener->handle = activator->discovery;
+ endpointListener->endpointAdded = discovery_endpointAdded;
+ endpointListener->endpointRemoved = discovery_endpointRemoved;
+
+ properties_pt props = properties_create();
+ properties_set(props, "DISCOVERY", "true");
+ char *uuid = NULL;
+ bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid);
+ char *scope = apr_pstrcat(activator->pool, "(&(", OSGI_FRAMEWORK_OBJECTCLASS, "=*)(", OSGI_RSA_ENDPOINT_FRAMEWORK_UUID, "=", uuid, "))", NULL);
+ printf("DISCOVERY SCOPE IS: %s\n", scope);
+ properties_set(props, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, scope);
+ status = bundleContext_registerService(context,
+ (char *) OSGI_ENDPOINT_LISTENER_SERVICE, endpointListener, props,
+ &activator->endpointListenerService);
+
+ return status;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator = userData;
+
+ serviceTracker_close(activator->endpointListenerTracker);
+ serviceRegistration_unregister(activator->endpointListenerService);
+ discovery_stop(activator->discovery);
+
+ return status;
+}
+
+celix_status_t bundleActivator_destroy(void * userData,
+ bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ return status;
+}
Added: celix/trunk/remote_services/discovery_configured/private/src/endpoint_description_reader.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/src/endpoint_description_reader.c?rev=1613403&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/src/endpoint_description_reader.c (added)
+++ celix/trunk/remote_services/discovery_configured/private/src/endpoint_description_reader.c Fri Jul 25 12:06:44 2014
@@ -0,0 +1,217 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_description_reader.c
+ *
+ * \date 24 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdbool.h>
+#include <libxml/xmlreader.h>
+
+#include "array_list.h"
+
+//celix_status_t edr_create() {
+//
+//}
+
+typedef enum {
+ VALUE_TYPE_STRING,
+ VALUE_TYPE_LONG,
+ VALUE_TYPE_DOUBLE,
+ VALUE_TYPE_FLOAT,
+ VALUE_TYPE_INTEGER,
+ VALUE_TYPE_BYTE,
+ VALUE_TYPE_CHAR,
+ VALUE_TYPE_BOOLEAN,
+ VALUE_TYPE_SHORT,
+} valueType;
+
+
+static valueType getValueType(char *name);
+
+int main() {
+ xmlTextReaderPtr reader = xmlReaderForFile("desc.xml", NULL, 0);
+ if (reader != NULL) {
+ bool inProperty = false;
+ bool inXml = false;
+ bool inArray = false;
+ bool inList = false;
+ bool inSet = false;
+ bool inValue = false;
+
+ const char *propertyName = NULL;
+ valueType propertyType;
+ const char *propertyValue = NULL;
+ const char *value = NULL;
+ char *valueBuffer = malloc(256);
+ valueBuffer[0] = '\0';
+ unsigned int currentSize = 255;
+
+ array_list_pt propertyValues = NULL;
+ arrayList_create(&propertyValues);
+
+ int read = xmlTextReaderRead(reader);
+ while (read == 1) {
+ int type = xmlTextReaderNodeType(reader);
+
+ if (type == 1) {
+ const char *localname = (const char*) xmlTextReaderConstLocalName(reader);
+
+ if (inXml) {
+ if (strlen(valueBuffer) + strlen(localname) + 1 > currentSize) {
+ valueBuffer = realloc(*valueBuffer, currentSize * 2);
+ }
+ strcat(valueBuffer, "<");
+ strcat(valueBuffer, localname);
+ // m_valueBuffer.append("<" + qName);
+ // for (int i = 0; i < attributes.getLength(); i++) {
+ // m_valueBuffer.append(" ").append(attributes.getQName(i)).append("=\"")
+ // .append(attributes.getValue(i)).append("\"");
+ // }
+ // m_valueBuffer.append(">");
+ strcat(valueBuffer, ">");
+ read = xmlTextReaderRead(reader);
+ continue;
+ }
+
+ if (strcmp(localname, "property") == 0) {
+ inProperty = true;
+ propertyName = (const char *) xmlTextReaderGetAttribute(reader, "name");
+ propertyType = getValueType((const char *) xmlTextReaderGetAttribute(reader, "value-type"));
+ propertyValue = (const char *) xmlTextReaderGetAttribute(reader, "value");
+ arrayList_clear(propertyValues);
+
+ read = xmlTextReaderRead(reader);
+ continue;
+ }
+
+ valueBuffer[0] = '\0';
+ value = NULL;
+ inArray |= inProperty && strcmp(localname, "array") == 0;
+ inList |= inProperty && strcmp(localname, "list") == 0;
+ inSet |= inProperty && strcmp(localname, "set") == 0;
+ inXml |= inProperty && strcmp(localname, "xml") == 0;
+ inValue |= inProperty && strcmp(localname, "value") == 0;
+ }
+
+ if (type == 15) {
+ const char *localname = (const char*) xmlTextReaderConstLocalName(reader);
+
+ if (inXml) {
+ if (strcmp(localname, "xml") != 0) {
+ strcat(valueBuffer, "</");
+ strcat(valueBuffer, localname);
+ strcat(valueBuffer, ">");
+ }
+ else {
+ inXml = false;
+ }
+ read = xmlTextReaderRead(reader);
+ continue;
+ }
+
+ if (strcmp(localname, "endpoint-description") == 0) {
+// m_endpointDesciptions.add(new EndpointDescription(m_endpointProperties));
+ printf("New description\n");
+ read = xmlTextReaderRead(reader);
+ continue;
+ }
+
+ if (strcmp(localname, "property") == 0) {
+ inProperty = false;
+
+ printf("Property: %s, %d, %s\n", propertyName, propertyType, propertyValue);
+
+ if (inArray) {
+// m_endpointProperties.put(m_propertyName, getPropertyValuesArray());
+ }
+ else if (inList) {
+// m_endpointProperties.put(m_propertyName, getPropertyValuesList());
+ }
+ else if (inSet) {
+// m_endpointProperties.put(m_propertyName, getPropertyValuesSet());
+ }
+ else if (propertyValue != NULL) {
+// m_endpointProperties.put(m_propertyName, m_propertyType.parse(m_propertyValue));
+ }
+ else {
+ printf("Buffer: %s\n", valueBuffer);
+// m_endpointProperties.put(m_propertyName, m_valueBuffer.toString());
+ }
+ inArray = false;
+ inList = false;
+ inSet = false;
+ inXml = false;
+ read = xmlTextReaderRead(reader);
+ continue;
+ }
+
+ if (strcmp(localname, "value") == 0) {
+// m_propertyValues.add(m_propertyType.parse(m_valueBuffer.toString()));
+ inValue = false;
+ read = xmlTextReaderRead(reader);
+ continue;
+ }
+ }
+
+ if (type == 3) {
+ if (inValue || inXml) {
+ const char *value = (const char*) xmlTextReaderValue(reader);
+ printf("Value: %s\n", value);
+ strcat(valueBuffer, value);
+// m_valueBuffer.append(ch, start, length);
+ }
+ }
+
+ read = xmlTextReaderRead(reader);
+ }
+ }
+
+ return 0;
+}
+
+static valueType getValueType(char *name) {
+ if (name == NULL || strcmp(name, "") == 0) {
+ return VALUE_TYPE_STRING;
+ }
+ if (strcmp(name, "String") == 0) {
+ return VALUE_TYPE_STRING;
+ } else if (strcmp(name, "long") == 0 || strcmp(name, "Long") == 0) {
+ return VALUE_TYPE_LONG;
+ } else if (strcmp(name, "double") == 0 || strcmp(name, "Double") == 0) {
+ return VALUE_TYPE_DOUBLE;
+ } else if (strcmp(name, "float") == 0 || strcmp(name, "Float") == 0) {
+ return VALUE_TYPE_FLOAT;
+ } else if (strcmp(name, "integer") == 0 || strcmp(name, "Integer") == 0) {
+ return VALUE_TYPE_INTEGER;
+ } else if (strcmp(name, "short") == 0 || strcmp(name, "Short") == 0) {
+ return VALUE_TYPE_SHORT;
+ } else if (strcmp(name, "byte") == 0 || strcmp(name, "Byte") == 0) {
+ return VALUE_TYPE_BYTE;
+ } else if (strcmp(name, "char") == 0 || strcmp(name, "Character") == 0) {
+ return VALUE_TYPE_CHAR;
+ } else if (strcmp(name, "boolean") == 0 || strcmp(name, "Boolean") == 0) {
+ return VALUE_TYPE_BOOLEAN;
+ } else {
+ return VALUE_TYPE_STRING;
+ }
+}
Added: celix/trunk/remote_services/discovery_configured/private/src/endpoint_discovery_poller.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/src/endpoint_discovery_poller.c?rev=1613403&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/src/endpoint_discovery_poller.c (added)
+++ celix/trunk/remote_services/discovery_configured/private/src/endpoint_discovery_poller.c Fri Jul 25 12:06:44 2014
@@ -0,0 +1,224 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_discovery_poller.c
+ *
+ * \date 3 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+#include <stdbool.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <curl/curl.h>
+
+#include "endpoint_discovery_poller.h"
+#include "hash_map.h"
+#include "array_list.h"
+#include "celix_threads.h"
+#include "utils.h"
+#include "endpoint_listener.h"
+
+struct endpoint_discovery_poller {
+ discovery_pt discovery;
+ hash_map_pt entries;
+ celix_thread_mutex_t pollerLock;
+ celix_thread_t pollerThread;
+};
+
+static void *endpointDiscoveryPoller_poll(void *data);
+static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_pt poller, char *url, array_list_pt *updatedEndpoints);
+static size_t endpointDiscoveryPoller_writeMemory(void *contents, size_t size, size_t nmemb, void *memoryPtr);
+static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(void *endpointPtr, void *comparePtr, bool *equals);
+
+celix_status_t endpointDiscoveryPoller_create(discovery_pt discovery, endpoint_discovery_poller_pt *poller) {
+ celix_status_t status = CELIX_SUCCESS;
+ *poller = malloc(sizeof(**poller));
+ if (!poller) {
+ status = CELIX_ENOMEM;
+ } else {
+ (*poller)->discovery = discovery;
+ (*poller)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ status = celixThreadMutex_create(&(*poller)->pollerLock, NULL);
+ if (status != CELIX_SUCCESS) {
+ status = CELIX_ILLEGAL_STATE;
+ } else {
+ celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_poll, *poller);
+ }
+ }
+ return status;
+}
+
+celix_status_t endpointDiscoveryPoller_addDiscoveryEndpoint(endpoint_discovery_poller_pt poller, char *url) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = celixThreadMutex_lock(&(poller)->pollerLock);
+ if (status != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ array_list_pt endpoints;
+ status = arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, &endpoints);
+ if (status == CELIX_SUCCESS) {
+ hashMap_put(poller->entries, url, endpoints);
+ }
+ status = celixThreadMutex_unlock(&poller->pollerLock);
+ if (status != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ }
+
+ return status;
+}
+
+celix_status_t endpointDiscoveryPoller_removeDiscoveryEndpoint(endpoint_discovery_poller_pt poller, char *url) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = celixThreadMutex_lock(&poller->pollerLock);
+ if (status != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ array_list_pt entries = hashMap_remove(poller->entries, url);
+ int i;
+ for (i = 0; i < arrayList_size(entries); i++) {
+ endpoint_description_pt endpoint = arrayList_get(entries, i);
+ // discovery_removeDiscoveredEndpoint(poller->discovery, endpoint);
+ }
+ arrayList_destroy(entries);
+
+ status = celixThreadMutex_unlock(&poller->pollerLock);
+ if (status != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ }
+
+ return status;
+}
+
+static void *endpointDiscoveryPoller_poll(void *data) {
+ endpoint_discovery_poller_pt poller = (endpoint_discovery_poller_pt) data;
+
+ celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
+ if (status != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries);
+ while (hashMapIterator_hasNext(iterator)) {
+ array_list_pt currentEndpoints = hashMapIterator_nextValue(iterator);
+ char *url = hashMapIterator_nextKey(iterator);
+
+ array_list_pt updatedEndpoints = NULL;
+ status = endpointDiscoveryPoller_getEndpoints(poller, url, &updatedEndpoints);
+ if (status == CELIX_SUCCESS) {
+ int i;
+ for (i = 0; i < arrayList_size(currentEndpoints); i++) {
+ endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i);
+ if (!arrayList_contains(updatedEndpoints, endpoint)) {
+ // status = discovery_removeDiscoveredEndpoint(poller->discovery, endpoint);
+ }
+ }
+
+ arrayList_clear(currentEndpoints);
+ arrayList_addAll(currentEndpoints, updatedEndpoints);
+ arrayList_destroy(updatedEndpoints);
+
+ for (i = 0; i < arrayList_size(currentEndpoints); i++) {
+ endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i);
+ // status = discovery_addDiscoveredEndpoint(poller->discovery, endpoint);
+ }
+ }
+ }
+
+ status = celixThreadMutex_unlock(&poller->pollerLock);
+ if (status != 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ }
+
+ return NULL;
+}
+
+struct MemoryStruct {
+ char *memory;
+ size_t size;
+};
+
+static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_pt poller, char *url, array_list_pt *updatedEndpoints) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ CURL *curl;
+ CURLcode res;
+
+ struct MemoryStruct chunk;
+ chunk.memory = malloc(1);
+ chunk.size = 0;
+
+ curl_global_init(CURL_GLOBAL_ALL);
+ curl = curl_easy_init();
+ if(!curl) {
+ status = CELIX_ILLEGAL_STATE;
+ } else {
+ curl_easy_setopt(curl, CURLOPT_URL, url);
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, endpointDiscoveryPoller_writeMemory);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
+ res = curl_easy_perform(curl);
+ curl_easy_cleanup(curl);
+ }
+
+ // process endpoints file
+
+ // clean up endpoints file
+ if(chunk.memory) {
+ free(chunk.memory);
+ }
+ curl_global_cleanup();
+ return status;
+}
+
+static size_t endpointDiscoveryPoller_writeMemory(void *contents, size_t size, size_t nmemb, void *memoryPtr) {
+ size_t realsize = size * nmemb;
+ struct MemoryStruct *mem = (struct MemoryStruct *)memoryPtr;
+
+ mem->memory = realloc(mem->memory, mem->size + realsize + 1);
+ if(mem->memory == NULL) {
+ /* out of memory! */
+ printf("not enough memory (realloc returned NULL)\n");
+ return 0;
+ }
+
+ memcpy(&(mem->memory[mem->size]), contents, realsize);
+ mem->size += realsize;
+ mem->memory[mem->size] = 0;
+
+ return realsize;
+}
+
+static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(void *endpointPtr, void *comparePtr, bool *equals) {
+ endpoint_description_pt endpoint = (endpoint_description_pt) endpointPtr;
+ endpoint_description_pt compare = (endpoint_description_pt) comparePtr;
+
+ if (strcmp(endpoint->id, compare->id) == 0) {
+ *equals = true;
+ } else {
+ *equals = false;
+ }
+
+ return CELIX_SUCCESS;
+}