You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@celix.apache.org by GitBox <gi...@apache.org> on 2020/01/26 15:18:38 UTC

[GitHub] [celix] dhbfischer opened a new pull request #143: Feature/pubsub websocket full duplex communication

dhbfischer opened a new pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143
 
 
   Created a full duplex communication mechanism for the websocket pubsub by adding an endpoint for each subscriber. This creates an opportunity to connect with the subscriber with the help of a webpage. The websocket publisher example is also updated with an added subscriber and renamed to pubsub_websocket_example instead of pubsub_publisher_websocket.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371027968
 
 

 ##########
 File path: bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
 ##########
 @@ -558,34 +586,88 @@ static void* psa_websocket_recvThread(void * data) {
     return NULL;
 }
 
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection, void *handle) {
+    if (handle != NULL) {
+        pubsub_websocket_topic_receiver_t *receiver = (pubsub_websocket_topic_receiver_t *) handle;
+
+        //Get request info with host, port and uri information
+        const struct mg_request_info *ri = mg_get_request_info(connection);
+        if (ri != NULL && strcmp(receiver->uri, ri->request_uri) == 0) {
+            char *key = NULL;
+            asprintf(&key, "%s:%i", ri->remote_addr, ri->remote_port);
+
+            celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+            psa_websocket_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
+            if (entry == NULL) {
+                entry = calloc(1, sizeof(*entry));
+                entry->key = key;
+                entry->uri = strndup(ri->request_uri, 1024 * 1024);
+                entry->socketAddress = strndup(ri->remote_addr, 1024 * 1024);
+                entry->socketPort = ri->remote_port;
+                entry->connected = true;
+                entry->statically = false;
+                entry->passive = true;
+                hashMap_put(receiver->requestedConnections.map, (void *) entry->key, entry);
+                receiver->requestedConnections.allConnected = false;
+            } else {
+                free(key);
+            }
+
+            celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+        }
+    }
+}
+
+
 static int psa_websocketTopicReceiver_data(struct mg_connection *connection __attribute__((unused)),
                                             int op_code __attribute__((unused)),
                                             char *data,
                                             size_t length,
                                             void *handle) {
     //Received a websocket message, append this message to the buffer of the receiver.
     if (handle != NULL) {
 
 Review comment:
   See above comment about reducing nesting.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] rlenferink merged pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
rlenferink merged pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371026716
 
 

 ##########
 File path: bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
 ##########
 @@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * pubsub_subscriber.c
+ *
+ *  \date       Sep 21, 2010
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+
+#include "poi.h"
+#include "poiCmd.h"
+#include "pubsub_websocket_private.h"
+
+#include "service_tracker.h"
+#include "celix_threads.h"
+
+
+static double randCoordinate(double min, double max) {
+    double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min)));
+    return ret;
+}
+
+static void* send_thread(void* arg) {
+    send_thread_struct_t *st_struct = (send_thread_struct_t *) arg;
+
+    pubsub_publisher_t *publish_svc = st_struct->service;
+    pubsub_info_t *pubsubInfo = st_struct->pubsub;
+    pubsub_sender_t *publisher = st_struct->pubsub->sender;
+
+    char fwUUID[9];
+    memset(fwUUID, 0, 9);
+    memcpy(fwUUID, publisher->ident, 8);
+
+    //poi_t point = calloc(1,sizeof(*point));
 
 Review comment:
   Probably better to remove commented out code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] dhbfischer commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
dhbfischer commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371870197
 
 

 ##########
 File path: bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/ps_websocket_activator.c
 ##########
 @@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * ps_websocket_activator.c
