You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by bp...@apache.org on 2014/09/19 17:36:25 UTC
svn commit: r1626250 [1/2] - in /celix/trunk/remote_services: ./ discovery/
discovery/private/ discovery/private/include/ discovery/private/src/
discovery_configured/ discovery_configured/private/include/
discovery_configured/private/src/ discovery_etc...
Author: bpetri
Date: Fri Sep 19 15:36:24 2014
New Revision: 1626250
URL: http://svn.apache.org/r1626250
Log:
CELIX-152: Refactored discovery_cfg to be bit more generic. Added Discovery Support based on etcd
Added:
celix/trunk/remote_services/discovery/
celix/trunk/remote_services/discovery/private/
celix/trunk/remote_services/discovery/private/include/
celix/trunk/remote_services/discovery/private/include/discovery.h
celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_common.h
celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_reader.h
celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_writer.h
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_poller.h
celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
celix/trunk/remote_services/discovery/private/src/
celix/trunk/remote_services/discovery/private/src/desc.xml (with props)
celix/trunk/remote_services/discovery/private/src/discovery.c
celix/trunk/remote_services/discovery/private/src/discovery_activator.c
celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_reader.c
celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_writer.c
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_poller.c
celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
celix/trunk/remote_services/discovery_configured/private/src/discovery_impl.c
celix/trunk/remote_services/discovery_etcd/
celix/trunk/remote_services/discovery_etcd/CMakeLists.txt
celix/trunk/remote_services/discovery_etcd/private/
celix/trunk/remote_services/discovery_etcd/private/include/
celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
celix/trunk/remote_services/discovery_etcd/private/include/etcd.h
celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
celix/trunk/remote_services/discovery_etcd/private/src/
celix/trunk/remote_services/discovery_etcd/private/src/discovery_impl.c
celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
Removed:
celix/trunk/remote_services/discovery_configured/private/include/discovery.h
celix/trunk/remote_services/discovery_configured/private/include/endpoint_descriptor_common.h
celix/trunk/remote_services/discovery_configured/private/include/endpoint_descriptor_reader.h
celix/trunk/remote_services/discovery_configured/private/include/endpoint_descriptor_writer.h
celix/trunk/remote_services/discovery_configured/private/include/endpoint_discovery_poller.h
celix/trunk/remote_services/discovery_configured/private/include/endpoint_discovery_server.h
celix/trunk/remote_services/discovery_configured/private/src/discovery.c
celix/trunk/remote_services/discovery_configured/private/src/discovery_activator.c
celix/trunk/remote_services/discovery_configured/private/src/endpoint_descriptor_reader.c
celix/trunk/remote_services/discovery_configured/private/src/endpoint_descriptor_writer.c
celix/trunk/remote_services/discovery_configured/private/src/endpoint_discovery_poller.c
celix/trunk/remote_services/discovery_configured/private/src/endpoint_discovery_server.c
Modified:
celix/trunk/remote_services/CMakeLists.txt
celix/trunk/remote_services/deploy.cmake
celix/trunk/remote_services/discovery_configured/CMakeLists.txt
celix/trunk/remote_services/discovery_shm/private/src/discovery_activator.c
Modified: celix/trunk/remote_services/CMakeLists.txt
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/CMakeLists.txt?rev=1626250&r1=1626249&r2=1626250&view=diff
==============================================================================
--- celix/trunk/remote_services/CMakeLists.txt (original)
+++ celix/trunk/remote_services/CMakeLists.txt Fri Sep 19 15:36:24 2014
@@ -36,6 +36,7 @@ if (REMOTE_SERVICE_ADMIN)
add_subdirectory(discovery_slp)
add_subdirectory(discovery_bonjour)
add_subdirectory(discovery_configured)
+ add_subdirectory(discovery_etcd)
add_subdirectory(discovery_shm)
add_subdirectory(calculator_service)
Modified: celix/trunk/remote_services/deploy.cmake
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/deploy.cmake?rev=1626250&r1=1626249&r2=1626250&view=diff
==============================================================================
--- celix/trunk/remote_services/deploy.cmake (original)
+++ celix/trunk/remote_services/deploy.cmake Fri Sep 19 15:36:24 2014
@@ -46,3 +46,11 @@ if (RSA_BUNDLES_REMOTE_SERVICE_ADMIN_SHM
ENDPOINTS org.apache.celix.calc.api.Calculator_proxy)
endif (RSA_BUNDLES_DISCOVERY_SHM)
endif (RSA_BUNDLES_REMOTE_SERVICE_ADMIN_SHM)
+
+is_enabled(RSA_BUNDLES_DISCOVERY_ETCD)
+if (RSA_BUNDLES_DISCOVERY_ETCD)
+ deploy("remote-services-etcd" BUNDLES discovery_etcd topology_manager remote_service_admin_http calculator shell shell_tui log_service log_writer
+ ENDPOINTS org.apache.celix.calc.api.Calculator_endpoint)
+ deploy("remote-services-etcd-client" BUNDLES topology_manager remote_service_admin_http shell shell_tui log_service log_writer calculator_shell discovery_etcd
+ ENDPOINTS org.apache.celix.calc.api.Calculator_proxy)
+endif (RSA_BUNDLES_DISCOVERY_ETCD)
Added: celix/trunk/remote_services/discovery/private/include/discovery.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/discovery.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/discovery.h (added)
+++ celix/trunk/remote_services/discovery/private/include/discovery.h Fri Sep 19 15:36:24 2014
@@ -0,0 +1,64 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topology_manager.h
+ *
+ * \date Sep 29, 2011
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef DISCOVERY_H_
+#define DISCOVERY_H_
+
+#include "bundle_context.h"
+#include "service_reference.h"
+
+#include "endpoint_description.h"
+#include "endpoint_listener.h"
+
+#define DISCOVERY_SERVER_PORT "DISCOVERY_CFG_SERVER_PORT"
+#define DISCOVERY_SERVER_PATH "DISCOVERY_CFG_SERVER_PATH"
+#define DISCOVERY_POLL_ENDPOINTS "DISCOVERY_CFG_POLL_ENDPOINTS"
+
+typedef struct discovery *discovery_pt;
+
+
+/* those one could be put into a general discovery.h - file */
+celix_status_t discovery_create(bundle_context_pt context, discovery_pt *discovery);
+celix_status_t discovery_destroy(discovery_pt discovery);
+
+celix_status_t discovery_start(discovery_pt discovery);
+celix_status_t discovery_stop(discovery_pt discovery);
+
+celix_status_t discovery_endpointAdded(void *handle, endpoint_description_pt endpoint, char *machtedFilter);
+celix_status_t discovery_endpointRemoved(void *handle, endpoint_description_pt endpoint, char *machtedFilter);
+
+celix_status_t discovery_endpointListenerAdding(void * handle, service_reference_pt reference, void **service);
+celix_status_t discovery_endpointListenerAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t discovery_endpointListenerModified(void * handle, service_reference_pt reference, void * service);
+celix_status_t discovery_endpointListenerRemoved(void * handle, service_reference_pt reference, void * service);
+
+celix_status_t discovery_informEndpointListeners(discovery_pt discovery, endpoint_description_pt endpoint, bool endpointAdded);
+celix_status_t discovery_updateEndpointListener(discovery_pt discovery, service_reference_pt reference, endpoint_listener_pt service);
+
+celix_status_t discovery_addDiscoveredEndpoint(discovery_pt discovery, endpoint_description_pt endpoint);
+celix_status_t discovery_removeDiscoveredEndpoint(discovery_pt discovery, endpoint_description_pt endpoint);
+
+#endif /* DISCOVERY_H_ */
Added: celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_common.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_common.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_common.h (added)
+++ celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_common.h Fri Sep 19 15:36:24 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * endpoint_descriptor_common.h
+ *
+ * \date Aug 8, 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef ENDPOINT_DESCRIPTOR_COMMON_H_
+#define ENDPOINT_DESCRIPTOR_COMMON_H_
+
+/*
+ * Private constant & enum definitions for endpoint descriptor reader and writer, not needed for normal usage of the reader and writer.
+ */
+
+typedef enum {
+ VALUE_TYPE_STRING,
+ VALUE_TYPE_LONG,
+ VALUE_TYPE_DOUBLE,
+ VALUE_TYPE_FLOAT,
+ VALUE_TYPE_INTEGER,
+ VALUE_TYPE_BYTE,
+ VALUE_TYPE_CHAR,
+ VALUE_TYPE_BOOLEAN,
+ VALUE_TYPE_SHORT,
+} valueType;
+
+
+static valueType valueTypeFromString(char *name);
+static char* valueTypeToString(valueType type);
+
+static const xmlChar* XML = (const xmlChar*) "xml";
+static const xmlChar* XMLNS = (const xmlChar*) "http://www.osgi.org/xmlns/rsa/v1.0.0";
+
+static const xmlChar* ENDPOINT_DESCRIPTIONS = (const xmlChar*) "endpoint-descriptions";
+static const xmlChar* ENDPOINT_DESCRIPTION = (const xmlChar*) "endpoint-description";
+
+static const xmlChar* ARRAY = (const xmlChar*) "array";
+static const xmlChar* LIST = (const xmlChar*) "list";
+static const xmlChar* SET = (const xmlChar*) "set";
+
+static const xmlChar* PROPERTY = (const xmlChar*) "property";
+static const xmlChar* NAME = (const xmlChar*) "name";
+static const xmlChar* VALUE = (const xmlChar*) "value";
+static const xmlChar* VALUE_TYPE = (const xmlChar*) "value-type";
+
+#endif /* ENDPOINT_DESCRIPTOR_COMMON_H_ */
Added: celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_reader.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_reader.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_reader.h (added)
+++ celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_reader.h Fri Sep 19 15:36:24 2014
@@ -0,0 +1,41 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_descriptor_reader.h
+ *
+ * \date 26 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef ENDPOINT_DESCRIPTOR_READER_H_
+#define ENDPOINT_DESCRIPTOR_READER_H_
+
+#include "celix_errno.h"
+#include "array_list.h"
+
+typedef struct endpoint_descriptor_reader *endpoint_descriptor_reader_pt;
+
+celix_status_t endpointDescriptorReader_create(endpoint_descriptor_reader_pt *reader);
+celix_status_t endpointDescriptorReader_destroy(endpoint_descriptor_reader_pt reader);
+
+celix_status_t endpointDescriptorReader_parseDocument(endpoint_descriptor_reader_pt reader, char *document, array_list_pt *endpoints);
+
+
+#endif /* ENDPOINT_DESCRIPTOR_READER_H_ */
Added: celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_writer.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_writer.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_writer.h (added)
+++ celix/trunk/remote_services/discovery/private/include/endpoint_descriptor_writer.h Fri Sep 19 15:36:24 2014
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_descriptor_writer.h
+ *
+ * \date 26 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef ENDPOINT_DESCRIPTOR_WRITER_H_
+#define ENDPOINT_DESCRIPTOR_WRITER_H_
+
+#include "celix_errno.h"
+#include "array_list.h"
+
+typedef struct endpoint_descriptor_writer *endpoint_descriptor_writer_pt;
+
+celix_status_t endpointDescriptorWriter_create(endpoint_descriptor_writer_pt *writer);
+celix_status_t endpointDescriptorWriter_destroy(endpoint_descriptor_writer_pt writer);
+celix_status_t endpointDescriptorWriter_writeDocument(endpoint_descriptor_writer_pt writer, array_list_pt endpoints, char **document);
+
+#endif /* ENDPOINT_DESCRIPTOR_WRITER_H_ */
Added: celix/trunk/remote_services/discovery/private/include/endpoint_discovery_poller.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_discovery_poller.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/endpoint_discovery_poller.h (added)
+++ celix/trunk/remote_services/discovery/private/include/endpoint_discovery_poller.h Fri Sep 19 15:36:24 2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * endpoint_discovery_poller.h
+ *
+ * \date 3 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef ENDPOINT_DISCOVERY_POLLER_H_
+#define ENDPOINT_DISCOVERY_POLLER_H_
+
+#include "celix_errno.h"
+#include "discovery.h"
+
+typedef struct endpoint_discovery_poller *endpoint_discovery_poller_pt;
+
+celix_status_t endpointDiscoveryPoller_create(discovery_pt discovery, bundle_context_pt context, endpoint_discovery_poller_pt *poller);
+celix_status_t endpointDiscoveryPoller_destroy(endpoint_discovery_poller_pt poller);
+
+celix_status_t endpointDiscoveryPoller_addDiscoveryEndpoint(endpoint_discovery_poller_pt poller, char *url);
+celix_status_t endpointDiscoveryPoller_removeDiscoveryEndpoint(endpoint_discovery_poller_pt poller, char *url);
+
+
+#endif /* ENDPOINT_DISCOVERY_POLLER_H_ */
Added: celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h (added)
+++ celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h Fri Sep 19 15:36:24 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * endpoint_discovery_server.h
+ *
+ * \date Aug 12, 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef ENDPOINT_DISCOVERY_SERVER_H_
+#define ENDPOINT_DISCOVERY_SERVER_H_
+
+#include "celix_errno.h"
+#include "discovery.h"
+
+typedef struct endpoint_discovery_server *endpoint_discovery_server_pt;
+
+/**
+ * Creates and starts a new instance of an endpoint discovery server.
+ *
+ * @param discovery [in] the discovery service itself;
+ * @param context [in] the bundle context;
+ * @param server [out] the pointer to the created instance.
+ * @return CELIX_SUCCESS when successful.
+ */
+celix_status_t endpointDiscoveryServer_create(discovery_pt discovery, bundle_context_pt context, endpoint_discovery_server_pt *server);
+
+/**
+ * Stops and destroys a given instance of an endpoint discovery server.
+ *
+ * @param server [in] the pointer to the instance to destroy.
+ * @return CELIX_SUCCESS when successful.
+ */
+celix_status_t endpointDiscoveryServer_destroy(endpoint_discovery_server_pt server);
+
+/**
+ * Adds a given endpoint description to expose through the given discovery server.
+ *
+ * @param server [in] the endpoint discovery server to expose the endpoint through;
+ * @param endpoint [in] the endpoint description to expose.
+ * @return CELIX_SUCCESS when successful.
+ */
+celix_status_t endpointDiscoveryServer_addEndpoint(endpoint_discovery_server_pt server, endpoint_description_pt endpoint);
+
+/**
+ * Removes a given endpoint description from exposure through the given discovery server.
+ *
+ * @param server [in] the endpoint discovery server to remove the endpoint from;
+ * @param endpoint [in] the endpoint description to remove.
+ * @return CELIX_SUCCESS when successful.
+ */
+celix_status_t endpointDiscoveryServer_removeEndpoint( endpoint_discovery_server_pt server, endpoint_description_pt endpoint);
+
+#endif /* ENDPOINT_DISCOVERY_SERVER_H_ */
Added: celix/trunk/remote_services/discovery/private/src/desc.xml
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/desc.xml?rev=1626250&view=auto
==============================================================================
Binary file - no diff available.
Propchange: celix/trunk/remote_services/discovery/private/src/desc.xml
------------------------------------------------------------------------------
svn:mime-type = application/xml
Added: celix/trunk/remote_services/discovery/private/src/discovery.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/discovery.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/src/discovery.c (added)
+++ celix/trunk/remote_services/discovery/private/src/discovery.c Fri Sep 19 15:36:24 2014
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * discovery.c
+ *
+ * \date Aug 8, 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+#include "constants.h"
+#include "celix_threads.h"
+#include "bundle_context.h"
+#include "array_list.h"
+#include "utils.h"
+#include "celix_errno.h"
+#include "filter.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "remote_constants.h"
+#include "celix_log.h"
+#include "discovery.h"
+#include "discovery_impl.h"
+#include "endpoint_discovery_poller.h"
+#include "endpoint_discovery_server.h"
+
+
+celix_status_t discovery_endpointAdded(void *handle, endpoint_description_pt endpoint, char *matchedFilter) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Endpoint for %s, with filter \"%s\" added...", endpoint->service, matchedFilter);
+
+ status = endpointDiscoveryServer_addEndpoint(discovery->server, endpoint);
+
+ return status;
+}
+
+celix_status_t discovery_endpointRemoved(void *handle, endpoint_description_pt endpoint, char *matchedFilter) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Endpoint for %s, with filter \"%s\" removed...", endpoint->service, matchedFilter);
+
+ status = endpointDiscoveryServer_removeEndpoint(discovery->server, endpoint);
+
+ return status;
+}
+
+celix_status_t discovery_endpointListenerAdding(void* handle, service_reference_pt reference, void** service) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ bundleContext_getService(discovery->context, reference, service);
+
+ return status;
+}
+
+celix_status_t discovery_endpointListenerAdded(void* handle, service_reference_pt reference, void* service) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ service_registration_pt registration = NULL;
+ serviceReference_getServiceRegistration(reference, ®istration);
+
+ properties_pt serviceProperties = NULL;
+ serviceRegistration_getProperties(registration, &serviceProperties);
+
+ char *discoveryListener = properties_get(serviceProperties, "DISCOVERY");
+ char *scope = properties_get(serviceProperties, (char *) OSGI_ENDPOINT_LISTENER_SCOPE);
+ filter_pt filter = filter_create(scope);
+
+ if (discoveryListener != NULL && strcmp(discoveryListener, "true") == 0) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "EndpointListener Ignored - Discovery listener");
+ } else {
+ celixThreadMutex_lock(&discovery->discoveredServicesMutex);
+
+ hash_map_iterator_pt iter = hashMapIterator_create(discovery->discoveredServices);
+ while (hashMapIterator_hasNext(iter)) {
+ endpoint_description_pt endpoint = hashMapIterator_nextKey(iter);
+
+ bool matchResult = false;
+ filter_match(filter, endpoint->properties, &matchResult);
+ if (matchResult) {
+ endpoint_listener_pt listener = service;
+
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "EndpointListener Added - Add Scope");
+
+ listener->endpointAdded(listener, endpoint, NULL);
+ }
+ }
+ hashMapIterator_destroy(iter);
+
+ celixThreadMutex_unlock(&discovery->discoveredServicesMutex);
+
+ celixThreadMutex_lock(&discovery->listenerReferencesMutex);
+
+ hashMap_put(discovery->listenerReferences, reference, NULL);
+
+ celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
+ }
+
+ filter_destroy(filter);
+
+ return status;
+}
+
+celix_status_t discovery_endpointListenerModified(void * handle, service_reference_pt reference, void * service) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ status = discovery_endpointListenerRemoved(handle, reference, service);
+ status = discovery_endpointListenerAdded(handle, reference, service);
+
+ return status;
+}
+
+celix_status_t discovery_endpointListenerRemoved(void * handle, service_reference_pt reference, void * service) {
+ celix_status_t status = CELIX_SUCCESS;
+ discovery_pt discovery = handle;
+
+ status = celixThreadMutex_lock(&discovery->listenerReferencesMutex);
+
+ if (discovery->listenerReferences != NULL) {
+ if (hashMap_remove(discovery->listenerReferences, reference)) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "EndpointListener Removed");
+ }
+ }
+
+ status = celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
+
+ return status;
+}
+
+celix_status_t discovery_informEndpointListeners(discovery_pt discovery, endpoint_description_pt endpoint, bool endpointAdded) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ // Inform listeners of new endpoint
+ status = celixThreadMutex_lock(&discovery->listenerReferencesMutex);
+
+ if (discovery->listenerReferences != NULL) {
+ hash_map_iterator_pt iter = hashMapIterator_create(discovery->listenerReferences);
+ while (hashMapIterator_hasNext(iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+
+ service_reference_pt reference = hashMapEntry_getKey(entry);
+ endpoint_listener_pt listener = NULL;
+
+ service_registration_pt registration = NULL;
+ serviceReference_getServiceRegistration(reference, ®istration);
+
+ properties_pt serviceProperties = NULL;
+ serviceRegistration_getProperties(registration, &serviceProperties);
+ char *scope = properties_get(serviceProperties, (char *) OSGI_ENDPOINT_LISTENER_SCOPE);
+
+ filter_pt filter = filter_create(scope);
+ bool matchResult = false;
+
+ status = filter_match(filter, endpoint->properties, &matchResult);
+ if (matchResult) {
+ bundleContext_getService(discovery->context, reference, (void**) &listener);
+ if (endpointAdded) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Adding service (%s)", endpoint->service);
+
+ listener->endpointAdded(listener->handle, endpoint, scope);
+ } else {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Removing service (%s)", endpoint->service);
+
+ listener->endpointRemoved(listener->handle, endpoint, scope);
+ }
+ }
+ }
+ hashMapIterator_destroy(iter);
+ }
+
+ status = celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
+
+ return status;
+}
+
+celix_status_t discovery_addDiscoveredEndpoint(discovery_pt discovery, endpoint_description_pt endpoint) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = celixThreadMutex_lock(&discovery->discoveredServicesMutex);
+
+ char* endpointId = endpoint->id;
+ bool exists = hashMap_get(discovery->discoveredServices, endpointId) != NULL;
+ if (!exists) {
+ hashMap_put(discovery->discoveredServices, endpointId, endpoint);
+ }
+
+ status = celixThreadMutex_unlock(&discovery->discoveredServicesMutex);
+
+ if (!exists) {
+ // notify our listeners that a new endpoint is available...
+ discovery_informEndpointListeners(discovery, endpoint, true /* addingService */);
+ }
+
+ return status;
+}
+
+celix_status_t discovery_removeDiscoveredEndpoint(discovery_pt discovery, endpoint_description_pt endpoint) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = celixThreadMutex_lock(&discovery->discoveredServicesMutex);
+
+ char* endpointId = endpoint->id;
+ void* oldValue = hashMap_remove(discovery->discoveredServices, endpointId);
+
+ status = celixThreadMutex_unlock(&discovery->discoveredServicesMutex);
+
+ if (oldValue) {
+ status = discovery_informEndpointListeners(discovery, endpoint, false /* addingService */);
+ }
+
+ return status;
+}
Added: celix/trunk/remote_services/discovery/private/src/discovery_activator.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/discovery_activator.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/src/discovery_activator.c (added)
+++ celix/trunk/remote_services/discovery/private/src/discovery_activator.c Fri Sep 19 15:36:24 2014
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * discovery_activator.c
+ *
+ * \date Aug 8, 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+#include "service_registration.h"
+#include "constants.h"
+
+#include "celix_log.h"
+#include "discovery.h"
+#include "endpoint_listener.h"
+#include "remote_constants.h"
+
+struct activator {
+ bundle_context_pt context;
+ discovery_pt discovery;
+
+ service_tracker_pt endpointListenerTracker;
+ service_registration_pt endpointListenerService;
+};
+
+celix_status_t bundleActivator_createEPLTracker(struct activator *activator, service_tracker_pt *tracker) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ service_tracker_customizer_pt customizer = NULL;
+
+ status = serviceTrackerCustomizer_create(activator->discovery,
+ discovery_endpointListenerAdding, discovery_endpointListenerAdded, discovery_endpointListenerModified, discovery_endpointListenerRemoved,
+ &customizer);
+
+ if (status == CELIX_SUCCESS) {
+ status = serviceTracker_create(activator->context, (char *) OSGI_ENDPOINT_LISTENER_SERVICE, customizer, tracker);
+ }
+
+ return status;
+}
+
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ struct activator* activator = malloc(sizeof(struct activator));
+ if (!activator) {
+ return CELIX_ENOMEM;
+ }
+
+ status = discovery_create(context, &activator->discovery);
+ if (status != CELIX_SUCCESS) {
+ return status;
+ }
+
+ activator->context = context;
+ activator->endpointListenerTracker = NULL;
+ activator->endpointListenerService = NULL;
+
+ status = bundleActivator_createEPLTracker(activator, &activator->endpointListenerTracker);
+
+ *userData = activator;
+
+ return status;
+}
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator = userData;
+
+ char *uuid = NULL;
+ status = bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid);
+ if (!uuid) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "no framework UUID defined?!");
+ return CELIX_ILLEGAL_STATE;
+ }
+
+ size_t len = 11 + strlen(OSGI_FRAMEWORK_OBJECTCLASS) + strlen(OSGI_RSA_ENDPOINT_FRAMEWORK_UUID) + strlen(uuid);
+ char *scope = malloc(len);
+ if (!scope) {
+ return CELIX_ENOMEM;
+ }
+
+ sprintf(scope, "(&(%s=*)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, OSGI_RSA_ENDPOINT_FRAMEWORK_UUID, uuid);
+ scope[len] = 0;
+
+ fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "using scope %s.", scope);
+
+ endpoint_listener_pt endpointListener = malloc(sizeof(struct endpoint_listener));
+ if (!endpointListener) {
+ return CELIX_ENOMEM;
+ }
+
+ endpointListener->handle = activator->discovery;
+ endpointListener->endpointAdded = discovery_endpointAdded;
+ endpointListener->endpointRemoved = discovery_endpointRemoved;
+
+ properties_pt props = properties_create();
+ properties_set(props, "DISCOVERY", "true");
+ properties_set(props, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, scope);
+
+ status = bundleContext_registerService(context, (char *) OSGI_ENDPOINT_LISTENER_SERVICE, endpointListener, props, &activator->endpointListenerService);
+
+ if (status == CELIX_SUCCESS) {
+ status = serviceTracker_open(activator->endpointListenerTracker);
+ }
+
+ if (status == CELIX_SUCCESS) {
+ status = discovery_start(activator->discovery);
+ }
+
+ // We can release the scope, as properties_set makes a copy of the key & value...
+ free(scope);
+
+ return status;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator = userData;
+
+ status = serviceTracker_close(activator->endpointListenerTracker);
+
+ status = serviceRegistration_unregister(activator->endpointListenerService);
+
+ status = discovery_stop(activator->discovery);
+
+ return status;
+}
+
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+ celix_status_t status = CELIX_SUCCESS;
+ struct activator *activator = userData;
+
+ status = serviceTracker_destroy(activator->endpointListenerTracker);
+
+// status = serviceRegistration_destroy(activator->endpointListenerService);
+
+ status = discovery_destroy(activator->discovery);
+
+ activator->endpointListenerTracker = NULL;
+ activator->endpointListenerService = NULL;
+ activator->discovery = NULL;
+ activator->context = NULL;
+
+ free(activator);
+
+ return status;
+}
Added: celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_reader.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_reader.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_reader.c (added)
+++ celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_reader.c Fri Sep 19 15:36:24 2014
@@ -0,0 +1,370 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_description_reader.c
+ *
+ * \date 24 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdbool.h>
+#include <string.h>
+#include <libxml/xmlreader.h>
+
+#include "celix_log.h"
+#include "constants.h"
+#include "remote_constants.h"
+
+#include "endpoint_description.h"
+#include "endpoint_descriptor_common.h"
+#include "endpoint_descriptor_reader.h"
+#include "properties.h"
+#include "utils.h"
+
+struct endpoint_descriptor_reader {
+ xmlTextReaderPtr reader;
+};
+
+celix_status_t endpointDescriptorReader_create(endpoint_descriptor_reader_pt *reader) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *reader = malloc(sizeof(**reader));
+ if (!reader) {
+ status = CELIX_ENOMEM;
+ } else {
+ (*reader)->reader = NULL;
+ }
+
+ return status;
+}
+
+celix_status_t endpointDescriptorReader_destroy(endpoint_descriptor_reader_pt reader) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ free(reader);
+
+ return status;
+}
+
+void endpointDescriptorReader_addSingleValuedProperty(properties_pt properties, const xmlChar* name, const xmlChar* value) {
+ properties_set(properties, strdup((char *) name), strdup((char *) value));
+}
+
+void endpointDescriptorReader_addMultiValuedProperty(properties_pt properties, const xmlChar* name, array_list_pt values) {
+ char *value = calloc(256, sizeof(*value));
+ if (value) {
+ int i, size = arrayList_size(values);
+ for (i = 0; i < size; i++) {
+ char* item = (char*) arrayList_get(values, i);
+ if (i > 0) {
+ value = strcat(value, ",");
+ }
+ value = strcat(value, item);
+ }
+
+ properties_set(properties, strdup((char *) name), strdup(value));
+
+ free(value);
+ }
+}
+
+celix_status_t endpointDescriptorReader_parseDocument(endpoint_descriptor_reader_pt reader, char *document, array_list_pt *endpoints) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ reader->reader = xmlReaderForMemory(document, strlen(document), NULL, "UTF-8", 0);
+ if (reader == NULL) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ bool inProperty = false;
+ bool inXml = false;
+ bool inArray = false;
+ bool inList = false;
+ bool inSet = false;
+ bool inValue = false;
+
+ const xmlChar *propertyName = NULL;
+ const xmlChar *propertyValue = NULL;
+ valueType propertyType = VALUE_TYPE_STRING;
+ xmlChar *valueBuffer = xmlMalloc(256);
+ valueBuffer[0] = '\0';
+ unsigned int currentSize = 255;
+
+ array_list_pt propertyValues = NULL;
+ arrayList_create(&propertyValues);
+
+ array_list_pt endpointDescriptions = NULL;
+ if (*endpoints) {
+ // use the given arraylist...
+ endpointDescriptions = *endpoints;
+ } else {
+ arrayList_create(&endpointDescriptions);
+ // return the read endpoints...
+ *endpoints = endpointDescriptions;
+ }
+
+ properties_pt endpointProperties = NULL;
+
+ int read = xmlTextReaderRead(reader->reader);
+ while (read == XML_TEXTREADER_MODE_INTERACTIVE) {
+ int type = xmlTextReaderNodeType(reader->reader);
+
+ if (type == XML_READER_TYPE_ELEMENT) {
+ const xmlChar *localname = xmlTextReaderConstLocalName(reader->reader);
+
+ if (inXml) {
+ valueBuffer = xmlStrcat(valueBuffer, BAD_CAST "<");
+ valueBuffer = xmlStrcat(valueBuffer, localname);
+
+ int i = xmlTextReaderMoveToFirstAttribute(reader->reader);
+ while (i == 1) {
+ const xmlChar *name = xmlTextReaderConstName(reader->reader);
+ const xmlChar *value = xmlTextReaderConstValue(reader->reader);
+
+ valueBuffer = xmlStrcat(valueBuffer, BAD_CAST " ");
+ valueBuffer = xmlStrcat(valueBuffer, name);
+ valueBuffer = xmlStrcat(valueBuffer, BAD_CAST "=\"");
+ valueBuffer = xmlStrcat(valueBuffer, BAD_CAST value);
+ valueBuffer = xmlStrcat(valueBuffer, BAD_CAST "\"");
+
+ i = xmlTextReaderMoveToNextAttribute(reader->reader);
+ }
+
+ valueBuffer = xmlStrcat(valueBuffer, BAD_CAST ">");
+ } else if (xmlStrcmp(localname, ENDPOINT_DESCRIPTION) == 0) {
+ endpointProperties = properties_create();
+ } else if (xmlStrcmp(localname, PROPERTY) == 0) {
+ inProperty = true;
+
+ propertyName = xmlTextReaderGetAttribute(reader->reader, NAME);
+ propertyValue = xmlTextReaderGetAttribute(reader->reader, VALUE);
+ xmlChar* type = xmlTextReaderGetAttribute(reader->reader, VALUE_TYPE);
+ propertyType = valueTypeFromString((char*) type);
+ arrayList_clear(propertyValues);
+
+ if (xmlTextReaderIsEmptyElement(reader->reader)) {
+ inProperty = false;
+
+ if (propertyValue != NULL) {
+ if (propertyType != VALUE_TYPE_STRING && strcmp(OSGI_RSA_ENDPOINT_SERVICE_ID, (char*) propertyName)) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "ENDPOINT_DESCRIPTOR_READER: Only single-valued string supported for %s\n", propertyName);
+ }
+ endpointDescriptorReader_addSingleValuedProperty(endpointProperties, propertyName, propertyValue);
+ }
+
+ xmlFree((void *) propertyName);
+ xmlFree((void *) propertyValue);
+ xmlFree((void *) type);
+ }
+ } else {
+ valueBuffer[0] = 0;
+ inArray |= inProperty && xmlStrcmp(localname, ARRAY) == 0;
+ inList |= inProperty && xmlStrcmp(localname, LIST) == 0;
+ inSet |= inProperty && xmlStrcmp(localname, SET) == 0;
+ inXml |= inProperty && xmlStrcmp(localname, XML) == 0;
+ inValue |= inProperty && xmlStrcmp(localname, VALUE) == 0;
+ }
+ } else if (type == XML_READER_TYPE_END_ELEMENT) {
+ const xmlChar *localname = xmlTextReaderConstLocalName(reader->reader);
+
+ if (inXml) {
+ if (xmlStrcmp(localname, XML) != 0) {
+ valueBuffer = xmlStrcat(valueBuffer, BAD_CAST "</");
+ valueBuffer = xmlStrcat(valueBuffer, localname);
+ valueBuffer = xmlStrcat(valueBuffer, BAD_CAST ">");
+ }
+ else {
+ inXml = false;
+ }
+ } else if (xmlStrcmp(localname, ENDPOINT_DESCRIPTION) == 0) {
+ endpoint_description_pt endpointDescription = NULL;
+ // Completely parsed endpoint description, add it to our list of results...
+ endpointDescription_create(endpointProperties, &endpointDescription);
+ arrayList_add(endpointDescriptions, endpointDescription);
+
+ endpointProperties = properties_create();
+ } else if (xmlStrcmp(localname, PROPERTY) == 0) {
+ inProperty = false;
+
+ if (inArray || inList || inSet) {
+ endpointDescriptorReader_addMultiValuedProperty(endpointProperties, propertyName, propertyValues);
+ }
+ else if (propertyValue != NULL) {
+ if (propertyType != VALUE_TYPE_STRING) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "ENDPOINT_DESCRIPTOR_READER: Only string support for %s\n", propertyName);
+ }
+ endpointDescriptorReader_addSingleValuedProperty(endpointProperties, propertyName, propertyValue);
+
+ xmlFree((void *) propertyValue);
+ }
+ else {
+ endpointDescriptorReader_addSingleValuedProperty(endpointProperties, propertyName, valueBuffer);
+ }
+
+ xmlFree((void *) propertyName);
+ arrayList_clear(propertyValues);
+
+ propertyType = VALUE_TYPE_STRING;
+ inArray = false;
+ inList = false;
+ inSet = false;
+ inXml = false;
+ } else if (xmlStrcmp(localname, VALUE) == 0) {
+ arrayList_add(propertyValues, strdup((char*) valueBuffer));
+ valueBuffer[0] = 0;
+ inValue = false;
+ }
+ } else if (type == XML_READER_TYPE_TEXT) {
+ if (inValue || inXml) {
+ const xmlChar *value = xmlTextReaderValue(reader->reader);
+ valueBuffer = xmlStrcat(valueBuffer, value);
+ xmlFree((void *)value);
+ }
+ }
+
+ read = xmlTextReaderRead(reader->reader);
+ }
+
+ arrayList_destroy(propertyValues);
+ xmlFree(valueBuffer);
+
+ xmlFreeTextReader(reader->reader);
+ }
+
+ return status;
+}
+
+static valueType valueTypeFromString(char *name) {
+ if (name == NULL || strcmp(name, "") == 0 || strcmp(name, "String") == 0) {
+ return VALUE_TYPE_STRING;
+ } else if (strcmp(name, "long") == 0 || strcmp(name, "Long") == 0) {
+ return VALUE_TYPE_LONG;
+ } else if (strcmp(name, "double") == 0 || strcmp(name, "Double") == 0) {
+ return VALUE_TYPE_DOUBLE;
+ } else if (strcmp(name, "float") == 0 || strcmp(name, "Float") == 0) {
+ return VALUE_TYPE_FLOAT;
+ } else if (strcmp(name, "int") == 0 || strcmp(name, "integer") == 0 || strcmp(name, "Integer") == 0) {
+ return VALUE_TYPE_INTEGER;
+ } else if (strcmp(name, "short") == 0 || strcmp(name, "Short") == 0) {
+ return VALUE_TYPE_SHORT;
+ } else if (strcmp(name, "byte") == 0 || strcmp(name, "Byte") == 0) {
+ return VALUE_TYPE_BYTE;
+ } else if (strcmp(name, "char") == 0 || strcmp(name, "Character") == 0) {
+ return VALUE_TYPE_CHAR;
+ } else if (strcmp(name, "boolean") == 0 || strcmp(name, "Boolean") == 0) {
+ return VALUE_TYPE_BOOLEAN;
+ } else {
+ return VALUE_TYPE_STRING;
+ }
+}
+
+#ifdef RSA_ENDPOINT_TEST_READER
+int main() {
+ array_list_pt list = NULL;
+ endpoint_descriptor_reader_pt reader = NULL;
+
+ char *doc = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+"<endpoint-descriptions xmlns=\"http://www.osgi.org/xmlns/rsa/v1.0.0\">"
+ "<endpoint-description>"
+ "<property name=\"endpoint.service.id\" value-type=\"long\" value=\"6\"/>"
+ "<property name=\"endpoint.framework.uuid\" value=\"2983D849-93B1-4C2C-AC6D-5BCDA93ACB96\"/>"
+ "<property name=\"service.intents\">"
+ "<list>"
+ "<value>SOAP</value>"
+ "<value>HTTP</value>"
+ "</list>"
+ "</property>"
+ "<property name=\"endpoint.id\" value=\"11111111-1111-1111-1111-111111111111\" />"
+ "<property name=\"objectClass\"><array><value>com.acme.Foo</value></array></property>"
+ "<property name=\"endpoint.package.version.com.acme\" value=\"4.2\" />"
+ "<property name=\"service.imported.configs\" value=\"com.acme\" />"
+ "<property name=\"service.imported\" value=\"true\"/>"
+ "<property name=\"com.acme.ws.xml\">"
+ "<xml>"
+ "<config xmlns=\"http://acme.com/defs\">"
+ "<port>1029</port>"
+ "<host>www.acme.com</host>"
+ "</config>"
+ "</xml>"
+ "</property>"
+ "</endpoint-description>"
+ "<endpoint-description>"
+ "<property name=\"endpoint.service.id\" value-type=\"long\" value=\"5\"/>"
+ "<property name=\"endpoint.framework.uuid\" value=\"2983D849-93B1-4C2C-AC6D-5BCDA93ACB96\"/>"
+ "<property name=\"service.intents\">"
+ "<list>"
+ "<value>SOAP</value>"
+ "<value>HTTP</value>"
+ "</list>"
+ "</property>"
+ "<property name=\"endpoint.id\" value=\"22222222-2222-2222-2222-222222222222\" />"
+ "<property name=\"objectClass\"><array><value>com.acme.Bar</value></array></property>"
+ "<property name=\"endpoint.package.version.com.acme\" value=\"4.2\" />"
+ "<property name=\"service.imported.configs\" value=\"com.acme\" />"
+ "<property name=\"com.acme.ws.xml\">"
+ "<xml>"
+ "<config xmlns=\"http://acme.com/defs\">"
+ "<port>1029</port>"
+ "<host>www.acme.com</host>"
+ "</config>"
+ "</xml>"
+ "</property>"
+ "</endpoint-description>"
+ "</endpoint-descriptions>";
+
+ endpointDescriptorReader_create(&reader);
+
+ endpointDescriptorReader_parseDocument(reader, doc, &list);
+
+ int i;
+ for (i = 0; i < arrayList_size(list); i++) {
+ printf("\nEndpoint description #%d:\n", (i+1));
+ endpoint_description_pt edp = arrayList_get(list, i);
+ printf("Id: %s\n", edp->id);
+ printf("Service Id: %ld\n", edp->serviceId);
+ printf("Framework UUID: %s\n", edp->frameworkUUID);
+ printf("Service: %s\n", edp->service);
+
+ properties_pt props = edp->properties;
+ if (props) {
+ printf("Service properties:\n");
+ hash_map_iterator_pt iter = hashMapIterator_create(props);
+ while (hashMapIterator_hasNext(iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+
+ printf("- %s => '%s'\n", hashMapEntry_getKey(entry), hashMapEntry_getValue(entry));
+ }
+ hashMapIterator_destroy(iter);
+ } else {
+ printf("No service properties...\n");
+ }
+
+
+ endpointDescription_destroy(edp);
+ }
+
+ if (list != NULL) {
+ arrayList_destroy(list);
+ }
+
+ endpointDescriptorReader_destroy(reader);
+
+ return 0;
+}
+#endif
Added: celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_writer.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_writer.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_writer.c (added)
+++ celix/trunk/remote_services/discovery/private/src/endpoint_descriptor_writer.c Fri Sep 19 15:36:24 2014
@@ -0,0 +1,232 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_description_writer.c
+ *
+ * \date 26 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <libxml/xmlwriter.h>
+
+#include "constants.h"
+#include "remote_constants.h"
+
+#include "endpoint_description.h"
+#include "endpoint_descriptor_common.h"
+#include "endpoint_descriptor_writer.h"
+
+struct endpoint_descriptor_writer {
+ xmlBufferPtr buffer;
+ xmlTextWriterPtr writer;
+};
+
+static celix_status_t endpointDescriptorWriter_writeEndpoint(endpoint_descriptor_writer_pt writer, endpoint_description_pt endpoint);
+
+celix_status_t endpointDescriptorWriter_create(endpoint_descriptor_writer_pt *writer) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *writer = malloc(sizeof(**writer));
+ if (!writer) {
+ status = CELIX_ENOMEM;
+ } else {
+ (*writer)->buffer = xmlBufferCreate();
+ if ((*writer)->buffer == NULL) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ (*writer)->writer = xmlNewTextWriterMemory((*writer)->buffer, 0);
+ if ((*writer)->writer == NULL) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ }
+ }
+
+ return status;
+}
+
+celix_status_t endpointDescriptorWriter_destroy(endpoint_descriptor_writer_pt writer) {
+ xmlFreeTextWriter(writer->writer);
+ xmlBufferFree(writer->buffer);
+ free(writer);
+ return CELIX_SUCCESS;
+}
+
+celix_status_t endpointDescriptorWriter_writeDocument(endpoint_descriptor_writer_pt writer, array_list_pt endpoints, char **document) {
+ celix_status_t status = CELIX_SUCCESS;
+ int rc;
+
+ rc = xmlTextWriterStartDocument(writer->writer, NULL, "UTF-8", NULL);
+ if (rc < 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ rc = xmlTextWriterStartElementNS(writer->writer, NULL, ENDPOINT_DESCRIPTIONS, XMLNS);
+ if (rc < 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ int i;
+ for (i = 0; i < arrayList_size(endpoints); i++) {
+ endpoint_description_pt endpoint = arrayList_get(endpoints, i);
+ status = endpointDescriptorWriter_writeEndpoint(writer, endpoint);
+ }
+ if (status == CELIX_SUCCESS) {
+ rc = xmlTextWriterEndElement(writer->writer);
+ if (rc < 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ rc = xmlTextWriterEndDocument(writer->writer);
+ if (rc < 0) {
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else {
+ *document = (char *) writer->buffer->content;
+ }
+ }
+ }
+ }
+ }
+
+ return status;
+}
+
+static celix_status_t endpointDescriptorWriter_writeArrayValue(xmlTextWriterPtr writer, const xmlChar* value) {
+ xmlTextWriterStartElement(writer, ARRAY);
+ xmlTextWriterStartElement(writer, VALUE);
+ xmlTextWriterWriteString(writer, value);
+ xmlTextWriterEndElement(writer); // value
+ xmlTextWriterEndElement(writer); // array
+
+ return CELIX_SUCCESS;
+}
+
+static celix_status_t endpointDescriptorWriter_writeTypedValue(xmlTextWriterPtr writer, valueType type, const xmlChar* value) {
+ xmlTextWriterWriteAttribute(writer, VALUE_TYPE, (const xmlChar*) valueTypeToString(type));
+ xmlTextWriterWriteAttribute(writer, VALUE, value);
+
+ return CELIX_SUCCESS;
+}
+
+static celix_status_t endpointDescriptorWriter_writeUntypedValue(xmlTextWriterPtr writer, const xmlChar* value) {
+ xmlTextWriterWriteAttribute(writer, VALUE, value);
+
+ return CELIX_SUCCESS;
+}
+
+static celix_status_t endpointDescriptorWriter_writeEndpoint(endpoint_descriptor_writer_pt writer, endpoint_description_pt endpoint) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ if (endpoint == NULL || writer == NULL) {
+ status = CELIX_ILLEGAL_ARGUMENT;
+ } else {
+ xmlTextWriterStartElement(writer->writer, ENDPOINT_DESCRIPTION);
+
+ hash_map_iterator_pt iter = hashMapIterator_create(endpoint->properties);
+ while (hashMapIterator_hasNext(iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+
+ void* propertyName = hashMapEntry_getKey(entry);
+ const xmlChar* propertyValue = (const xmlChar*) hashMapEntry_getValue(entry);
+
+ xmlTextWriterStartElement(writer->writer, PROPERTY);
+ xmlTextWriterWriteAttribute(writer->writer, NAME, propertyName);
+
+ if (strcmp(OSGI_FRAMEWORK_OBJECTCLASS, (char*) propertyName) == 0) {
+ // objectClass *must* be represented as array of string values...
+ endpointDescriptorWriter_writeArrayValue(writer->writer, propertyValue);
+ } else if (strcmp(OSGI_RSA_ENDPOINT_SERVICE_ID, (char*) propertyName) == 0) {
+ // endpoint.service.id *must* be represented as long value...
+ endpointDescriptorWriter_writeTypedValue(writer->writer, VALUE_TYPE_LONG, propertyValue);
+ } else {
+ // represent all other values as plain string values...
+ endpointDescriptorWriter_writeUntypedValue(writer->writer, propertyValue);
+ }
+
+ xmlTextWriterEndElement(writer->writer);
+ }
+ hashMapIterator_destroy(iter);
+
+ xmlTextWriterEndElement(writer->writer);
+ }
+
+ return status;
+}
+
+
+static char* valueTypeToString(valueType type) {
+ switch (type) {
+ case VALUE_TYPE_BOOLEAN:
+ return "boolean";
+ case VALUE_TYPE_BYTE:
+ return "byte";
+ case VALUE_TYPE_CHAR:
+ return "char";
+ case VALUE_TYPE_DOUBLE:
+ return "double";
+ case VALUE_TYPE_FLOAT:
+ return "float";
+ case VALUE_TYPE_INTEGER:
+ return "int";
+ case VALUE_TYPE_LONG:
+ return "long";
+ case VALUE_TYPE_SHORT:
+ return "short";
+ case VALUE_TYPE_STRING:
+ // FALL-THROUGH!
+ default:
+ return "string";
+ }
+}
+
+#ifdef RSA_ENDPOINT_TEST_WRITER
+int main() {
+ endpoint_descriptor_writer_pt writer = NULL;
+ endpointDescriptorWriter_create(&writer);
+ array_list_pt list = NULL;
+ arrayList_create(&list);
+
+ properties_pt props = properties_create();
+ properties_set(props, "objectClass", "com.acme.Foo");
+ properties_set(props, "endpoint.service.id", "3");
+ properties_set(props, "endpoint.id", "abcdefghijklmnopqrstuvwxyz");
+ properties_set(props, "endpoint.framework.uuid", "2983D849-93B1-4C2C-AC6D-5BCDA93ACB96");
+ endpoint_description_pt epd = NULL;
+ endpointDescription_create(props, &epd);
+ arrayList_add(list, epd);
+
+ properties_pt props2 = properties_create();
+ properties_set(props2, "objectClass", "com.acme.Bar");
+ properties_set(props, "endpoint.service.id", "4");
+ properties_set(props, "endpoint.id", "abcdefghijklmnopqrstuvwxyz");
+ properties_set(props, "endpoint.framework.uuid", "2983D849-93B1-4C2C-AC6D-5BCDA93ACB96");
+ endpoint_description_pt epd2 = NULL;
+ endpointDescription_create(props2, &epd2);
+ arrayList_add(list, epd2);
+
+ char *buffer = NULL;
+ endpointDescriptorWriter_writeDocument(writer, list, &buffer);
+
+ arrayList_destroy(list);
+ endpointDescription_destroy(epd);
+ endpointDescription_destroy(epd2);
+ endpointDescriptorWriter_destroy(writer);
+
+ printf("%s\n", buffer);
+}
+#endif
Added: celix/trunk/remote_services/discovery/private/src/endpoint_discovery_poller.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/endpoint_discovery_poller.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/src/endpoint_discovery_poller.c (added)
+++ celix/trunk/remote_services/discovery/private/src/endpoint_discovery_poller.c Fri Sep 19 15:36:24 2014
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * endpoint_discovery_poller.c
+ *
+ * \date 3 Jul 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+#include <stdbool.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <curl/curl.h>
+
+#include "hash_map.h"
+#include "array_list.h"
+#include "celix_log.h"
+#include "celix_threads.h"
+#include "utils.h"
+
+#include "discovery_impl.h"
+
+#include "endpoint_listener.h"
+#include "endpoint_discovery_poller.h"
+#include "endpoint_descriptor_reader.h"
+
+struct endpoint_discovery_poller {
+ discovery_pt discovery;
+ hash_map_pt entries;
+
+ celix_thread_mutex_t pollerLock;
+ celix_thread_t pollerThread;
+
+ unsigned int poll_interval;
+ volatile bool running;
+};
+
+#define DISCOVERY_POLL_INTERVAL "DISCOVERY_CFG_POLL_INTERVAL"
+#define DEFAULT_POLL_INTERVAL "10"
+
+static void *endpointDiscoveryPoller_poll(void *data);
+static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_pt poller, char *url, array_list_pt *updatedEndpoints);
+static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(void *endpointPtr, void *comparePtr, bool *equals);
+
+/**
+ * Allocates memory and initializes a new endpoint_discovery_poller instance.
+ */
+celix_status_t endpointDiscoveryPoller_create(discovery_pt discovery, bundle_context_pt context, endpoint_discovery_poller_pt *poller) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *poller = malloc(sizeof(struct endpoint_discovery_poller));
+ if (!poller) {
+ return CELIX_ENOMEM;
+ }
+
+ status = celixThreadMutex_create(&(*poller)->pollerLock, NULL);
+ if (status != CELIX_SUCCESS) {
+ return status;
+ }
+
+ char* interval = NULL;
+ status = bundleContext_getProperty(context, DISCOVERY_POLL_INTERVAL, &interval);
+ if (!interval) {
+ interval = DEFAULT_POLL_INTERVAL;
+ }
+
+ char* endpoints = NULL;
+ status = bundleContext_getProperty(context, DISCOVERY_POLL_ENDPOINTS, &endpoints);
+ if (!endpoints) {
+ endpoints = DEFAULT_POLL_ENDPOINTS;
+ }
+ // we're going to mutate the string with strtok, so create a copy...
+ endpoints = strdup(endpoints);
+
+ (*poller)->poll_interval = atoi(interval);
+ (*poller)->discovery = discovery;
+ (*poller)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+ const char* sep = ",";
+ char* tok = strtok(endpoints, sep);
+ while (tok) {
+ endpointDiscoveryPoller_addDiscoveryEndpoint(*poller, strdup(utils_stringTrim(tok)));
+ tok = strtok(NULL, sep);
+ }
+ // Clean up after ourselves...
+ free(endpoints);
+
+ status = celixThreadMutex_lock(&(*poller)->pollerLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ status = celixThread_create(&(*poller)->pollerThread, NULL, endpointDiscoveryPoller_poll, *poller);
+ if (status != CELIX_SUCCESS) {
+ return status;
+ }
+
+ (*poller)->running = true;
+
+ status = celixThreadMutex_unlock(&(*poller)->pollerLock);
+
+ return status;
+}
+
+/**
+ * Destroys and frees up memory for a given endpoint_discovery_poller struct.
+ */
+celix_status_t endpointDiscoveryPoller_destroy(endpoint_discovery_poller_pt poller) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ poller->running = false;
+
+ celixThread_join(poller->pollerThread, NULL);
+
+ status = celixThreadMutex_lock(&poller->pollerLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ hashMap_destroy(poller->entries, true, false);
+
+ status = celixThreadMutex_unlock(&poller->pollerLock);
+
+ free(poller);
+
+ return status;
+}
+
+/**
+ * Adds a new endpoint URL to the list of polled endpoints.
+ */
+celix_status_t endpointDiscoveryPoller_addDiscoveryEndpoint(endpoint_discovery_poller_pt poller, char *url) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = celixThreadMutex_lock(&(poller)->pollerLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ // Avoid memory leaks when adding an already existing URL...
+ array_list_pt endpoints = hashMap_get(poller->entries, url);
+ if (endpoints == NULL) {
+ status = arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, &endpoints);
+
+ if (status == CELIX_SUCCESS) {
+ hashMap_put(poller->entries, url, endpoints);
+ }
+ }
+
+ status = celixThreadMutex_unlock(&poller->pollerLock);
+
+ return status;
+}
+
+/**
+ * Removes an endpoint URL from the list of polled endpoints.
+ */
+celix_status_t endpointDiscoveryPoller_removeDiscoveryEndpoint(endpoint_discovery_poller_pt poller, char *url) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = celixThreadMutex_lock(&poller->pollerLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ array_list_pt entries = hashMap_remove(poller->entries, url);
+ for (int i = 0; i < arrayList_size(entries); i++) {
+ endpoint_description_pt endpoint = arrayList_get(entries, i);
+
+ discovery_removeDiscoveredEndpoint(poller->discovery, endpoint);
+ }
+ arrayList_destroy(entries);
+
+ status = celixThreadMutex_unlock(&poller->pollerLock);
+
+ return status;
+}
+
+static void *endpointDiscoveryPoller_poll(void *data) {
+ endpoint_discovery_poller_pt poller = (endpoint_discovery_poller_pt) data;
+ discovery_pt discovery = poller->discovery;
+
+ useconds_t interval = poller->poll_interval * 1000000L;
+
+ while (poller->running) {
+ usleep(interval);
+
+ celix_status_t status = celixThreadMutex_lock(&poller->pollerLock);
+ if (status != CELIX_SUCCESS) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "ENDPOINT_POLLER: failed to obtain lock; retrying...");
+ continue;
+ }
+
+ hash_map_iterator_pt iterator = hashMapIterator_create(poller->entries);
+ while (hashMapIterator_hasNext(iterator)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
+
+ char *url = hashMapEntry_getKey(entry);
+ array_list_pt currentEndpoints = hashMapEntry_getValue(entry);
+
+ array_list_pt updatedEndpoints = NULL;
+ // create an arraylist with a custom equality test to ensure we can find endpoints properly...
+ status = arrayList_createWithEquals(endpointDiscoveryPoller_endpointDescriptionEquals, &updatedEndpoints);
+
+ status = endpointDiscoveryPoller_getEndpoints(poller, url, &updatedEndpoints);
+ if (status != CELIX_SUCCESS) {
+ continue;
+ }
+
+ for (int i = 0; i < arrayList_size(currentEndpoints); i++) {
+ endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i);
+ if (!arrayList_contains(updatedEndpoints, endpoint)) {
+ status = discovery_removeDiscoveredEndpoint(poller->discovery, endpoint);
+ }
+ }
+
+ arrayList_clear(currentEndpoints);
+ if (updatedEndpoints) {
+ arrayList_addAll(currentEndpoints, updatedEndpoints);
+ arrayList_destroy(updatedEndpoints);
+ }
+
+ for (int i = 0; i < arrayList_size(currentEndpoints); i++) {
+ endpoint_description_pt endpoint = arrayList_get(currentEndpoints, i);
+
+ status = discovery_addDiscoveredEndpoint(poller->discovery, endpoint);
+ }
+ }
+
+ status = celixThreadMutex_unlock(&poller->pollerLock);
+ if (status != CELIX_SUCCESS) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "ENDPOINT_POLLER: failed to release lock; retrying...");
+ }
+ }
+
+ return NULL;
+}
+
+struct MemoryStruct {
+ char *memory;
+ size_t size;
+};
+
+static size_t endpointDiscoveryPoller_writeMemory(void *contents, size_t size, size_t nmemb, void *memoryPtr) {
+ size_t realsize = size * nmemb;
+ struct MemoryStruct *mem = (struct MemoryStruct *)memoryPtr;
+
+ mem->memory = realloc(mem->memory, mem->size + realsize + 1);
+ if(mem->memory == NULL) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "ENDPOINT_POLLER: not enough memory (realloc returned NULL)!");
+ return 0;
+ }
+
+ memcpy(&(mem->memory[mem->size]), contents, realsize);
+ mem->size += realsize;
+ mem->memory[mem->size] = 0;
+
+ return realsize;
+}
+
+static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_pt poller, char *url, array_list_pt *updatedEndpoints) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ CURL *curl;
+ CURLcode res;
+
+ struct MemoryStruct chunk;
+ chunk.memory = malloc(1);
+ chunk.size = 0;
+
+ curl_global_init(CURL_GLOBAL_ALL);
+ curl = curl_easy_init();
+ if (!curl) {
+ status = CELIX_ILLEGAL_STATE;
+ } else {
+ curl_easy_setopt(curl, CURLOPT_URL, url);
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, endpointDiscoveryPoller_writeMemory);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
+ res = curl_easy_perform(curl);
+ curl_easy_cleanup(curl);
+ }
+
+ // process endpoints file
+ if (res == CURLE_OK) {
+ endpoint_descriptor_reader_pt reader = NULL;
+
+ status = endpointDescriptorReader_create(&reader);
+ if (status == CELIX_SUCCESS) {
+ status = endpointDescriptorReader_parseDocument(reader, chunk.memory, updatedEndpoints);
+ }
+
+ if (reader) {
+ endpointDescriptorReader_destroy(reader);
+ }
+ } else {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "ENDPOINT_POLLER: unable to read endpoints, reason: %s", curl_easy_strerror(res));
+ }
+
+ // clean up endpoints file
+ if (chunk.memory) {
+ free(chunk.memory);
+ }
+
+ curl_global_cleanup();
+ return status;
+}
+
+static celix_status_t endpointDiscoveryPoller_endpointDescriptionEquals(void *endpointPtr, void *comparePtr, bool *equals) {
+ endpoint_description_pt endpoint = (endpoint_description_pt) endpointPtr;
+ endpoint_description_pt compare = (endpoint_description_pt) comparePtr;
+
+ if (strcmp(endpoint->id, compare->id) == 0) {
+ *equals = true;
+ } else {
+ *equals = false;
+ }
+
+ return CELIX_SUCCESS;
+}
Added: celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c (added)
+++ celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c Fri Sep 19 15:36:24 2014
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * endpoint_discovery_server.c
+ *
+ * \date Aug 12, 2014
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+#include <stdlib.h>
+#include <stdint.h>
+
+#include "civetweb.h"
+#include "celix_errno.h"
+#include "utils.h"
+#include "celix_log.h"
+#include "discovery.h"
+#include "discovery_impl.h"
+
+#include "endpoint_descriptor_writer.h"
+#include "endpoint_discovery_server.h"
+
+
+#define DEFAULT_SERVER_THREADS "1"
+
+#define CIVETWEB_REQUEST_NOT_HANDLED 0
+#define CIVETWEB_REQUEST_HANDLED 1
+
+static const char *response_headers =
+ "HTTP/1.1 200 OK\r\n"
+ "Cache: no-cache\r\n"
+ "Content-Type: application/xml;charset=utf-8\r\n"
+ "\r\n";
+
+struct endpoint_discovery_server {
+ hash_map_pt entries; // key = endpointId, value = endpoint_descriptor_pt
+
+ celix_thread_mutex_t serverLock;
+
+ const char* path;
+ struct mg_context* ctx;
+};
+
+// Forward declarations...
+static int endpointDiscoveryServer_callback(struct mg_connection *conn);
+static char* format_path(char* path);
+
+celix_status_t endpointDiscoveryServer_create(discovery_pt discovery, bundle_context_pt context, endpoint_discovery_server_pt *server) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ *server = malloc(sizeof(struct endpoint_discovery_server));
+ if (!*server) {
+ return CELIX_ENOMEM;
+ }
+
+ (*server)->entries = hashMap_create(&utils_stringHash, NULL, &utils_stringEquals, NULL);
+ if (!(*server)->entries) {
+ return CELIX_ENOMEM;
+ }
+
+ status = celixThreadMutex_create(&(*server)->serverLock, NULL);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ char *port = NULL;
+ bundleContext_getProperty(context, DISCOVERY_SERVER_PORT, &port);
+ if (port == NULL) {
+ port = DEFAULT_SERVER_PORT;
+ }
+
+ char *path = NULL;
+ bundleContext_getProperty(context, DISCOVERY_SERVER_PATH, &path);
+ if (path == NULL) {
+ path = DEFAULT_SERVER_PATH;
+ }
+
+ (*server)->path = format_path(path);
+
+ const char *options[] = {
+ "listening_ports", port,
+ "num_threads", DEFAULT_SERVER_THREADS,
+ NULL
+ };
+
+ const struct mg_callbacks callbacks = {
+ .begin_request = endpointDiscoveryServer_callback,
+ };
+
+ (*server)->ctx = mg_start(&callbacks, (*server), options);
+
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Starting discovery server on port %s...", port);
+
+ return status;
+}
+
+celix_status_t endpointDiscoveryServer_destroy(endpoint_discovery_server_pt server) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ // stop & block until the actual server is shut down...
+ if (server->ctx != NULL) {
+ mg_stop(server->ctx);
+ server->ctx = NULL;
+ }
+
+ status = celixThreadMutex_lock(&server->serverLock);
+
+ hashMap_destroy(server->entries, true /* freeKeys */, false /* freeValues */);
+
+ status = celixThreadMutex_unlock(&server->serverLock);
+ status = celixThreadMutex_destroy(&server->serverLock);
+
+ free((void*) server->path);
+ free(server);
+
+ return status;
+}
+
+celix_status_t endpointDiscoveryServer_addEndpoint(endpoint_discovery_server_pt server, endpoint_description_pt endpoint) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = celixThreadMutex_lock(&server->serverLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ // create a local copy of the endpointId which we can control...
+ char* endpointId = strdup(endpoint->id);
+ endpoint_description_pt cur_value = hashMap_get(server->entries, endpointId);
+ if (!cur_value) {
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "exposing new endpoint \"%s\"...", endpointId);
+
+ hashMap_put(server->entries, endpointId, endpoint);
+ }
+
+ status = celixThreadMutex_unlock(&server->serverLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ return status;
+}
+
+celix_status_t endpointDiscoveryServer_removeEndpoint(endpoint_discovery_server_pt server, endpoint_description_pt endpoint) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = celixThreadMutex_lock(&server->serverLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ hash_map_entry_pt entry = hashMap_getEntry(server->entries, endpoint->id);
+ if (entry) {
+ char* key = hashMapEntry_getKey(entry);
+
+ fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "removing endpoint \"%s\"...\n", key);
+
+ hashMap_remove(server->entries, key);
+
+ // we've made this key, see _addEnpoint above...
+ free((void*) key);
+ }
+
+ status = celixThreadMutex_unlock(&server->serverLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ return status;
+}
+
+static char* format_path(char* path) {
+ char* result = strdup(utils_stringTrim(path));
+ // check whether the path starts with a leading slash...
+ if (result[0] != '/') {
+ size_t len = strlen(result);
+ result = realloc(result, len + 2);
+ memmove(result + 1, result, len);
+ result[0] = '/';
+ result[len + 1] = 0;
+ }
+ return result;
+}
+
+static celix_status_t endpointDiscoveryServer_getEndpoints(endpoint_discovery_server_pt server, const char* the_endpoint_id, array_list_pt *endpoints) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ status = arrayList_create(endpoints);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_ENOMEM;
+ }
+
+ status = celixThreadMutex_lock(&server->serverLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ hash_map_iterator_pt iter = hashMapIterator_create(server->entries);
+ while (hashMapIterator_hasNext(iter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+
+ char* endpoint_id = hashMapEntry_getKey(entry);
+ if (the_endpoint_id == NULL || strcmp(the_endpoint_id, endpoint_id) == 0) {
+ endpoint_description_pt endpoint = hashMapEntry_getValue(entry);
+
+ arrayList_add(*endpoints, endpoint);
+ }
+ }
+ hashMapIterator_destroy(iter);
+
+ status = celixThreadMutex_unlock(&server->serverLock);
+ if (status != CELIX_SUCCESS) {
+ return CELIX_BUNDLE_EXCEPTION;
+ }
+
+ return status;
+}
+
+static int endpointDiscoveryServer_writeEndpoints(struct mg_connection* conn, array_list_pt endpoints) {
+ celix_status_t status = CELIX_SUCCESS;
+
+ endpoint_descriptor_writer_pt writer = NULL;
+ status = endpointDescriptorWriter_create(&writer);
+ if (status != CELIX_SUCCESS) {
+ return CIVETWEB_REQUEST_NOT_HANDLED;
+ }
+
+ char *buffer = NULL;
+ status = endpointDescriptorWriter_writeDocument(writer, endpoints, &buffer);
+ if (buffer) {
+ mg_write(conn, response_headers, strlen(response_headers));
+ mg_write(conn, buffer, strlen(buffer));
+ }
+
+ status = endpointDescriptorWriter_destroy(writer);
+
+ return CIVETWEB_REQUEST_HANDLED;
+}
+
+// returns all endpoints as XML...
+static int endpointDiscoveryServer_returnAllEndpoints(endpoint_discovery_server_pt server, struct mg_connection* conn) {
+ int status = CIVETWEB_REQUEST_NOT_HANDLED;
+
+ array_list_pt endpoints = NULL;
+ endpointDiscoveryServer_getEndpoints(server, NULL, &endpoints);
+ if (endpoints) {
+ status = endpointDiscoveryServer_writeEndpoints(conn, endpoints);
+
+ arrayList_destroy(endpoints);
+ }
+
+ return status;
+}
+
+// returns a single endpoint as XML...
+static int endpointDiscoveryServer_returnEndpoint(endpoint_discovery_server_pt server, struct mg_connection* conn, const char* endpoint_id) {
+ int status = CIVETWEB_REQUEST_NOT_HANDLED;
+
+ array_list_pt endpoints = NULL;
+ endpointDiscoveryServer_getEndpoints(server, endpoint_id, &endpoints);
+ if (endpoints) {
+ status = endpointDiscoveryServer_writeEndpoints(conn, endpoints);
+
+ arrayList_destroy(endpoints);
+ }
+
+ return status;
+}
+
+static int endpointDiscoveryServer_callback(struct mg_connection* conn) {
+ int status = CIVETWEB_REQUEST_NOT_HANDLED;
+
+ const struct mg_request_info *request_info = mg_get_request_info(conn);
+ if (request_info->uri != NULL && strcmp("GET", request_info->request_method) == 0) {
+ endpoint_discovery_server_pt server = request_info->user_data;
+
+ const char *uri = request_info->uri;
+ const size_t path_len = strlen(server->path);
+ const size_t uri_len = strlen(uri);
+
+ if (strncmp(server->path, uri, strlen(server->path)) == 0) {
+ // Be lenient when it comes to the trailing slash...
+ if (path_len == uri_len || (uri_len == (path_len + 1) && uri[path_len] == '/')) {
+ status = endpointDiscoveryServer_returnAllEndpoints(server, conn);
+ } else {
+ const char* endpoint_id = uri + path_len + 1; // right after the slash...
+
+ status = endpointDiscoveryServer_returnEndpoint(server, conn, endpoint_id);
+ }
+ }
+ }
+
+ return status;
+}
Modified: celix/trunk/remote_services/discovery_configured/CMakeLists.txt
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/CMakeLists.txt?rev=1626250&r1=1626249&r2=1626250&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_configured/CMakeLists.txt (original)
+++ celix/trunk/remote_services/discovery_configured/CMakeLists.txt Fri Sep 19 15:36:24 2014
@@ -23,6 +23,7 @@ include_directories("${LIBXML2_INCLUDE_D
include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/private/include")
include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/remote_services/discovery/private/include")
include_directories("${PROJECT_SOURCE_DIR}/remote_services/discovery_configured/private/include")
include_directories("${PROJECT_SOURCE_DIR}/remote_services/endpoint_listener/public/include")
include_directories("${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/public/include")
@@ -32,12 +33,13 @@ SET_HEADER(BUNDLE_SYMBOLICNAME "apache_c
SET_HEADERS("Bundle-Name: Apache Celix RSA Configured Discovery")
bundle(discovery_configured SOURCES
- private/src/discovery.c
- private/src/discovery_activator.c
- private/src/endpoint_descriptor_reader.c
- private/src/endpoint_descriptor_writer.c
- private/src/endpoint_discovery_poller.c
- private/src/endpoint_discovery_server.c
+ private/src/discovery_impl.c
+ ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery_activator.c
+ ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery.c
+ ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_descriptor_reader.c
+ ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_descriptor_writer.c
+ ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_discovery_poller.c
+ ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_discovery_server.c
${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/private/src/endpoint_description.c
${PROJECT_SOURCE_DIR}/remote_services/utils/private/src/civetweb.c
)
@@ -48,7 +50,7 @@ target_link_libraries(discovery_configur
if (RSA_ENDPOINT_TEST_READER)
add_executable(descparser
- private/src/endpoint_descriptor_reader.c
+ ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_descriptor_reader.c
${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/private/src/endpoint_description.c)
target_link_libraries(descparser ${LIBXML2_LIBRARIES} celix_framework celix_utils)
@@ -56,7 +58,7 @@ endif (RSA_ENDPOINT_TEST_READER)
if (RSA_ENDPOINT_TEST_WRITER)
add_executable(descwriter
- private/src/endpoint_descriptor_writer.c
+ ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_descriptor_writer.c
${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/private/src/endpoint_description.c)
target_link_libraries(descwriter ${LIBXML2_LIBRARIES} celix_framework celix_utils)
Added: celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h (added)
+++ celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h Fri Sep 19 15:36:24 2014
@@ -0,0 +1,59 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topology_manager.h
+ *
+ * \date Sep 29, 2011
+ * \author <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef DISCOVERY_IMPL_H_
+#define DISCOVERY_IMPL_H_
+
+#include "bundle_context.h"
+#include "service_reference.h"
+
+#include "endpoint_description.h"
+#include "endpoint_listener.h"
+
+#include "endpoint_discovery_poller.h"
+#include "endpoint_discovery_server.h"
+
+
+
+#define DEFAULT_SERVER_PORT "9999"
+#define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.configured"
+#define DEFAULT_POLL_ENDPOINTS "http://localhost:9999/org.apache.celix.discovery.configured"
+
+
+struct discovery {
+ bundle_context_pt context;
+
+ celix_thread_mutex_t listenerReferencesMutex;
+ celix_thread_mutex_t discoveredServicesMutex;
+
+ hash_map_pt listenerReferences; //key=serviceReference, value=nop
+ hash_map_pt discoveredServices; //key=endpointId (string), value=endpoint_description_pt
+
+ endpoint_discovery_poller_pt poller;
+ endpoint_discovery_server_pt server;
+};
+
+#endif /* DISCOVERY_IMPL_H_ */