+ *
+ *  \date       Jan 16, 2020
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <pubsub/publisher.h>
+
+#include "celix_api.h"
+
+#include "pubsub_websocket_private.h"
+
+
+static const char * PUB_TOPICS[] = {
+        "poi1",
+        "poi2",
+        NULL
+};
+
+
+#define SUB_NAME "poiCmd"
+static const char * SUB_TOPICS[] = {
+        "poiCmd",
 
 Review comment:
   The SUB_NAME define is used as an semicolon separated list of topics, in this case only one. See also subscriber example (file: ps_sub_activator.c).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371025516
 
 

 ##########
 File path: bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/ps_websocket_activator.c
 ##########
 @@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * ps_websocket_activator.c
+ *
+ *  \date       Jan 16, 2020
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <pubsub/publisher.h>
+
+#include "celix_api.h"
+
+#include "pubsub_websocket_private.h"
+
+
+static const char * PUB_TOPICS[] = {
+        "poi1",
+        "poi2",
+        NULL
+};
+
+
+#define SUB_NAME "poiCmd"
+static const char * SUB_TOPICS[] = {
+        "poiCmd",
 
 Review comment:
   Probably better to use SUB_NAME here as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371026726
 
 

 ##########
 File path: bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
 ##########
 @@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * pubsub_subscriber.c
+ *
+ *  \date       Sep 21, 2010
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+
+#include "poi.h"
+#include "poiCmd.h"
+#include "pubsub_websocket_private.h"
+
+#include "service_tracker.h"
+#include "celix_threads.h"
+
+
+static double randCoordinate(double min, double max) {
+    double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min)));
+    return ret;
+}
+
+static void* send_thread(void* arg) {
+    send_thread_struct_t *st_struct = (send_thread_struct_t *) arg;
+
+    pubsub_publisher_t *publish_svc = st_struct->service;
+    pubsub_info_t *pubsubInfo = st_struct->pubsub;
+    pubsub_sender_t *publisher = st_struct->pubsub->sender;
+
+    char fwUUID[9];
+    memset(fwUUID, 0, 9);
+    memcpy(fwUUID, publisher->ident, 8);
+
+    //poi_t point = calloc(1,sizeof(*point));
+    location_t place = calloc(1, sizeof(*place));
+
+    char *desc = calloc(64, sizeof(char));
+    snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+
+    char *name = calloc(64, sizeof(char));
+    snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
+
+    place->name = name;
+    place->description = desc;
+    place->extra = "extra value";
+    printf("TOPIC : %s\n", st_struct->topic);
+
+    unsigned int msgId = 0;
+
+    while (publisher->stop == false) {
+        if(pubsubInfo->sending) {
+            if (msgId == 0) {
+                if (publish_svc->localMsgTypeIdForMsgType(publish_svc->handle, st_struct->topic, &msgId) != 0) {
+                    printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n", MSG_POI_NAME);
+                }
+            }
+
+            if (msgId > 0) {
+                place->position.lat = randCoordinate(MIN_LAT, MAX_LAT);
+                place->position.lon = randCoordinate(MIN_LON, MAX_LON);
+                int nr_char = (int) randCoordinate(5, 100000);
+                place->data = calloc(nr_char, 1);
+                for (int i = 0; i < (nr_char - 1); i++) {
+                    place->data[i] = i % 10 + '0';
+                }
+                place->data[nr_char - 1] = '\0';
+                if (publish_svc->send) {
+                    if (publish_svc->send(publish_svc->handle, msgId, place) == 0) {
+                        printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
+                               place->position.lat, place->position.lon, place->name, place->description, nr_char);
+                    }
+                } else {
+                    printf("No send for %s\n", st_struct->topic);
+                }
+
+                free(place->data);
+            }
+        }
+        sleep(2);
+    }
+
+    free(place->description);
+    free(place->name);
+    free(place);
+
+//    free(st_struct->pubsub);
 
 Review comment:
   Probably better to remove commented out code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371027573
 
 

 ##########
 File path: bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
 ##########
 @@ -558,34 +586,88 @@ static void* psa_websocket_recvThread(void * data) {
     return NULL;
 }
 
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection, void *handle) {
+    if (handle != NULL) {
 
 Review comment:
   You can reduce nesting by changing this to `if(handle == null) { return; }`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371027636
 
 

 ##########
 File path: bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
 ##########
 @@ -558,34 +586,88 @@ static void* psa_websocket_recvThread(void * data) {
     return NULL;
 }
 
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection, void *handle) {
+    if (handle != NULL) {
+        pubsub_websocket_topic_receiver_t *receiver = (pubsub_websocket_topic_receiver_t *) handle;
+
+        //Get request info with host, port and uri information
+        const struct mg_request_info *ri = mg_get_request_info(connection);
+        if (ri != NULL && strcmp(receiver->uri, ri->request_uri) == 0) {
 
 Review comment:
   Similarly, you can reduce nesting here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] dhbfischer commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
dhbfischer commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371875026
 
 

 ##########
 File path: bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
 ##########
 @@ -558,34 +586,88 @@ static void* psa_websocket_recvThread(void * data) {
     return NULL;
 }
 
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection, void *handle) {
+    if (handle != NULL) {
 
 Review comment:
   Changing that will decrease the coverage

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [celix] Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication

Posted by GitBox <gi...@apache.org>.
Oipo commented on a change in pull request #143: Feature/pubsub websocket full duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371027972
 
 

 ##########
 File path: bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
 ##########
 @@ -558,34 +586,88 @@ static void* psa_websocket_recvThread(void * data) {
     return NULL;
 }
 
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection, void *handle) {
+    if (handle != NULL) {
+        pubsub_websocket_topic_receiver_t *receiver = (pubsub_websocket_topic_receiver_t *) handle;
+
+        //Get request info with host, port and uri information
+        const struct mg_request_info *ri = mg_get_request_info(connection);
+        if (ri != NULL && strcmp(receiver->uri, ri->request_uri) == 0) {
+            char *key = NULL;
+            asprintf(&key, "%s:%i", ri->remote_addr, ri->remote_port);
+
+            celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+            psa_websocket_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
+            if (entry == NULL) {
+                entry = calloc(1, sizeof(*entry));
+                entry->key = key;
+                entry->uri = strndup(ri->request_uri, 1024 * 1024);
+                entry->socketAddress = strndup(ri->remote_addr, 1024 * 1024);
+                entry->socketPort = ri->remote_port;
+                entry->connected = true;
+                entry->statically = false;
+                entry->passive = true;
+                hashMap_put(receiver->requestedConnections.map, (void *) entry->key, entry);
+                receiver->requestedConnections.allConnected = false;
+            } else {
+                free(key);
+            }
+
+            celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+        }
+    }
+}
+
+
 static int psa_websocketTopicReceiver_data(struct mg_connection *connection __attribute__((unused)),
                                             int op_code __attribute__((unused)),
                                             char *data,
                                             size_t length,
                                             void *handle) {
     //Received a websocket message, append this message to the buffer of the receiver.
     if (handle != NULL) {
-        psa_websocket_requested_connection_entry_t *entry = (psa_websocket_requested_connection_entry_t *) handle;
+        pubsub_websocket_topic_receiver_t *receiver = (pubsub_websocket_topic_receiver_t *) handle;
 
-        celixThreadMutex_lock(&entry->recvBuffer->mutex);
+        celixThreadMutex_lock(&receiver->recvBuffer.mutex);
         pubsub_websocket_msg_entry_t *msg = malloc(sizeof(*msg));
         const char *rcvdMsgData = malloc(length);
         memcpy((void *) rcvdMsgData, data, length);
         msg->msgData = rcvdMsgData;
         msg->msgSize = length;
-        celix_arrayList_add(entry->recvBuffer->list, msg);
-        celixThreadMutex_unlock(&entry->recvBuffer->mutex);
+        celix_arrayList_add(receiver->recvBuffer.list, msg);
+        celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
     }
 
     return 1; //keep open (non-zero), 0 to close the socket
 }
 
-static void psa_websocketTopicReceiver_close(const struct mg_connection *connection __attribute__((unused)), void *handle) {
+static void psa_websocketTopicReceiver_close(const struct mg_connection *connection, void *handle) {
     //Reset connection for this receiver entry
     if (handle != NULL) {
 
 Review comment:
   See above comment about reducing nesting.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services