You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@celix.apache.org by GitBox <gi...@apache.org> on 2020/05/05 12:13:14 UTC

[GitHub] [celix] pnoltes opened a new pull request #223: Feature/pubsub custom serializers

pnoltes opened a new pull request #223:
URL: https://github.com/apache/celix/pull/223


   - Adds pubsub serialization services per message type as an alternative to the current serialization setup.
   - Also fixes some issues in bundle trackers / bundle listeners


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on pull request #223:
URL: https://github.com/apache/celix/pull/223#issuecomment-629845894


   @abroekhuis and @rbulter ping: Could you check whether the PR is OK?
   This PR also solves some threading issues, so I would like to merge this to master asap.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426428191



##########
File path: bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
##########
@@ -0,0 +1,120 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_avrobin_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <assert.h>
+
+#include "avrobin_serializer.h"
+#include "dyn_message.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+
+/**
+ * Log callback registered with the avrobin serializer. Formats the printf-style
+ * message and forwards it, together with file and line, to the celix log helper.
+ *
+ * @param handle The celix_log_helper_t to log to.
+ * @param level  Log level, passed through to celix_logHelper_log.
+ * @param file   Source file reported by the caller.
+ * @param line   Source line reported by the caller.
+ * @param msg    printf-style format string, followed by its arguments.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    celix_log_helper_t *log = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    if (rc < 0) {
+        //If vasprintf fails the content of logStr is undefined: do not print or free it.
+        //Fall back to logging the unformatted message.
+        celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, msg);
+        return;
+    }
+    celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+
+/**
+ * Serializes msg with the avrobin serializer into a single, newly allocated iovec.
+ *
+ * @param entry        Serialization entry; its msgType provides the dyn_type used for serialization.
+ * @param msg          The message to serialize.
+ * @param output       Out parameter for the allocated iovec array (1 element); set to NULL on failure.
+ * @param outputIovLen Out parameter for the number of iovec elements; set to 0 on failure.
+ * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT if an out parameter is NULL,
+ *         CELIX_BUNDLE_EXCEPTION if serialization or the iovec allocation fails.
+ */
+static celix_status_t pubsub_avrobinSerializationProvider_serialize(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen) {
+    if (output == NULL || outputIovLen == NULL) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    *output = NULL;
+    *outputIovLen = 0;
+
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(entry->msgType, &dynType);
+
+    uint8_t *serializedOutput = NULL;
+    size_t serializedOutputLen = 0;
+    if (avrobinSerializer_serialize(dynType, msg, &serializedOutput, &serializedOutputLen) != 0) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    //Allocate the iovec only after a successful serialization, so nothing leaks on failure.
+    struct iovec* iov = calloc(1, sizeof(*iov));
+    if (iov == NULL) {
+        free(serializedOutput);
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+    iov->iov_base = (void*)serializedOutput;
+    iov->iov_len = serializedOutputLen;
+
+    *output = iov;
+    *outputIovLen = 1;
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Frees an iovec array (and its buffers) created by the serialize callback.
+ *
+ * @param entry       Serialization entry; the iovec buffers are only freed if its msgType is set.
+ * @param input       The iovec array to free; NULL is accepted and ignored.
+ * @param inputIovLen Number of elements in input.
+ */
+void pubsub_avrobinSerializationProvider_freeSerializeMsg(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen) {
+    if (input == NULL) {
+        return;
+    }
+    if (entry->msgType != NULL) {
+        //size_t index avoids a signed/unsigned comparison against inputIovLen
+        for (size_t i = 0; i < inputIovLen; i++) {
+            free(input[i].iov_base); //free(NULL) is a no-op, no guard needed
+            input[i].iov_base = NULL;
+            input[i].iov_len = 0;
+        }
+    }
+    free(input);
+}
+
+/**
+ * Deserializes the first iovec element of input into a new message using the avrobin serializer.
+ *
+ * @param entry       Serialization entry; its msgType provides the dyn_type used for deserialization.
+ * @param input       The serialized data; exactly one iovec element is expected (asserted).
+ * @param inputIovLen Number of elements in input.
+ * @param out         Set to the deserialized message on success; untouched on failure.
+ * @return CELIX_SUCCESS on success, CELIX_BUNDLE_EXCEPTION if input is NULL or deserialization fails.
+ */
+celix_status_t pubsub_avrobinSerializationProvider_deserialize(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen, void **out) {
+    if (input == NULL) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    dyn_type* msgDynType;
+    dynMessage_getMessageType(entry->msgType, &msgDynType);
+
+    assert(inputIovLen == 1);
+
+    void *deserialized = NULL;
+    int rc = avrobinSerializer_deserialize(msgDynType, (uint8_t *)input->iov_base, input->iov_len, &deserialized);
+    if (rc != 0) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    *out = deserialized;
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Frees a message created by the deserialize callback, using the dyn_type of the entry.
+ * Does nothing when the entry has no msgType.
+ */
+void pubsub_avrobinSerializationProvider_freeDeserializeMsg(pubsub_serialization_entry_t* entry, void *msg) {
+    if (entry->msgType == NULL) {
+        return;
+    }
+
+    dyn_type* msgDynType;
+    dynMessage_getMessageType(entry->msgType, &msgDynType);
+    dynType_free(msgDynType, msg);
+}
+
+pubsub_serialization_provider_t* pubsub_avrobinSerializationProvider_create(celix_bundle_context_t* ctx)  {
+    pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "avrobin", 0, pubsub_avrobinSerializationProvider_serialize, pubsub_avrobinSerializationProvider_freeSerializeMsg, pubsub_avrobinSerializationProvider_deserialize, pubsub_avrobinSerializationProvider_freeDeserializeMsg);
+    avrobinSerializer_logSetup(dfi_log, pubsub_serializationProvider_getLogHelper(provider), 1);;

Review comment:
       updated




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes merged pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes merged pull request #223:
URL: https://github.com/apache/celix/pull/223


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] Oipo commented on pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
Oipo commented on pull request #223:
URL: https://github.com/apache/celix/pull/223#issuecomment-629856783


   > @Oipo Could you retest if the tests still fail?
   
   All green.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on pull request #223:
URL: https://github.com/apache/celix/pull/223#issuecomment-630071770


   > Looks good, but please please please, make PR's smaller. This touched so many things, that should have been in different PR's...
   > Smaller PR's makes reviewing much easier.
   
   Yeah I understand that. My bad for adding the FindCppuTest stuff. 
   
   Note that I ran into the tracking bundles issues when developing the pubsub msg serialization bundles. So splitting is not really an option then. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426427968



##########
File path: bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Bundle without an activator; it only packages the msg_poi1 descriptor under META-INF/descriptors.
+add_celix_bundle(pubsub_avrobin_serialization_descriptor NO_ACTIVATOR VERSION 1.0.0)
+celix_bundle_files(pubsub_avrobin_serialization_descriptor
+		${CMAKE_CURRENT_SOURCE_DIR}/msg_descriptors/msg_poi1.descriptor
+		DESTINATION "META-INF/descriptors"
+)
+
+# Test executable built from the avrobin serialization provider test suite.
+add_executable(test_pubsub_serializer_avrobin
+        src/PubSubAvrobinSerializationProviderTestSuite.cc
+)
+target_link_libraries(test_pubsub_serializer_avrobin PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_compile_options(test_pubsub_serializer_avrobin PRIVATE -std=c++14) #Note test code is allowed to be C++14
+
+# Build the serializer bundle and the descriptor bundle before the test executable.
+add_dependencies(test_pubsub_serializer_avrobin celix_pubsub_serializer_avrobin_bundle pubsub_avrobin_serialization_descriptor_bundle)
+# Pass the location of the serializer bundle file to the test code as the SER_BUNDLE define.
+target_compile_definitions(test_pubsub_serializer_avrobin PRIVATE -DSER_BUNDLE=\"$<TARGET_PROPERTY:celix_pubsub_serializer_avrobin,BUNDLE_FILE>\")

Review comment:
       renamed (also for json)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426414311



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
##########
@@ -0,0 +1,668 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <dirent.h>
+#include <string.h>
+
+#include "celix_constants.h"
+#include "dyn_function.h"
+#include "celix_version.h"
+#include "celix_utils.h"
+#include "dyn_message.h"
+#include "pubsub_utils.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_shell_command.h"
+
+#define MAX_PATH_LEN    1024
+
+/**
+ * Kind of descriptor file, as derived from the file name by getDescriptorType().
+ */
+typedef enum
+{
+    //file name contains neither ".descriptor" nor ".properties"
+    FIT_INVALID = 0,
+    //file name contains ".descriptor"
+    FIT_DESCRIPTOR = 1,
+    //file name contains ".properties"; the properties file refers to an avpr file
+    FIT_AVPR = 2
+} descriptor_type_e;
+
+//Logging shortcuts. These macros expect a variable named `provider`
+//(with a logHelper member) to be in scope at the call site.
+#define L_DEBUG(...) \
+    celix_logHelper_debug(provider->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(provider->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(provider->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(provider->logHelper, __VA_ARGS__)
+
+
+/**
+ * State of a serialization provider: the bundle context and log helper it uses,
+ * the serialization callbacks it was configured with, the ids of the tracker and
+ * services it registered, and the list of serialization entries.
+ */
+struct pubsub_serialization_provider {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper;
+    char* serializationType;
+
+    //serialization callbacks
+    celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen);
+    void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen);
+    celix_status_t (*deserialize)(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen __attribute__((unused)), void **out);
+    void (*freeDeserializeMsg)(pubsub_serialization_entry_t* entry, void *msg);
+
+    //updated serialization services
+    long bundleTrackerId;
+
+    pubsub_message_serialization_marker_t markerSvc;
+    long serializationMarkerSvcId;
+
+    celix_shell_command_t cmdSvc;
+    long cmdSvcId;
+
+    celix_thread_mutex_t mutex; //protects below
+    //NOTE(review): an array list has elements rather than keys; presumably the
+    //elements are pubsub_serialization_entry_t pointers - confirm.
+    celix_array_list_t *serializationSvcEntries; //key = pubsub_serialization_entry;
+};
+
+/**
+ * Log callback that formats the printf-style message and logs it, together with
+ * file and line, on the log helper of the provider. The incoming level is ignored;
+ * everything is logged with CELIX_LOG_LEVEL_WARNING.
+ *
+ * @param handle The pubsub_serialization_provider_t whose log helper is used.
+ * @param level  Ignored.
+ * @param file   Source file reported by the caller.
+ * @param line   Source line reported by the caller.
+ * @param msg    printf-style format string, followed by its arguments.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    (void)level;
+    pubsub_serialization_provider_t *provider = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    if (rc < 0) {
+        //If vasprintf fails the content of logStr is undefined: do not print or free it.
+        //Fall back to logging the unformatted message.
+        celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, msg);
+        return;
+    }
+    celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+/**
+ * Determines the descriptor type from the file name: FIT_DESCRIPTOR if it contains
+ * ".descriptor", else FIT_AVPR if it contains ".properties", else FIT_INVALID.
+ */
+static descriptor_type_e getDescriptorType(const char* filename) {
+    descriptor_type_e type = FIT_INVALID;
+    if (strstr(filename, ".descriptor") != NULL) {
+        type = FIT_DESCRIPTOR;
+    } else if (strstr(filename, ".properties") != NULL) {
+        type = FIT_AVPR;
+    }
+    return type;
+}
+
+/**
+ * Reads the properties file <root>/<properties_file_name> and extracts its
+ * "fqn=" and "avpr=" entries.
+ *
+ * @param provider             Provider, used for logging.
+ * @param properties_file_name Name of the properties file, relative to root.
+ * @param root                 Directory containing the properties file.
+ * @param avpr_fqn             Output buffer (written with at most MAX_PATH_LEN bytes) for the "fqn=" value.
+ * @param path                 Output buffer (written with at most MAX_PATH_LEN bytes). First used as
+ *                             scratch space for the properties file path, then filled with
+ *                             <root>/<value of "avpr=">.
+ * @return true if the file could be opened and both entries were found, false otherwise.
+ */
+static bool readPropertiesFile(pubsub_serialization_provider_t* provider, const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {
+    snprintf(path, MAX_PATH_LEN, "%s/%s", root, properties_file_name); // use path to create path to properties file
+    FILE *properties = fopen(path, "r");
+    if (!properties) {
+        L_WARN("Could not find or open %s as a properties file in %s\n", properties_file_name, root);
+        return false;
+    }
+
+    //Empty both outputs, so a missing entry can be detected after parsing.
+    *avpr_fqn = '\0';
+    *path = '\0'; //re-use path to create path to avpr file
+    char *p_line = malloc(MAX_PATH_LEN);
+    size_t line_len = MAX_PATH_LEN;
+    while (getline(&p_line, &line_len, properties) >= 0) {
+        if (strncmp(p_line, "fqn=", strlen("fqn=")) == 0) {
+            snprintf(avpr_fqn, MAX_PATH_LEN, "%s", (p_line + strlen("fqn=")));
+            //cut the value at the first newline
+            avpr_fqn[strcspn(avpr_fqn, "\n")] = 0;
+        }
+        else if (strncmp(p_line, "avpr=", strlen("avpr=")) == 0) {
+            snprintf(path, MAX_PATH_LEN, "%s/%s", root, (p_line + strlen("avpr=")));
+            //cut the path at the first newline
+            path[strcspn(path, "\n")] = 0;
+        }
+    }
+    free(p_line);
+    fclose(properties);
+
+    if (*avpr_fqn == '\0') {
+        L_WARN("File %s does not contain a fully qualified name for the parser\n", properties_file_name);
+        return false;
+    }
+
+    if (*path == '\0') {
+        L_WARN("File %s does not contain a location for the avpr file\n", properties_file_name);
+        return false;
+    }
+
+    return true;
+}
+
+/**
+ * Opens the descriptor (or avpr) file for reading, depending on the descriptor type.
+ *
+ * @param provider       Provider, used for logging.
+ * @param descriptorType Type of the file, see getDescriptorType.
+ * @param filename       File name, relative to root.
+ * @param root           Directory containing the file.
+ * @param avpr_fqn       Output buffer for the fqn; only written for FIT_AVPR.
+ * @param pathOrError    Output buffer of MAX_PATH_LEN bytes; filled with the path of the file
+ *                       to open, or with an error text when the type is FIT_INVALID.
+ * @return The opened stream, or NULL if nothing could be opened.
+ */
+static FILE* openFileStream(pubsub_serialization_provider_t* provider, descriptor_type_e descriptorType, const char* filename, const char* root, char* avpr_fqn, char* pathOrError) {
+    memset(pathOrError, 0, MAX_PATH_LEN);
+
+    if (descriptorType == FIT_INVALID) {
+        snprintf(pathOrError, MAX_PATH_LEN, "Because %s is not a valid file", filename);
+        return NULL;
+    }
+
+    if (descriptorType == FIT_DESCRIPTOR) {
+        snprintf(pathOrError, MAX_PATH_LEN, "%s/%s", root, filename);
+        return fopen(pathOrError, "r");
+    }
+
+    if (descriptorType == FIT_AVPR) {
+        if (!readPropertiesFile(provider, filename, root, avpr_fqn, pathOrError)) {
+            return NULL;
+        }
+        return fopen(pathOrError, "r");
+    }
+
+    L_WARN("Unknown file input type, returning NULL!\n");
+    return NULL;
+}
+
+/**
+ * Determines the msg id of a message type: the positive value of its "msgId"
+ * annotation when present, otherwise the string hash of the message name.
+ */
+static unsigned int pubsub_serializationProvider_getMsgId(pubsub_serialization_provider_t* provider __attribute__((unused)), dyn_message_type *msg) {
+    char *name = NULL;
+    dynMessage_getName(msg, &name);
+
+    unsigned int id = 0;
+    char *annotatedId = NULL;
+    int rc = dynMessage_getAnnotationEntry(msg, "msgId", &annotatedId);
+    if (rc == CELIX_SUCCESS && annotatedId != NULL) {
+        // a custom msg id is configured, use it when it is positive
+        long parsed = strtol(annotatedId, NULL, 10);
+        if (parsed > 0) {
+            id = (unsigned int) parsed;
+        }
+    }
+
+    // no (usable) custom msg id: fall back to the hash of the msg name
+    return id != 0 ? id : celix_utils_stringHash(name);
+}
+
+/**
+ * Parses a dfi message descriptor from stream and verifies that a name and a version
+ * can be retrieved from it.
+ *
+ * @param provider  Provider, used for logging.
+ * @param stream    Stream to read the descriptor from.
+ * @param entryPath Path of the descriptor, only used in log messages.
+ * @return The parsed message type, or NULL (after logging a warning) on failure.
+ */
+static dyn_message_type* pubsub_serializationProvider_parseDfiDescriptor(pubsub_serialization_provider_t* provider, FILE* stream, const char* entryPath) {
+    dyn_message_type *parsed = NULL;
+    if (dynMessage_parse(stream, &parsed) != 0 || parsed == NULL) {
+        L_WARN("Cannot parse message from descriptor from entry %s.\n", entryPath);
+        return NULL;
+    }
+
+    char *name = NULL;
+    int nameRc = dynMessage_getName(parsed, &name);
+
+    version_pt version = NULL;
+    int versionRc = dynMessage_getVersion(parsed, &version);
+
+    if (nameRc + versionRc != 0 || name == NULL || version == NULL) {
+        L_WARN("Cannot retrieve name and/or version from msg, using entry %s.\n", entryPath);
+        dynMessage_destroy(parsed);
+        return NULL;
+    }
+
+    return parsed;
+}
+
+//TODO FIXME, see #158
+//
+//    static dyn_message_type* pubsub_serializationProvider_parseAvprDescriptor(pubsub_serialization_provider_t* provider, FILE* stream, const char *entryName, const char* fqn) {
+//
+//    //dyn_message_type* msgType = dynMessage_parseAvpr(file_ptr, fqn);
+//    dyn_message_type* msgType = NULL;
+//
+//    if (!msgType) {
+//        L_WARN("[json serializer] Cannot parse avpr file '%s'\n", fqn);
+//        return -1;
+//    }
+//
+//    dyn_type* type;
+//    dynMessage_getMessageType(msgType, &type);
+//
+//    const char *msgName = dynType_getName(type);
+//
+//    version_pt msgVersion = NULL;
+//    celix_status_t s = version_createVersionFromString(dynType_getMetaInfo(type, "version"), &msgVersion);
+//
+//    if (s != CELIX_SUCCESS || !msgName) {
+//        L_WARN("[json serializer] Cannot retrieve name and/or version from msg\n");
+//        if (s == CELIX_SUCCESS) {
+//            version_destroy(msgVersion);
+//        }
+//        return -1;
+//    }
+//
+//    unsigned int msgId = 0;
+//    const char *msgIdStr = dynType_getMetaInfo(type, "msgId");
+//    if (msgIdStr != NULL) {
+//        // custom msg id passed, use it
+//        long customMsgId = strtol(msgIdStr, NULL, 10);
+//        if (customMsgId > 0)
+//            msgId = (unsigned int) customMsgId;
+//    }
+//
+//    if (msgId == 0) {
+//        msgId = utils_stringHash(msgName);
+//    }
+//
+//
+//
+//    return 0;
+//}
+//}
+
+/**
+ * Returns true if the msgType is valid and unique (new msg fqn & msg id).
+ * Logs error if msg id clashes or versions are different.
+ * Logs warning if descriptors are different.
+ */
+static bool pubsub_serializationProvider_isUniqueAndCheckValid(pubsub_serialization_provider_t* provider, pubsub_serialization_entry_t* entry) {

Review comment:
       renamed and added doc




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on pull request #223:
URL: https://github.com/apache/celix/pull/223#issuecomment-629844098


   > > When running the test `pubsub_utils_test`, I get the following failed test:
   > > ```
   > > [ RUN      ] PubSubSerializationProviderTestSuite.FindSerializationServices
   > > [2020-05-12T11:34:51] [info] [celix_framework] [framework_start:579] Celix framework started
   > > [2020-05-12T11:34:51] [error] [celix_pubsub_serialization_provider] Error adding descriptor .pubsub_serialization_provider_cache/bundle1/version0.0/META-INF/descriptors/msg_poi1_invalid1.descriptor. Found msg types with same fqn, but different msg ids. Msg fqn is poi1, but found msg ids are '42' and '2090622942'. Not adding descriptor with msg id 42.
   > > [2020-05-12T11:34:51] [error] [celix_pubsub_serialization_provider] Error adding descriptor .pubsub_serialization_provider_cache/bundle1/version0.0/META-INF/descriptors/msg_poi1_invalid1.descriptor. Found msg types with same msg id, but different msg fqn. Msg id is 42, but found fully qualified names are 'poi1' and 'poi2'. Not adding descriptor with msg fqn poi1.
   > > [2020-05-12T11:34:51] [warning] [celix_pubsub_serialization_provider] FILE:/home/oipo-unencrypted/Programming/celix-apache/libs/dfi/src/dyn_common.c, LINE:118, MSG:Error parsing, expected token '�' got 'h' at position 1
   > > [2020-05-12T11:34:51] [warning] [celix_pubsub_serialization_provider] Cannot parse message from descriptor from entry .pubsub_serialization_provider_cache/bundle1/version0.0/META-INF/descriptors/garbage.descriptor.
   > > 
   > > [2020-05-12T11:34:51] [error] [celix_pubsub_serialization_provider] Error adding descriptor .pubsub_serialization_provider_cache/bundle1/version0.0/META-INF/descriptors/msg_poi3_invalid2.descriptor. Found two different version for msg poi3. This is not supported, please align used versions between bundles!. Versions found 1.1.0 and 1.0.0. Not adding descriptor with version 1.1.0.
   > > [2020-05-12T11:34:51] [error] [celix_pubsub_serialization_provider] Error adding descriptor .pubsub_serialization_provider_cache/bundle1/version0.0/META-INF/descriptors/msg_poi2.descriptor. Found msg types with same msg id, but different msg fqn. Msg id is 5555, but found fully qualified names are 'poi2' and 'poi2_2'. Not adding descriptor with msg fqn poi2.
   > > [2020-05-12T11:34:51] [error] [celix_pubsub_serialization_provider] Error adding descriptor .pubsub_serialization_provider_cache/bundle1/version0.0/META-INF/descriptors/msg_poi2.descriptor. Found msg types with same fqn, but different msg ids. Msg fqn is poi2, but found msg ids are '5555' and '42'. Not adding descriptor with msg id 5555.
   > > /home/oipo-unencrypted/Programming/celix-apache/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc:66: Failure
   > > Expected equality of these values:
   > >   4
   > >   nrEntries
   > >     Which is: 5
   > > /home/oipo-unencrypted/Programming/celix-apache/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc:68: Failure
   > > Expected equality of these values:
   > >   4
   > >   nrOfInvalidEntries
   > >     Which is: 3
   > > /home/oipo-unencrypted/Programming/celix-apache/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc:71: Failure
   > > Expected equality of these values:
   > >   4
   > >   celix_arrayList_size(services)
   > >     Which is: 5
   > > [2020-05-12T11:34:51] [info] [celix_framework] [framework_stop:644] Celix framework stopped
   > > [  FAILED  ] PubSubSerializationProviderTestSuite.FindSerializationServices (1 ms)
   > > ```
   > 
   > probably still some dependency on the order of loading the test descriptor files. I will look into this.
   
   @Oipo Could you retest if the tests still fail?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes closed pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes closed pull request #223:
URL: https://github.com/apache/celix/pull/223


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] abroekhuis commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
abroekhuis commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426298603



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+//Logging shortcuts. These macros expect a variable named `handler`
+//(with a logHelper member) to be in scope at the call site.
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+/**
+ * Entry describing a single tracked message serialization service.
+ */
+typedef struct pubsub_serialization_service_entry {
+    long svcId;
+    //service properties; compareEntries reads the service id and ranking from these
+    const celix_properties_t *properties;
+    uint32_t msgId;
+    //destroyed together with the entry in pubsub_serializerHandler_destroy
+    celix_version_t* msgVersion;
+    //freed together with the entry in pubsub_serializerHandler_destroy
+    char* msgFqn;
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+/**
+ * State of a serializer handler: tracks message serialization services and
+ * stores them per msg id.
+ */
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    //if true, version compatibility is checked with celix_version_isUserCompatible;
+    //otherwise major and minor version must match exactly (see isCompatible)
+    bool backwardCompatible;
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    //NOTE(review): presumably protects serializationServices - confirm against the users of the lock
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+/**
+ * Service tracker callback: forwards an added serialization service to the handler.
+ */
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_addSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Service tracker callback: forwards a removed serialization service to the handler.
+ */
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_removeSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Compares two pubsub_serialization_service_entry_t entries on the service id and
+ * service ranking found in their service properties (both default to 0 when absent).
+ *
+ * @return The result of utils_compareServiceIdsAndRanking for the two entries.
+ */
+int compareEntries(const void *a, const void *b) {
+    const pubsub_serialization_service_entry_t* lhs = a;
+    const pubsub_serialization_service_entry_t* rhs = b;
+
+    long lhsId = celix_properties_getAsLong(lhs->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long lhsRanking = celix_properties_getAsLong(lhs->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    long rhsId = celix_properties_getAsLong(rhs->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long rhsRanking = celix_properties_getAsLong(rhs->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(lhsId, lhsRanking, rhsId, rhsRanking);
+}
+
+/**
+ * Returns the first serialization service entry registered for msgId, or NULL if
+ * there is none. Assumes the caller holds the handler lock.
+ */
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (list == NULL) {
+        return NULL;
+    }
+    //a list stored in the map always contains at least 1 entry
+    return celix_arrayList_get(list, 0);
+}
+
+/**
+ * Checks whether the msg version of the entry is compatible with the serialized version.
+ * For a backward compatible handler celix_version_isUserCompatible decides; otherwise the
+ * major and minor versions must be equal.
+ */
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    if (handler->backwardCompatible) {
+        return celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    }
+
+    int entryMajor = celix_version_getMajor(entry->msgVersion);
+    int entryMinor = celix_version_getMinor(entry->msgVersion);
+    return entryMajor == serializedMajorVersion && entryMinor == serializedMinorVersion;
+}
+
+/**
+ * Returns the msg fqn of the first entry registered for msgId, or NULL if there is none.
+ * Assumes the caller holds the handler lock.
+ */
+static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (list == NULL) {
+        return NULL;
+    }
+    //a list stored in the map always contains at least 1 entry
+    pubsub_serialization_service_entry_t *first = celix_arrayList_get(list, 0);
+    return first->msgFqn;
+}
+
+/**
+ * Creates a serializer handler and starts tracking message serialization services
+ * with a matching serialization type.
+ *
+ * @param ctx                The bundle context used for the log helper and the service tracker.
+ * @param serializerType     Serialization type used in the service tracker filter.
+ * @param backwardCompatible Whether version checks accept user compatible versions (see isCompatible).
+ * @return The new handler, or NULL if memory for the handler or the filter could not be allocated.
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
+    pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
+    if (handler == NULL) {
+        return NULL;
+    }
+
+    //Create the filter first: if asprintf fails the content of filter is undefined
+    //and nothing but the handler itself has to be cleaned up.
+    char *filter = NULL;
+    if (asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType) < 0) {
+        free(handler);
+        return NULL;
+    }
+
+    handler->ctx = ctx;
+    handler->backwardCompatible = backwardCompatible;
+
+    handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
+
+    celixThreadRwlock_create(&handler->lock, NULL);
+    handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
+    opts.filter.filter = filter;
+    opts.callbackHandle = handler;
+    opts.addWithProperties = addSvc;
+    opts.removeWithProperties = remSvc;
+    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    free(filter);
+
+    return handler;
+}
+
+
+/**
+ * Stops the service tracker and releases the handler together with all entries it still holds.
+ * Passing NULL is allowed and does nothing.
+ */
+void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
+    if (handler == NULL) {
+        return;
+    }
+
+    celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
+    celixThreadRwlock_destroy(&handler->lock);
+
+    //release every entry of every per msg id list; hashMap_destroy is asked not to free keys or values
+    hash_map_iterator_t mapIter = hashMapIterator_construct(handler->serializationServices);
+    while (hashMapIterator_hasNext(&mapIter)) {
+        celix_array_list_t *list = hashMapIterator_nextValue(&mapIter);
+        int idx = 0;
+        while (idx < celix_arrayList_size(list)) {
+            pubsub_serialization_service_entry_t* current = celix_arrayList_get(list, idx);
+            free(current->msgFqn);
+            celix_version_destroy(current->msgVersion);
+            free(current);
+            idx += 1;
+        }
+        celix_arrayList_destroy(list);
+    }
+    hashMap_destroy(handler->serializationServices, false, false);
+    celix_logHelper_destroy(handler->logHelper);
+    free(handler);
+}
+
+/**
+ * Adds a message serialization service to the handler.
+ *
+ * Service id, msg fqn, msg version (default "0.0.0") and msg id are read from the service properties;
+ * a msg id of 0 is replaced by a hash of the msg fqn.
+ * The service is ignored (and an error is logged) when:
+ *  - the msg fqn property is missing,
+ *  - the msg version property cannot be parsed,
+ *  - an entry with the same msg id but a different msg fqn or msg version is already present.
+ *
+ * @param handler        The serializer handler.
+ * @param svc            The serialization service; the pointer is stored, not copied.
+ * @param svcProperties  The service properties; the pointer is stored in the entry, not copied.
+ */
+void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    const char *version = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+
+    if (msgFqn == NULL) {
+        //without a fqn the entry cannot be hashed, compared against existing entries or reported
+        L_ERROR("%s service has no %s property. Ignoring serialization service.", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
+        return;
+    }
+
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celix_version_t* msgVersion = celix_version_createVersionFromString(version);
+    if (msgVersion == NULL) {
+        //log the unparsable property string; msgVersion itself is NULL here
+        L_ERROR("%s service has an invalid %s property. value is '%s'", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, version);
+        return;
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+
+    pubsub_serialization_service_entry_t* existingEntry = findEntry(handler, msgId);
+
+    bool valid = true;
+    if (existingEntry != NULL && strncmp(existingEntry->msgFqn, msgFqn, 1024*1024) != 0) {
+        L_ERROR("Msg id clash. Registered serialization service with msg id %u and msg fqn '%s' clashes with an existing serialization service using the same msg id and msg fqn '%s'. Ignoring serialization service.", (unsigned int)msgId, msgFqn, existingEntry->msgFqn);
+        valid = false;
+    }
+
+    if (existingEntry != NULL && celix_version_compareTo(existingEntry->msgVersion, msgVersion) != 0) {
+        char* existingVersion = celix_version_toString(existingEntry->msgVersion);
+        L_ERROR("Mismatched message versions. Registered serialization service with msg '%s' with version %s, has a different version than an existing serialization service with version '%s'. Ignoring serialization service.", msgFqn, version, existingVersion);
+        free(existingVersion);
+        valid = false;
+    }
+
+    if (valid) {
+        celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
+        if (entries == NULL) {
+            entries = celix_arrayList_create();
+        }
+        pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
+        entry->svcId = svcId;
+        entry->properties = svcProperties;
+        entry->msgFqn = celix_utils_strdup(msgFqn);
+        entry->msgId = msgId;
+        entry->msgVersion = msgVersion; //ownership of the version moves to the entry
+        entry->svc = svc;
+        celix_arrayList_add(entries, entry);
+        celix_arrayList_sort(entries, compareEntries);
+
+        hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+    } else {
+        celix_version_destroy(msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+/**
+ * Removes the entry belonging to the provided serialization service (matched on service id) from the handler.
+ * The msg id is read from the service properties; a msg id of 0 is replaced by a hash of the msg fqn.
+ * If the per msg id list becomes empty, the list is removed from the map and destroyed.
+ * Nothing happens when no matching entry is found or when neither a msg id nor a msg fqn is available.
+ *
+ * @param handler        The serializer handler.
+ * @param svc            The serialization service (not used for the lookup).
+ * @param svcProperties  The service properties of the service being removed.
+ */
+void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+    if (msgId == 0) {
+        if (msgFqn == NULL) {
+            return; //no msg id and nothing to hash, so there is no key to look up
+        }
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *found = NULL;
+        for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+            pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, i);
+            if (entry->svcId == svcId) {
+                found = entry;
+                celix_arrayList_removeAt(entries, i);
+                celix_arrayList_sort(entries, compareEntries);
+                break;
+            }
+        }
+        if (found != NULL) {
+            //the entry owns its fqn copy and version
+            free(found->msgFqn);
+            celix_version_destroy(found->msgVersion);
+            free(found);
+        }
+        if (celix_arrayList_size(entries) == 0) {
+            //keep the invariant that a list stored in the map holds at least 1 entry
+            hashMap_remove(handler->serializationServices, (void*)(uintptr_t)msgId);
+            celix_arrayList_destroy(entries);
+        }
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* handler, uint32_t msgId, const void* input, struct iovec** output, size_t* outputIovLen) {

Review comment:
       iovec already has a length. Why outputIovLen?

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+//Logging shorthands. These expand to a use of `handler->logHelper`, so they can only be used
+//where a variable named `handler` (pointer to a pubsub_serializer_handler) is in scope.
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+//Bookkeeping for a single serialization service known to the handler.
+typedef struct pubsub_serialization_service_entry {
+    //service id of the serialization service
+    long svcId;
+    //service properties of the serialization service (declared const in this struct)
+    const celix_properties_t *properties;
+    //message id handled by the service
+    uint32_t msgId;
+    //message version handled by the service
+    celix_version_t* msgVersion;
+    //fully qualified message name
+    char* msgFqn;
+    //the serialization service itself
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+//State of a serializer handler.
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    //selects how message versions are compared
+    bool backwardCompatible;
+    //tracker id of the serialization service tracker
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    //NOTE(review): presumably protects serializationServices - confirm against the users of the map
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+/**
+ * Service tracker "add" callback: forwards the untyped handle and service
+ * to pubsub_serializerHandler_addSerializationService.
+ */
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_addSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Service tracker "remove" callback: forwards the untyped handle and service
+ * to pubsub_serializerHandler_removeSerializationService.
+ */
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_removeSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Ordering callback for serialization service entries.
+ * Service id and service ranking are read from the properties of both entries (each defaults to 0)
+ * and the result of utils_compareServiceIdsAndRanking is returned.
+ * NOTE(review): treats a and b as pubsub_serialization_service_entry_t pointers - confirm this
+ * matches what the sort function passes to its callback.
+ */
+int compareEntries(const void *a, const void *b) {
+    const pubsub_serialization_service_entry_t* lhs = a;
+    const pubsub_serialization_service_entry_t* rhs = b;
+
+    long lhsId = celix_properties_getAsLong(lhs->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long lhsRanking = celix_properties_getAsLong(lhs->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    long rhsId = celix_properties_getAsLong(rhs->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long rhsRanking = celix_properties_getAsLong(rhs->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(lhsId, lhsRanking, rhsId, rhsRanking);
+}
+
+/**
+ * Returns the first entry registered for the provided msg id, or NULL if no entry list exists for it.
+ * NOTE assumes the handler lock is held by the caller.
+ */
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    //NOTE if list is not null, it always holds at least 1 entry
+    return list != NULL ? celix_arrayList_get(list, 0) : NULL;
+}
+
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    bool compatible = false;
+    if (handler->backwardCompatible) {
+        compatible = celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    } else {
+        int major = celix_version_getMajor(entry->msgVersion);
+        int minor = celix_version_getMinor(entry->msgVersion);
+        compatible = major == serializedMajorVersion && minor == serializedMinorVersion;

Review comment:
       Is there no function in version for this?

##########
File path: bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Activator-less bundle that only carries the msg_poi1 message descriptor under META-INF/descriptors.
+add_celix_bundle(pubsub_avrobin_serialization_descriptor NO_ACTIVATOR VERSION 1.0.0)
+celix_bundle_files(pubsub_avrobin_serialization_descriptor
+		${CMAKE_CURRENT_SOURCE_DIR}/msg_descriptors/msg_poi1.descriptor
+		DESTINATION "META-INF/descriptors"
+)
+
+# Test executable for the avrobin serialization provider.
+add_executable(test_pubsub_serializer_avrobin
+        src/PubSubAvrobinSerializationProviderTestSuite.cc
+)
+target_link_libraries(test_pubsub_serializer_avrobin PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_compile_options(test_pubsub_serializer_avrobin PRIVATE -std=c++14) #Note test code is allowed to be C++14
+
+# Both bundles must be built before the test; the location of the avrobin serializer bundle file
+# is handed to the test code through the SER_BUNDLE compile definition.
+add_dependencies(test_pubsub_serializer_avrobin celix_pubsub_serializer_avrobin_bundle pubsub_avrobin_serialization_descriptor_bundle)
+target_compile_definitions(test_pubsub_serializer_avrobin PRIVATE -DSER_BUNDLE=\"$<TARGET_PROPERTY:celix_pubsub_serializer_avrobin,BUNDLE_FILE>\")

Review comment:
       SER_BUNDLE -> SERIALIZATION_BUNDLE?

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+//Logging shorthands. These expand to a use of `handler->logHelper`, so they can only be used
+//where a variable named `handler` (pointer to a pubsub_serializer_handler) is in scope.
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+//Bookkeeping for a single serialization service known to the handler.
+typedef struct pubsub_serialization_service_entry {
+    //service id of the serialization service
+    long svcId;
+    //service properties of the serialization service (declared const in this struct)
+    const celix_properties_t *properties;
+    //message id handled by the service
+    uint32_t msgId;
+    //message version handled by the service
+    celix_version_t* msgVersion;
+    //fully qualified message name
+    char* msgFqn;
+    //the serialization service itself
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+//State of a serializer handler.
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    //selects how message versions are compared
+    bool backwardCompatible;
+    //tracker id of the serialization service tracker
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    //NOTE(review): presumably protects serializationServices - confirm against the users of the map
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {

Review comment:
       Why not addService? Much more obvious in reading.
   
   Applies to several other functions as well. Often it is fully descriptive, but there are several shorthand versions.

##########
File path: bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
##########
@@ -0,0 +1,120 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_avrobin_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <assert.h>
+
+#include "avrobin_serializer.h"
+#include "dyn_message.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+
+/**
+ * Log callback which formats the printf-style message and forwards it, together with
+ * the originating file and line, to the celix log helper passed as handle.
+ * If the message cannot be formatted, the unformatted format string is logged instead.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    celix_log_helper_t *log = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    if (rc < 0) {
+        //on failure the content of logStr is undefined: do not read or free it
+        celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, msg);
+        return;
+    }
+    celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+
+/**
+ * Serializes msg with the avrobin serializer, using the message type of the provided entry.
+ *
+ * On success *output points to a newly allocated array holding exactly one iovec which refers to
+ * the serialized data and *outputIovLen is 1; release both with
+ * pubsub_avrobinSerializationProvider_freeSerializeMsg.
+ * On failure *output is NULL and *outputIovLen is 0, so nothing needs to be released.
+ *
+ * @return CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT if output or outputIovLen is NULL,
+ *         CELIX_BUNDLE_EXCEPTION if serialization failed or CELIX_ENOMEM if the iovec could not be allocated.
+ */
+static celix_status_t pubsub_avrobinSerializationProvider_serialize(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen) {
+    if (output == NULL || outputIovLen == NULL) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    *output = NULL;
+    *outputIovLen = 0;
+
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(entry->msgType, &dynType);
+
+    uint8_t *serializedOutput = NULL;
+    size_t serializedOutputLen = 0;
+    if (avrobinSerializer_serialize(dynType, msg, &serializedOutput, &serializedOutputLen) != 0) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    //allocate the iovec only after a successful serialization, so the failure path has nothing to leak
+    struct iovec* iov = calloc(1, sizeof(*iov));
+    if (iov == NULL) {
+        free(serializedOutput); //same release function as used for iov_base in freeSerializeMsg
+        return CELIX_ENOMEM;
+    }
+    iov->iov_base = (void*)serializedOutput;
+    iov->iov_len = serializedOutputLen;
+
+    *output = iov;
+    *outputIovLen = 1;
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Releases an iovec array produced by the serialize callback.
+ * The buffers referenced by the iovec elements are only freed when the entry has a message type;
+ * the array itself is always freed. A NULL input is ignored.
+ */
+void pubsub_avrobinSerializationProvider_freeSerializeMsg(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen) {
+    if (input == NULL) {
+        return;
+    }
+    if (entry->msgType != NULL) {
+        //size_t index: avoids a signed/unsigned comparison against inputIovLen
+        for (size_t i = 0; i < inputIovLen; i++) {
+            free(input[i].iov_base); //free(NULL) is a no-op, no guard needed
+            input[i].iov_base = NULL;
+            input[i].iov_len = 0;
+        }
+    }
+    free(input);
+}
+
+/**
+ * Deserializes a single iovec with the avrobin serializer, using the message type of the provided entry.
+ * On success the deserialized message is stored in *out; *out is left untouched on failure.
+ *
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION if input is NULL or deserialization failed.
+ */
+celix_status_t pubsub_avrobinSerializationProvider_deserialize(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen, void **out) {
+    if (input == NULL) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    dyn_type* type;
+    dynMessage_getMessageType(entry->msgType, &type);
+
+    //only a single iovec is supported
+    assert(inputIovLen == 1);
+
+    void *deserialized = NULL;
+    if (avrobinSerializer_deserialize(type, (uint8_t *)input->iov_base, input->iov_len, &deserialized) != 0) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    *out = deserialized;
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Releases a message produced by the deserialize callback, using the message type of the provided entry.
+ * Does nothing when the entry has no message type.
+ */
+void pubsub_avrobinSerializationProvider_freeDeserializeMsg(pubsub_serialization_entry_t* entry, void *msg) {
+    if (entry->msgType == NULL) {
+        return;
+    }
+    dyn_type* type;
+    dynMessage_getMessageType(entry->msgType, &type);
+    dynType_free(type, msg);
+}
+
+pubsub_serialization_provider_t* pubsub_avrobinSerializationProvider_create(celix_bundle_context_t* ctx)  {
+    pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "avrobin", 0, pubsub_avrobinSerializationProvider_serialize, pubsub_avrobinSerializationProvider_freeSerializeMsg, pubsub_avrobinSerializationProvider_deserialize, pubsub_avrobinSerializationProvider_freeDeserializeMsg);
+    avrobinSerializer_logSetup(dfi_log, pubsub_serializationProvider_getLogHelper(provider), 1);;

Review comment:
       double ; at end of line

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
##########
@@ -0,0 +1,668 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <dirent.h>
+#include <string.h>
+
+#include "celix_constants.h"
+#include "dyn_function.h"
+#include "celix_version.h"
+#include "celix_utils.h"
+#include "dyn_message.h"
+#include "pubsub_utils.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_shell_command.h"
+
+//Size used for path, fqn and error message buffers in this file.
+#define MAX_PATH_LEN    1024
+
+//Kind of descriptor file, derived from the file name.
+typedef enum
+{
+    FIT_INVALID = 0,
+    FIT_DESCRIPTOR = 1,
+    FIT_AVPR = 2
+} descriptor_type_e;
+
+//Logging shorthands. These expand to a use of `provider->logHelper`, so they can only be used
+//where a variable named `provider` (pointer to a pubsub_serialization_provider) is in scope.
+#define L_DEBUG(...) \
+    celix_logHelper_debug(provider->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(provider->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(provider->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(provider->logHelper, __VA_ARGS__)
+
+
+//State of a serialization provider.
+struct pubsub_serialization_provider {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper;
+    char* serializationType;
+
+    //serialization callbacks
+    celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen);
+    void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen);
+    celix_status_t (*deserialize)(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen __attribute__((unused)), void **out);
+    void (*freeDeserializeMsg)(pubsub_serialization_entry_t* entry, void *msg);
+
+    //updated serialization services
+    long bundleTrackerId;
+
+    //marker service and the id it is registered under
+    pubsub_message_serialization_marker_t markerSvc;
+    long serializationMarkerSvcId;
+
+    //shell command service and the id it is registered under
+    celix_shell_command_t cmdSvc;
+    long cmdSvcId;
+
+    celix_thread_mutex_t mutex; //protects below
+    //NOTE(review): this is an array list, so "key" presumably means the element type (pubsub_serialization_entry) - confirm
+    celix_array_list_t *serializationSvcEntries; //key = pubsub_serialization_entry;
+};
+
+/**
+ * Log callback which formats the printf-style message and forwards it, together with the
+ * originating file and line, to the log helper of the provider passed as handle.
+ * The provided level is ignored; everything is logged on warning level.
+ * If the message cannot be formatted, the unformatted format string is logged instead.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    (void)level;
+    pubsub_serialization_provider_t *provider = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    if (rc < 0) {
+        //on failure the content of logStr is undefined: do not read or free it
+        celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, msg);
+        return;
+    }
+    celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+/**
+ * Classifies a file by name: a name containing ".descriptor" is a dfi descriptor,
+ * otherwise a name containing ".properties" is an avpr properties file; anything else is invalid.
+ */
+static descriptor_type_e getDescriptorType(const char* filename) {
+    descriptor_type_e type = FIT_INVALID;
+    if (strstr(filename, ".descriptor") != NULL) {
+        type = FIT_DESCRIPTOR;
+    } else if (strstr(filename, ".properties") != NULL) {
+        type = FIT_AVPR;
+    }
+    return type;
+}
+
+/**
+ * Reads "<root>/<properties_file_name>" and extracts the "fqn=" and "avpr=" lines.
+ *
+ * @param avpr_fqn  Output buffer (MAX_PATH_LEN bytes are written at most) for the value of the "fqn=" line.
+ * @param path      Buffer of MAX_PATH_LEN bytes; first used for the properties file path and afterwards
+ *                  filled with "<root>/<value of the avpr= line>".
+ * @return true if the file could be opened and contains both a fqn and an avpr entry; false (with a warning) otherwise.
+ */
+static bool readPropertiesFile(pubsub_serialization_provider_t* provider, const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {
+    static const char fqnKey[] = "fqn=";
+    static const char avprKey[] = "avpr=";
+
+    snprintf(path, MAX_PATH_LEN, "%s/%s", root, properties_file_name); // use path to create path to properties file
+    FILE *stream = fopen(path, "r");
+    if (stream == NULL) {
+        L_WARN("Could not find or open %s as a properties file in %s\n", properties_file_name, root);
+        return false;
+    }
+
+    avpr_fqn[0] = '\0';
+    path[0] = '\0'; //re-use path to create path to avpr file
+    size_t lineCap = MAX_PATH_LEN;
+    char *line = malloc(lineCap);
+    while (getline(&line, &lineCap, stream) >= 0) {
+        if (strncmp(line, fqnKey, strlen(fqnKey)) == 0) {
+            snprintf(avpr_fqn, MAX_PATH_LEN, "%s", line + strlen(fqnKey));
+            avpr_fqn[strcspn(avpr_fqn, "\n")] = 0; //cut off at the first newline
+        } else if (strncmp(line, avprKey, strlen(avprKey)) == 0) {
+            snprintf(path, MAX_PATH_LEN, "%s/%s", root, line + strlen(avprKey));
+            path[strcspn(path, "\n")] = 0; //cut off at the first newline
+        }
+    }
+    free(line);
+    fclose(stream);
+
+    if (avpr_fqn[0] == '\0') {
+        L_WARN("File %s does not contain a fully qualified name for the parser\n", properties_file_name);
+        return false;
+    }
+
+    if (path[0] == '\0') {
+        L_WARN("File %s does not contain a location for the avpr file\n", properties_file_name);
+        return false;
+    }
+
+    return true;
+}
+
+/**
+ * Opens the descriptor stream belonging to filename.
+ *
+ * @param pathOrError  Buffer of MAX_PATH_LEN bytes; zeroed first and then filled with the path that is opened
+ *                     or, for an invalid descriptor type, with a reason text.
+ * @return The opened stream, or NULL if nothing could be opened.
+ */
+static FILE* openFileStream(pubsub_serialization_provider_t* provider, descriptor_type_e descriptorType, const char* filename, const char* root, char* avpr_fqn, char* pathOrError) {
+    memset(pathOrError, 0, MAX_PATH_LEN);
+
+    if (descriptorType == FIT_DESCRIPTOR) {
+        snprintf(pathOrError, MAX_PATH_LEN, "%s/%s", root, filename);
+        return fopen(pathOrError, "r");
+    }
+
+    if (descriptorType == FIT_AVPR) {
+        //the properties file tells which avpr file to open
+        if (!readPropertiesFile(provider, filename, root, avpr_fqn, pathOrError)) {
+            return NULL;
+        }
+        return fopen(pathOrError, "r");
+    }
+
+    if (descriptorType == FIT_INVALID) {
+        snprintf(pathOrError, MAX_PATH_LEN, "Because %s is not a valid file", filename);
+    } else {
+        L_WARN("Unknown file input type, returning NULL!\n");
+    }
+    return NULL;
+}
+
+/**
+ * Determines the msg id of a message type: a positive "msgId" annotation is used when present,
+ * otherwise (or when that value converts to 0) the hash of the message name is used.
+ */
+static unsigned int pubsub_serializationProvider_getMsgId(pubsub_serialization_provider_t* provider __attribute__((unused)), dyn_message_type *msg) {
+    char *name = NULL;
+    dynMessage_getName(msg, &name);
+
+    unsigned int id = 0;
+    char *annotation = NULL;
+    int rc = dynMessage_getAnnotationEntry(msg, "msgId", &annotation);
+    if (rc == CELIX_SUCCESS && annotation != NULL) {
+        // custom msg id passed, use it
+        long parsed = strtol(annotation, NULL, 10);
+        id = parsed > 0 ? (unsigned int) parsed : 0;
+    }
+
+    return id != 0 ? id : celix_utils_stringHash(name);
+}
+
+/**
+ * Parses a dfi message descriptor from stream and verifies that a name and a version can be retrieved from it.
+ *
+ * @param entryPath  Only used in the warning messages.
+ * @return The parsed message type, or NULL (with a warning) if parsing failed or name/version are missing.
+ */
+static dyn_message_type* pubsub_serializationProvider_parseDfiDescriptor(pubsub_serialization_provider_t* provider, FILE* stream, const char* entryPath) {
+    dyn_message_type *parsed = NULL;
+    int parseRc = dynMessage_parse(stream, &parsed);
+    if (parseRc != 0 || parsed == NULL) {
+        L_WARN("Cannot parse message from descriptor from entry %s.\n", entryPath);
+        return NULL;
+    }
+
+    char *name = NULL;
+    int nameRc = dynMessage_getName(parsed, &name);
+
+    version_pt version = NULL;
+    int versionRc = dynMessage_getVersion(parsed, &version);
+
+    //the return codes are summed, as parseRc is known to be 0 at this point
+    if (nameRc + versionRc != 0 || name == NULL || version == NULL) {
+        L_WARN("Cannot retrieve name and/or version from msg, using entry %s.\n", entryPath);
+        dynMessage_destroy(parsed);
+        return NULL;
+    }
+
+    return parsed;
+}
+
+//TODO FIXME, see #158
+//
+//    static dyn_message_type* pubsub_serializationProvider_parseAvprDescriptor(pubsub_serialization_provider_t* provider, FILE* stream, const char *entryName, const char* fqn) {
+//
+//    //dyn_message_type* msgType = dynMessage_parseAvpr(file_ptr, fqn);
+//    dyn_message_type* msgType = NULL;
+//
+//    if (!msgType) {
+//        L_WARN("[json serializer] Cannot parse avpr file '%s'\n", fqn);
+//        return -1;
+//    }
+//
+//    dyn_type* type;
+//    dynMessage_getMessageType(msgType, &type);
+//
+//    const char *msgName = dynType_getName(type);
+//
+//    version_pt msgVersion = NULL;
+//    celix_status_t s = version_createVersionFromString(dynType_getMetaInfo(type, "version"), &msgVersion);
+//
+//    if (s != CELIX_SUCCESS || !msgName) {
+//        L_WARN("[json serializer] Cannot retrieve name and/or version from msg\n");
+//        if (s == CELIX_SUCCESS) {
+//            version_destroy(msgVersion);
+//        }
+//        return -1;
+//    }
+//
+//    unsigned int msgId = 0;
+//    const char *msgIdStr = dynType_getMetaInfo(type, "msgId");
+//    if (msgIdStr != NULL) {
+//        // custom msg id passed, use it
+//        long customMsgId = strtol(msgIdStr, NULL, 10);
+//        if (customMsgId > 0)
+//            msgId = (unsigned int) customMsgId;
+//    }
+//
+//    if (msgId == 0) {
+//        msgId = utils_stringHash(msgName);
+//    }
+//
+//
+//
+//    return 0;
+//}
+//}
+
+/**
+ * Returns true if the msgType is valid and unique (new msg fqn & msg id).
+ * Logs error if msg id clashes or versions are different.
+ * Logs warning if descriptors are different.
+ */
+static bool pubsub_serializationProvider_isUniqueAndCheckValid(pubsub_serialization_provider_t* provider, pubsub_serialization_entry_t* entry) {

Review comment:
       isUniqueAndValid?

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
##########
@@ -0,0 +1,668 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <dirent.h>
+#include <string.h>
+
+#include "celix_constants.h"
+#include "dyn_function.h"
+#include "celix_version.h"
+#include "celix_utils.h"
+#include "dyn_message.h"
+#include "pubsub_utils.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_shell_command.h"
+
+//Size used for path, fqn and error message buffers in this file.
+#define MAX_PATH_LEN    1024
+
+//Kind of descriptor file, derived from the file name.
+typedef enum
+{
+    FIT_INVALID = 0,
+    FIT_DESCRIPTOR = 1,
+    FIT_AVPR = 2
+} descriptor_type_e;
+
+//Logging shorthands. These expand to a use of `provider->logHelper`, so they can only be used
+//where a variable named `provider` (pointer to a pubsub_serialization_provider) is in scope.
+#define L_DEBUG(...) \
+    celix_logHelper_debug(provider->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(provider->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(provider->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(provider->logHelper, __VA_ARGS__)
+
+
+//State of a serialization provider.
+struct pubsub_serialization_provider {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper;
+    char* serializationType;
+
+    //serialization callbacks
+    celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen);
+    void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen);
+    celix_status_t (*deserialize)(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen __attribute__((unused)), void **out);
+    void (*freeDeserializeMsg)(pubsub_serialization_entry_t* entry, void *msg);
+
+    //updated serialization services
+    long bundleTrackerId;
+
+    //marker service and the id it is registered under
+    pubsub_message_serialization_marker_t markerSvc;
+    long serializationMarkerSvcId;
+
+    //shell command service and the id it is registered under
+    celix_shell_command_t cmdSvc;
+    long cmdSvcId;
+
+    celix_thread_mutex_t mutex; //protects below
+    //NOTE(review): this is an array list, so "key" presumably means the element type (pubsub_serialization_entry) - confirm
+    celix_array_list_t *serializationSvcEntries; //key = pubsub_serialization_entry;
+};
+
+/**
+ * Log callback which formats the printf-style message and forwards it, together with the
+ * originating file and line, to the log helper of the provider passed as handle.
+ * The provided level is ignored; everything is logged on warning level.
+ * If the message cannot be formatted, the unformatted format string is logged instead.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    (void)level;
+    pubsub_serialization_provider_t *provider = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    if (rc < 0) {
+        //on failure the content of logStr is undefined: do not read or free it
+        celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, msg);
+        return;
+    }
+    celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+/**
+ * Classifies a file by name: a name containing ".descriptor" is a dfi descriptor,
+ * otherwise a name containing ".properties" is an avpr properties file; anything else is invalid.
+ */
+static descriptor_type_e getDescriptorType(const char* filename) {
+    descriptor_type_e type = FIT_INVALID;
+    if (strstr(filename, ".descriptor") != NULL) {
+        type = FIT_DESCRIPTOR;
+    } else if (strstr(filename, ".properties") != NULL) {
+        type = FIT_AVPR;
+    }
+    return type;
+}
+
+static bool readPropertiesFile(pubsub_serialization_provider_t* provider, const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {

Review comment:
       This reads the dfi files? Name is confusing/conflicting with the regular properties read/write.

##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+#define L_DEBUG(...) /* debug-level log through handler->logHelper; needs a `handler` variable in scope */ \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) /* info-level log through handler->logHelper; needs a `handler` variable in scope */ \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) /* warning-level log through handler->logHelper; needs a `handler` variable in scope */ \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) /* error-level log through handler->logHelper; needs a `handler` variable in scope */ \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry { //one tracked message serialization service
+    long svcId; //service id, read from the OSGI_FRAMEWORK_SERVICE_ID service property
+    const celix_properties_t *properties; //service properties as passed to the add call; not freed by the handler
+    uint32_t msgId; //message id: the msg id service property or, when that is 0, a string hash of the msg fqn
+    celix_version_t* msgVersion; //parsed message version; destroyed together with the entry
+    char* msgFqn; //copy of the message fully qualified name; freed together with the entry
+    pubsub_message_serialization_service_t* svc; //the serialization service itself; not freed by the handler
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler { //state of one serializer handler instance
+    celix_bundle_context_t* ctx; //bundle context used to start and stop the service tracker
+    bool backwardCompatible; //true: check versions with celix_version_isUserCompatible; false: require equal major and minor
+    long serializationSvcTrackerId; //id returned by celix_bundleContext_trackServicesWithOptions
+    celix_log_helper_t *logHelper; //log helper used by the L_* macros
+
+    celix_thread_rwlock_t lock; //protects serializationServices
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+/**
+ * Service tracker callback for an added message serialization service.
+ * Forwards to pubsub_serializerHandler_addSerializationService().
+ *
+ * @param handle The pubsub_serializer_handler_t set as callback handle.
+ * @param svc    The added pubsub_message_serialization_service_t.
+ * @param props  The service properties of the added service.
+ */
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_addSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Service tracker callback for a removed message serialization service.
+ * Forwards to pubsub_serializerHandler_removeSerializationService().
+ *
+ * @param handle The pubsub_serializer_handler_t set as callback handle.
+ * @param svc    The removed pubsub_message_serialization_service_t.
+ * @param props  The service properties of the removed service.
+ */
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_removeSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Sort comparator for pubsub_serialization_service_entry_t elements.
+ * Reads service id and service ranking (both defaulting to 0) from each entry's
+ * properties and delegates the ordering to utils_compareServiceIdsAndRanking().
+ *
+ * @param a First entry.
+ * @param b Second entry.
+ * @return The result of utils_compareServiceIdsAndRanking() for the two entries.
+ */
+int compareEntries(const void *a, const void *b) {
+    const celix_properties_t* propsA = ((const pubsub_serialization_service_entry_t*)a)->properties;
+    const celix_properties_t* propsB = ((const pubsub_serialization_service_entry_t*)b)->properties;
+
+    long idA = celix_properties_getAsLong(propsA, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long idB = celix_properties_getAsLong(propsB, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long rankingA = celix_properties_getAsLong(propsA, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+    long rankingB = celix_properties_getAsLong(propsB, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(idA, rankingA, idB, rankingB);
+}
+
+/**
+ * Looks up the first serialization entry stored for a message id.
+ * The caller must hold handler->lock.
+ *
+ * @param handler The serializer handler.
+ * @param msgId   The message id to look up.
+ * @return The first entry of the list stored for msgId, or NULL when no list is stored.
+ */
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* candidates = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (candidates == NULL) {
+        return NULL;
+    }
+    return celix_arrayList_get(candidates, 0); //NOTE a stored list always holds at least 1 entry
+}
+
+/**
+ * Checks whether a serialized message version can be handled by an entry.
+ * With handler->backwardCompatible set the decision is made by celix_version_isUserCompatible();
+ * otherwise the entry's major and minor version must both equal the serialized ones.
+ *
+ * @param handler                The serializer handler (provides the compatibility mode).
+ * @param entry                  The entry whose message version is checked.
+ * @param serializedMajorVersion Major version of the serialized message.
+ * @param serializedMinorVersion Minor version of the serialized message.
+ * @return true when the versions are considered compatible.
+ */
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    if (handler->backwardCompatible) {
+        return celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    }
+    return celix_version_getMajor(entry->msgVersion) == serializedMajorVersion
+            && celix_version_getMinor(entry->msgVersion) == serializedMinorVersion;
+}
+
+/**
+ * Returns the message fully qualified name stored for a message id.
+ * The caller must hold handler->lock.
+ *
+ * @param handler The serializer handler.
+ * @param msgId   The message id to look up.
+ * @return The msgFqn of the first entry stored for msgId (owned by that entry),
+ *         or NULL when no list is stored for msgId.
+ */
+static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* candidates = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (candidates == NULL) {
+        return NULL;
+    }
+    pubsub_serialization_service_entry_t *first = celix_arrayList_get(candidates, 0); //NOTE a stored list always holds at least 1 entry
+    return first->msgFqn;
+}
+
+/**
+ * Creates a serializer handler and starts tracking message serialization services
+ * whose serialization type property equals serializerType.
+ *
+ * @param ctx                The bundle context used for logging and service tracking.
+ * @param serializerType     The serialization type to track services for.
+ * @param backwardCompatible Whether version checks use celix_version_isUserCompatible()
+ *                           instead of an exact major/minor match.
+ * @return The new handler, or NULL when the handler or its tracker filter cannot be allocated.
+ *         Release it with pubsub_serializerHandler_destroy().
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
+    pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
+    if (handler == NULL) {
+        return NULL;
+    }
+    handler->ctx = ctx;
+    handler->backwardCompatible = backwardCompatible;
+
+    handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
+
+    celixThreadRwlock_create(&handler->lock, NULL);
+    handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+
+    char *filter = NULL;
+    if (asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType) < 0) {
+        //asprintf failed: filter is undefined and no tracker has been started, so undo the setup above
+        L_ERROR("Cannot create service filter for serialization type '%s'.", serializerType);
+        hashMap_destroy(handler->serializationServices, false, false);
+        celixThreadRwlock_destroy(&handler->lock);
+        celix_logHelper_destroy(handler->logHelper);
+        free(handler);
+        return NULL;
+    }
+
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
+    opts.filter.filter = filter;
+    opts.callbackHandle = handler;
+    opts.addWithProperties = addSvc;
+    opts.removeWithProperties = remSvc;
+    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    free(filter);
+
+    return handler;
+}
+
+
+/**
+ * Stops the service tracker and releases the handler with all entries it still holds.
+ * Does nothing when handler is NULL.
+ *
+ * @param handler The handler to destroy; must not be used afterwards.
+ */
+void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
+    if (handler == NULL) {
+        return;
+    }
+
+    celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
+    celixThreadRwlock_destroy(&handler->lock);
+
+    //free every remaining entry and the list that holds it
+    hash_map_iterator_t it = hashMapIterator_construct(handler->serializationServices);
+    while (hashMapIterator_hasNext(&it)) {
+        celix_array_list_t *list = hashMapIterator_nextValue(&it);
+        for (int idx = 0; idx < celix_arrayList_size(list); ++idx) {
+            pubsub_serialization_service_entry_t* current = celix_arrayList_get(list, idx);
+            free(current->msgFqn);
+            celix_version_destroy(current->msgVersion);
+            free(current);
+        }
+        celix_arrayList_destroy(list);
+    }
+
+    hashMap_destroy(handler->serializationServices, false, false);
+    celix_logHelper_destroy(handler->logHelper);
+    free(handler);
+}
+
+/**
+ * Registers a message serialization service with the handler.
+ *
+ * The message id is taken from the msg id service property or, when that is 0, from a string
+ * hash of the msg fqn. The service is ignored (and an error is logged) when it has no msg fqn,
+ * when its version cannot be parsed, or when an entry already stored for the same message id
+ * has a different msg fqn or a different version.
+ *
+ * @param handler       The serializer handler.
+ * @param svc           The serialization service to add; not owned by the handler.
+ * @param svcProperties The service properties of svc; the pointer is kept in the entry.
+ */
+void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    const char *version = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+
+    if (msgFqn == NULL) {
+        //the fqn is hashed, compared and copied below; none of that is valid for a NULL string
+        L_ERROR("%s service has no %s property. Ignoring serialization service.", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
+        return;
+    }
+
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celix_version_t* msgVersion = celix_version_createVersionFromString(version);
+    if (msgVersion == NULL) {
+        //log the unparsable property string; msgVersion is NULL here and is not a string
+        L_ERROR("%s service has an invalid %s property. value is '%s'", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, version);
+        return;
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+
+    pubsub_serialization_service_entry_t* existingEntry = findEntry(handler, msgId);
+
+    bool valid = true;
+    if (existingEntry != NULL && strncmp(existingEntry->msgFqn, msgFqn, 1024*1024) != 0) {
+        L_ERROR("Msg id clash. Registered serialization service with msg id %u and msg fqn '%s' clashes with an existing serialization service using the same msg id and msg fqn '%s'. Ignoring serialization service.", msgId, msgFqn, existingEntry->msgFqn);
+        valid = false;
+    }
+
+    if (existingEntry != NULL && celix_version_compareTo(existingEntry->msgVersion, msgVersion) != 0) {
+        char* existingVersion = celix_version_toString(existingEntry->msgVersion);
+        L_ERROR("Mismatched message versions. Registered serialization service with msg '%s' with version %s, has a different version than an existing serialization service with version '%s'. Ignoring serialization service.", msgFqn, version, existingVersion);
+        free(existingVersion);
+        valid = false;
+    }
+
+    if (valid) {
+        celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
+        if (entries == NULL) {
+            entries = celix_arrayList_create();
+        }
+        pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
+        entry->svcId = svcId;
+        entry->properties = svcProperties;
+        entry->msgFqn = celix_utils_strdup(msgFqn);
+        entry->msgId = msgId;
+        entry->msgVersion = msgVersion; //ownership moves to the entry
+        entry->svc = svc;
+        celix_arrayList_add(entries, entry);
+        celix_arrayList_sort(entries, compareEntries); //keep the preferred service at index 0
+
+        hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+    } else {
+        celix_version_destroy(msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+/**
+ * Removes a previously added message serialization service from the handler.
+ *
+ * The entry is located by message id (msg id service property or, when that is 0, a string
+ * hash of the msg fqn) and by service id. When the last entry for a message id is removed,
+ * the list stored for that message id is removed as well.
+ *
+ * @param handler       The serializer handler.
+ * @param svc           The serialization service being removed (not used for the lookup).
+ * @param svcProperties The service properties of svc.
+ */
+void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+    if (msgId == 0) {
+        if (msgFqn == NULL) {
+            //neither a msg id nor a msg fqn: no map key can be derived, so there is nothing to look up
+            return;
+        }
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *found = NULL;
+        for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+            pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, i);
+            if (entry->svcId == svcId) {
+                found = entry;
+                celix_arrayList_removeAt(entries, i);
+                celix_arrayList_sort(entries, compareEntries);
+                break;
+            }
+        }
+        if (found != NULL) {
+            free(found->msgFqn);
+            celix_version_destroy(found->msgVersion);
+            free(found);
+        }
+        if (celix_arrayList_size(entries) == 0) {
+            //never keep an empty list in the map: lookups read index 0 of any stored list
+            hashMap_remove(handler->serializationServices, (void*)(uintptr_t)msgId);
+            celix_arrayList_destroy(entries);
+        }
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+/**
+ * Serializes a message with the first serialization service stored for msgId.
+ * The handler's read lock is held while the service is called.
+ *
+ * @param handler      The serializer handler.
+ * @param msgId        The message id of input.
+ * @param input        The message to serialize.
+ * @param output       Passed through to the service's serialize function.
+ * @param outputIovLen Passed through to the service's serialize function.
+ * @return The status of the service's serialize call, or CELIX_ILLEGAL_ARGUMENT when no
+ *         service is stored for msgId.
+ */
+celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* handler, uint32_t msgId, const void* input, struct iovec** output, size_t* outputIovLen) {
+    celix_status_t status = CELIX_ILLEGAL_ARGUMENT;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* found = findEntry(handler, msgId);
+    if (found == NULL) {
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    } else {
+        status = found->svc->serialize(found->svc->handle, input, output, outputIovLen);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+/**
+ * Releases serialized output using the first serialization service stored for msgId.
+ * The handler's read lock is held while the service is called.
+ *
+ * @param handler     The serializer handler.
+ * @param msgId       The message id the output belongs to.
+ * @param input       The iovec array to release.
+ * @param inputIovLen Passed through to the service's freeSerializedMsg function.
+ * @return CELIX_SUCCESS, or CELIX_ILLEGAL_ARGUMENT when no service is stored for msgId.
+ */
+celix_status_t pubsub_serializerHandler_freeSerializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, struct iovec* input, size_t inputIovLen) {
+    celix_status_t status = CELIX_ILLEGAL_ARGUMENT;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* found = findEntry(handler, msgId);
+    if (found == NULL) {
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    } else {
+        found->svc->freeSerializedMsg(found->svc->handle, input, inputIovLen);
+        status = CELIX_SUCCESS;
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+/**
+ * Deserializes a message with the first serialization service stored for msgId, provided
+ * the serialized version passes isCompatible() for that service's message version.
+ * The handler's read lock is held while the service is called.
+ *
+ * @param handler                The serializer handler.
+ * @param msgId                  The message id of the serialized input.
+ * @param serializedMajorVersion Major version of the serialized input.
+ * @param serializedMinorVersion Minor version of the serialized input.
+ * @param input                  The serialized input.
+ * @param inputIovLen            Passed through to the service's deserialize function.
+ * @param out                    Passed through to the service's deserialize function.
+ * @return The status of the service's deserialize call, or CELIX_ILLEGAL_ARGUMENT when no
+ *         service is stored for msgId or the version is incompatible.
+ */
+celix_status_t pubsub_serializerHandler_deserialize(pubsub_serializer_handler_t* handler, uint32_t msgId, int serializedMajorVersion, int serializedMinorVersion, const struct iovec* input, size_t inputIovLen, void** out) {
+    celix_status_t status = CELIX_ILLEGAL_ARGUMENT;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* found = findEntry(handler, msgId);
+    if (found == NULL) {
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    } else if (isCompatible(handler, found, serializedMajorVersion, serializedMinorVersion)) {
+        status = found->svc->deserialize(found->svc->handle, input, inputIovLen, out);
+    } else {
+        char *entryVersion = celix_version_toString(found->msgVersion);
+        L_ERROR("Cannot deserialize for message %s version %s. The serialized input has a version of %d.%d.x and this is incompatible.", found->msgFqn, entryVersion, serializedMajorVersion, serializedMinorVersion);
+        free(entryVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+/**
+ * Releases a deserialized message using the first serialization service stored for msgId.
+ * The handler's read lock is held while the service is called.
+ *
+ * @param handler The serializer handler.
+ * @param msgId   The message id of msg.
+ * @param msg     The deserialized message to release.
+ * @return CELIX_SUCCESS, or CELIX_ILLEGAL_ARGUMENT when no service is stored for msgId.
+ */
+celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, void* msg) {
+    celix_status_t status = CELIX_ILLEGAL_ARGUMENT;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* found = findEntry(handler, msgId);
+    if (found == NULL) {
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    } else {
+        found->svc->freeDeserializedMsg(found->svc->handle, msg);
+        status = CELIX_SUCCESS;
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+bool pubsub_serializerHandler_supportMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, int serializedMajorVersion, int serializedMinorVersion) {

Review comment:
       Does this check if a message can be (de)serialized? If so -> isMessageSupported. If not, what does it do?

##########
File path: bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
##########
@@ -0,0 +1,120 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_avrobin_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <assert.h>
+
+#include "avrobin_serializer.h"
+#include "dyn_message.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+
+/**
+ * Log callback for the dfi library: formats the printf-style message and forwards it,
+ * prefixed with the reporting file and line, to the given celix log helper.
+ *
+ * @param handle The celix_log_helper_t to log to.
+ * @param level  Log level, passed unchanged to celix_logHelper_log().
+ * @param file   Source file reported by the caller.
+ * @param line   Source line reported by the caller.
+ * @param msg    printf-style format string, followed by its arguments.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    celix_log_helper_t *log = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    //NOTE(review): level is used as a celix log level as-is; confirm the dfi and celix level values match
+    if (rc < 0) {
+        //vasprintf failed: logStr is undefined, so log the unformatted message instead of reading or freeing it
+        celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, msg);
+        return;
+    }
+    celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+
+static celix_status_t pubsub_avrobinSerializationProvider_serialize(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen) {

Review comment:
       Why outputIovLen? Not even used?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] Oipo commented on pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
Oipo commented on pull request #223:
URL: https://github.com/apache/celix/pull/223#issuecomment-626291859


   If you change `pubsub_utils.c:175` to `if (dir && stat(dir, &s) == 0) {`, does that fix the segfault for you?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426400600



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+#define L_DEBUG(...) /* debug-level log through handler->logHelper; needs a `handler` variable in scope */ \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) /* info-level log through handler->logHelper; needs a `handler` variable in scope */ \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) /* warning-level log through handler->logHelper; needs a `handler` variable in scope */ \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) /* error-level log through handler->logHelper; needs a `handler` variable in scope */ \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry { //one tracked message serialization service
+    long svcId; //service id, read from the OSGI_FRAMEWORK_SERVICE_ID service property
+    const celix_properties_t *properties; //service properties as passed to the add call; not freed by the handler
+    uint32_t msgId; //message id: the msg id service property or, when that is 0, a string hash of the msg fqn
+    celix_version_t* msgVersion; //parsed message version; destroyed together with the entry
+    char* msgFqn; //copy of the message fully qualified name; freed together with the entry
+    pubsub_message_serialization_service_t* svc; //the serialization service itself; not freed by the handler
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler { //state of one serializer handler instance
+    celix_bundle_context_t* ctx; //bundle context used to start and stop the service tracker
+    bool backwardCompatible; //true: check versions with celix_version_isUserCompatible; false: require equal major and minor
+    long serializationSvcTrackerId; //id returned by celix_bundleContext_trackServicesWithOptions
+    celix_log_helper_t *logHelper; //log helper used by the L_* macros
+
+    celix_thread_rwlock_t lock; //protects serializationServices
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+/**
+ * Service tracker callback for an added message serialization service.
+ * Forwards to pubsub_serializerHandler_addSerializationService().
+ *
+ * @param handle The pubsub_serializer_handler_t set as callback handle.
+ * @param svc    The added pubsub_message_serialization_service_t.
+ * @param props  The service properties of the added service.
+ */
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_addSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Service tracker callback for a removed message serialization service.
+ * Forwards to pubsub_serializerHandler_removeSerializationService().
+ *
+ * @param handle The pubsub_serializer_handler_t set as callback handle.
+ * @param svc    The removed pubsub_message_serialization_service_t.
+ * @param props  The service properties of the removed service.
+ */
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializerHandler_removeSerializationService((pubsub_serializer_handler_t*)handle, (pubsub_message_serialization_service_t*)svc, props);
+}
+
+/**
+ * Sort comparator for pubsub_serialization_service_entry_t elements.
+ * Reads service id and service ranking (both defaulting to 0) from each entry's
+ * properties and delegates the ordering to utils_compareServiceIdsAndRanking().
+ *
+ * @param a First entry.
+ * @param b Second entry.
+ * @return The result of utils_compareServiceIdsAndRanking() for the two entries.
+ */
+int compareEntries(const void *a, const void *b) {
+    const celix_properties_t* propsA = ((const pubsub_serialization_service_entry_t*)a)->properties;
+    const celix_properties_t* propsB = ((const pubsub_serialization_service_entry_t*)b)->properties;
+
+    long idA = celix_properties_getAsLong(propsA, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long idB = celix_properties_getAsLong(propsB, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long rankingA = celix_properties_getAsLong(propsA, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+    long rankingB = celix_properties_getAsLong(propsB, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(idA, rankingA, idB, rankingB);
+}
+
+/**
+ * Looks up the first serialization entry stored for a message id.
+ * The caller must hold handler->lock.
+ *
+ * @param handler The serializer handler.
+ * @param msgId   The message id to look up.
+ * @return The first entry of the list stored for msgId, or NULL when no list is stored.
+ */
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* candidates = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (candidates == NULL) {
+        return NULL;
+    }
+    return celix_arrayList_get(candidates, 0); //NOTE a stored list always holds at least 1 entry
+}
+
+/**
+ * Checks whether a serialized message version can be handled by an entry.
+ * With handler->backwardCompatible set the decision is made by celix_version_isUserCompatible();
+ * otherwise the entry's major and minor version must both equal the serialized ones.
+ *
+ * @param handler                The serializer handler (provides the compatibility mode).
+ * @param entry                  The entry whose message version is checked.
+ * @param serializedMajorVersion Major version of the serialized message.
+ * @param serializedMinorVersion Minor version of the serialized message.
+ * @return true when the versions are considered compatible.
+ */
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    if (handler->backwardCompatible) {
+        return celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    }
+    return celix_version_getMajor(entry->msgVersion) == serializedMajorVersion
+            && celix_version_getMinor(entry->msgVersion) == serializedMinorVersion;
+}
+
+/**
+ * Returns the message fully qualified name stored for a message id.
+ * The caller must hold handler->lock.
+ *
+ * @param handler The serializer handler.
+ * @param msgId   The message id to look up.
+ * @return The msgFqn of the first entry stored for msgId (owned by that entry),
+ *         or NULL when no list is stored for msgId.
+ */
+static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celix_array_list_t* candidates = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (candidates == NULL) {
+        return NULL;
+    }
+    pubsub_serialization_service_entry_t *first = celix_arrayList_get(candidates, 0); //NOTE a stored list always holds at least 1 entry
+    return first->msgFqn;
+}
+
+/**
+ * Creates a serializer handler and starts tracking message serialization services
+ * whose serialization type property equals serializerType.
+ *
+ * @param ctx                The bundle context used for logging and service tracking.
+ * @param serializerType     The serialization type to track services for.
+ * @param backwardCompatible Whether version checks use celix_version_isUserCompatible()
+ *                           instead of an exact major/minor match.
+ * @return The new handler, or NULL when the handler or its tracker filter cannot be allocated.
+ *         Release it with pubsub_serializerHandler_destroy().
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
+    pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
+    if (handler == NULL) {
+        return NULL;
+    }
+    handler->ctx = ctx;
+    handler->backwardCompatible = backwardCompatible;
+
+    handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
+
+    celixThreadRwlock_create(&handler->lock, NULL);
+    handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+
+    char *filter = NULL;
+    if (asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType) < 0) {
+        //asprintf failed: filter is undefined and no tracker has been started, so undo the setup above
+        L_ERROR("Cannot create service filter for serialization type '%s'.", serializerType);
+        hashMap_destroy(handler->serializationServices, false, false);
+        celixThreadRwlock_destroy(&handler->lock);
+        celix_logHelper_destroy(handler->logHelper);
+        free(handler);
+        return NULL;
+    }
+
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
+    opts.filter.filter = filter;
+    opts.callbackHandle = handler;
+    opts.addWithProperties = addSvc;
+    opts.removeWithProperties = remSvc;
+    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    free(filter);
+
+    return handler;
+}
+
+
+/**
+ * Stops the service tracker and releases the handler with all entries it still holds.
+ * Does nothing when handler is NULL.
+ *
+ * @param handler The handler to destroy; must not be used afterwards.
+ */
+void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
+    if (handler == NULL) {
+        return;
+    }
+
+    celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
+    celixThreadRwlock_destroy(&handler->lock);
+
+    //free every remaining entry and the list that holds it
+    hash_map_iterator_t it = hashMapIterator_construct(handler->serializationServices);
+    while (hashMapIterator_hasNext(&it)) {
+        celix_array_list_t *list = hashMapIterator_nextValue(&it);
+        for (int idx = 0; idx < celix_arrayList_size(list); ++idx) {
+            pubsub_serialization_service_entry_t* current = celix_arrayList_get(list, idx);
+            free(current->msgFqn);
+            celix_version_destroy(current->msgVersion);
+            free(current);
+        }
+        celix_arrayList_destroy(list);
+    }
+
+    hashMap_destroy(handler->serializationServices, false, false);
+    celix_logHelper_destroy(handler->logHelper);
+    free(handler);
+}
+
+/**
+ * Registers a message serialization service with the handler.
+ *
+ * The message id is taken from the msg id service property or, when that is 0, from a string
+ * hash of the msg fqn. The service is ignored (and an error is logged) when it has no msg fqn,
+ * when its version cannot be parsed, or when an entry already stored for the same message id
+ * has a different msg fqn or a different version.
+ *
+ * @param handler       The serializer handler.
+ * @param svc           The serialization service to add; not owned by the handler.
+ * @param svcProperties The service properties of svc; the pointer is kept in the entry.
+ */
+void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    const char *version = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+
+    if (msgFqn == NULL) {
+        //the fqn is hashed, compared and copied below; none of that is valid for a NULL string
+        L_ERROR("%s service has no %s property. Ignoring serialization service.", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
+        return;
+    }
+
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celix_version_t* msgVersion = celix_version_createVersionFromString(version);
+    if (msgVersion == NULL) {
+        //log the unparsable property string; msgVersion is NULL here and is not a string
+        L_ERROR("%s service has an invalid %s property. value is '%s'", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, version);
+        return;
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+
+    pubsub_serialization_service_entry_t* existingEntry = findEntry(handler, msgId);
+
+    bool valid = true;
+    if (existingEntry != NULL && strncmp(existingEntry->msgFqn, msgFqn, 1024*1024) != 0) {
+        L_ERROR("Msg id clash. Registered serialization service with msg id %u and msg fqn '%s' clashes with an existing serialization service using the same msg id and msg fqn '%s'. Ignoring serialization service.", msgId, msgFqn, existingEntry->msgFqn);
+        valid = false;
+    }
+
+    if (existingEntry != NULL && celix_version_compareTo(existingEntry->msgVersion, msgVersion) != 0) {
+        char* existingVersion = celix_version_toString(existingEntry->msgVersion);
+        L_ERROR("Mismatched message versions. Registered serialization service with msg '%s' with version %s, has a different version than an existing serialization service with version '%s'. Ignoring serialization service.", msgFqn, version, existingVersion);
+        free(existingVersion);
+        valid = false;
+    }
+
+    if (valid) {
+        celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
+        if (entries == NULL) {
+            entries = celix_arrayList_create();
+        }
+        pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
+        entry->svcId = svcId;
+        entry->properties = svcProperties;
+        entry->msgFqn = celix_utils_strdup(msgFqn);
+        entry->msgId = msgId;
+        entry->msgVersion = msgVersion; //ownership moves to the entry
+        entry->svc = svc;
+        celix_arrayList_add(entries, entry);
+        celix_arrayList_sort(entries, compareEntries); //keep the preferred service at index 0
+
+        hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+    } else {
+        celix_version_destroy(msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+/**
+ * Removes a previously added message serialization service from the handler.
+ *
+ * The entry is located by message id (msg id service property or, when that is 0, a string
+ * hash of the msg fqn) and by service id. When the last entry for a message id is removed,
+ * the list stored for that message id is removed as well.
+ *
+ * @param handler       The serializer handler.
+ * @param svc           The serialization service being removed (not used for the lookup).
+ * @param svcProperties The service properties of svc.
+ */
+void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+    if (msgId == 0) {
+        if (msgFqn == NULL) {
+            //neither a msg id nor a msg fqn: no map key can be derived, so there is nothing to look up
+            return;
+        }
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *found = NULL;
+        for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+            pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, i);
+            if (entry->svcId == svcId) {
+                found = entry;
+                celix_arrayList_removeAt(entries, i);
+                celix_arrayList_sort(entries, compareEntries);
+                break;
+            }
+        }
+        if (found != NULL) {
+            free(found->msgFqn);
+            celix_version_destroy(found->msgVersion);
+            free(found);
+        }
+        if (celix_arrayList_size(entries) == 0) {
+            //never keep an empty list in the map: lookups read index 0 of any stored list
+            hashMap_remove(handler->serializationServices, (void*)(uintptr_t)msgId);
+            celix_arrayList_destroy(entries);
+        }
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* handler, uint32_t msgId, const void* input, struct iovec** output, size_t* outputIovLen) {

Review comment:
       Because `struct iovec** output` is an array of iovecs. I added the message serialization service doc, as used in the pubsub serialization handler doc; maybe this makes it clearer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] Oipo edited a comment on pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
Oipo edited a comment on pull request #223:
URL: https://github.com/apache/celix/pull/223#issuecomment-626291859


   If you change `pubsub_utils.c:175` to `if (dir && stat(dir, &s) == 0) {`, does that fix the segfault in PubSubAvrobinSerializationProviderTestSuite for you?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426414542



##########
File path: bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
##########
@@ -0,0 +1,120 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_avrobin_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <assert.h>
+
+#include "avrobin_serializer.h"
+#include "dyn_message.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    va_list ap;
+    celix_log_helper_t *log = handle;
+    char *logStr = NULL;
+    va_start(ap, msg);
+    vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    celix_logHelper_log(log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+
+static celix_status_t pubsub_avrobinSerializationProvider_serialize(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen) {

Review comment:
       Yes, for the current serialization (avrobin/json), outputIovLen is not used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426411883



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
##########
@@ -0,0 +1,668 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <dirent.h>
+#include <string.h>
+
+#include "celix_constants.h"
+#include "dyn_function.h"
+#include "celix_version.h"
+#include "celix_utils.h"
+#include "dyn_message.h"
+#include "pubsub_utils.h"
+#include "celix_log_helper.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_shell_command.h"
+
+#define MAX_PATH_LEN    1024
+
+typedef enum
+{
+    FIT_INVALID = 0,
+    FIT_DESCRIPTOR = 1,
+    FIT_AVPR = 2
+} descriptor_type_e;
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(provider->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(provider->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(provider->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(provider->logHelper, __VA_ARGS__)
+
+
+struct pubsub_serialization_provider {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper;
+    char* serializationType;
+
+    //serialization callbacks
+    celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen);
+    void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen);
+    celix_status_t (*deserialize)(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen __attribute__((unused)), void **out);
+    void (*freeDeserializeMsg)(pubsub_serialization_entry_t* entry, void *msg);
+
+    //updated serialization services
+    long bundleTrackerId;
+
+    pubsub_message_serialization_marker_t markerSvc;
+    long serializationMarkerSvcId;
+
+    celix_shell_command_t cmdSvc;
+    long cmdSvcId;
+
+    celix_thread_mutex_t mutex; //protects below
+    celix_array_list_t *serializationSvcEntries; //key = pubsub_serialization_entry;
+};
+
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    (void)level;
+    va_list ap;
+    pubsub_serialization_provider_t *provider = handle;
+    char *logStr = NULL;
+    va_start(ap, msg);
+    vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    celix_logHelper_log(provider->logHelper, CELIX_LOG_LEVEL_WARNING, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr);
+    free(logStr);
+}
+
+static descriptor_type_e getDescriptorType(const char* filename) {
+    if (strstr(filename, ".descriptor")) {
+        return FIT_DESCRIPTOR;
+    }
+    else if (strstr(filename, ".properties")) {
+        return FIT_AVPR;
+    }
+    else {
+        return FIT_INVALID;
+    }
+}
+
+static bool readPropertiesFile(pubsub_serialization_provider_t* provider, const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {

Review comment:
       renamed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426428390



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry {
+    long svcId;
+    const celix_properties_t *properties;
+    uint32_t msgId;
+    celix_version_t* msgVersion;
+    char* msgFqn;
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    bool backwardCompatible;
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializer_handler_t* handler = handle;
+    pubsub_message_serialization_service_t* serSvc = svc;
+    pubsub_serializerHandler_addSerializationService(handler, serSvc, props);
+}
+
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializer_handler_t* handler = handle;
+    pubsub_message_serialization_service_t* serSvc = svc;
+    pubsub_serializerHandler_removeSerializationService(handler, serSvc, props);
+}
+
+int compareEntries(const void *a, const void *b) {
+    const pubsub_serialization_service_entry_t* aEntry = a;
+    const pubsub_serialization_service_entry_t* bEntry = b;
+
+    long servIdA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long servIdB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+
+    long servRankingA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+    long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB);
+}
+
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    //NOTE assumes mutex is locked
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        return celix_arrayList_get(entries, 0); //NOTE if entries not null, always at least 1 entry
+    }
+    return NULL;
+}
+
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    bool compatible = false;
+    if (handler->backwardCompatible) {
+        compatible = celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    } else {
+        int major = celix_version_getMajor(entry->msgVersion);
+        int minor = celix_version_getMinor(entry->msgVersion);
+        compatible = major == serializedMajorVersion && minor == serializedMinorVersion;

Review comment:
       There is now :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426398791



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry {
+    long svcId;
+    const celix_properties_t *properties;
+    uint32_t msgId;
+    celix_version_t* msgVersion;
+    char* msgFqn;
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    bool backwardCompatible;
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {

Review comment:
       renamed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [celix] pnoltes commented on a change in pull request #223: Feature/pubsub custom serializers

Posted by GitBox <gi...@apache.org>.
pnoltes commented on a change in pull request #223:
URL: https://github.com/apache/celix/pull/223#discussion_r426402517



##########
File path: bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
##########
@@ -0,0 +1,360 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include "pubsub_serializer_handler.h"
+
+#include <string.h>
+
+#include "celix_version.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_log_helper.h"
+
+#define L_DEBUG(...) \
+    celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_info(handler->logHelper, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_warning(handler->logHelper, __VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_error(handler->logHelper, __VA_ARGS__)
+
+typedef struct pubsub_serialization_service_entry {
+    long svcId;
+    const celix_properties_t *properties;
+    uint32_t msgId;
+    celix_version_t* msgVersion;
+    char* msgFqn;
+    pubsub_message_serialization_service_t* svc;
+} pubsub_serialization_service_entry_t;
+
+struct pubsub_serializer_handler {
+    celix_bundle_context_t* ctx;
+    bool backwardCompatible;
+    long serializationSvcTrackerId;
+    celix_log_helper_t *logHelper;
+
+    celix_thread_rwlock_t lock;
+    hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+};
+
+static void addSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializer_handler_t* handler = handle;
+    pubsub_message_serialization_service_t* serSvc = svc;
+    pubsub_serializerHandler_addSerializationService(handler, serSvc, props);
+}
+
+static void remSvc(void *handle, void* svc, const celix_properties_t *props) {
+    pubsub_serializer_handler_t* handler = handle;
+    pubsub_message_serialization_service_t* serSvc = svc;
+    pubsub_serializerHandler_removeSerializationService(handler, serSvc, props);
+}
+
+int compareEntries(const void *a, const void *b) {
+    const pubsub_serialization_service_entry_t* aEntry = a;
+    const pubsub_serialization_service_entry_t* bEntry = b;
+
+    long servIdA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+    long servIdB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_ID, 0);
+
+    long servRankingA = celix_properties_getAsLong(aEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+    long servRankingB = celix_properties_getAsLong(bEntry->properties, OSGI_FRAMEWORK_SERVICE_RANKING, 0);
+
+    return utils_compareServiceIdsAndRanking(servIdA, servRankingA, servIdB, servRankingB);
+}
+
+static pubsub_serialization_service_entry_t* findEntry(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    //NOTE assumes mutex is locked
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        return celix_arrayList_get(entries, 0); //NOTE if entries not null, always at least 1 entry
+    }
+    return NULL;
+}
+
+static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serialization_service_entry_t* entry, int serializedMajorVersion, int serializedMinorVersion) {
+    bool compatible = false;
+    if (handler->backwardCompatible) {
+        compatible = celix_version_isUserCompatible(entry->msgVersion, serializedMajorVersion, serializedMinorVersion);
+    } else {
+        int major = celix_version_getMajor(entry->msgVersion);
+        int minor = celix_version_getMinor(entry->msgVersion);
+        compatible = major == serializedMajorVersion && minor == serializedMinorVersion;
+    }
+    return compatible;
+}
+
+static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    //NOTE assumes mutex is locked
+    const char *result = NULL;
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
+        result = entry->msgFqn;
+    }
+    return result;
+}
+
+pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
+    pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
+    handler->ctx = ctx;
+    handler->backwardCompatible = backwardCompatible;
+
+    handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
+
+    celixThreadRwlock_create(&handler->lock, NULL);
+    handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+
+    char *filter = NULL;
+    asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
+    celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
+    opts.filter.filter = filter;
+    opts.callbackHandle = handler;
+    opts.addWithProperties = addSvc;
+    opts.removeWithProperties = remSvc;
+    handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    free(filter);
+
+    return handler;
+}
+
+
+void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
+    if (handler != NULL) {
+        celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
+        celixThreadRwlock_destroy(&handler->lock);
+        hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
+            for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+                pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
+                free(entry->msgFqn);
+                celix_version_destroy(entry->msgVersion);
+                free(entry);
+            }
+            celix_arrayList_destroy(entries);
+        }
+        hashMap_destroy(handler->serializationServices, false, false);
+        celix_logHelper_destroy(handler->logHelper);
+        free(handler);
+    }
+}
+
+void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    const char *version = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celix_version_t* msgVersion = celix_version_createVersionFromString(version);
+    if (msgVersion == NULL) {
+        L_ERROR("%s service has an invalid %s property. value is '%s'", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, msgVersion);
+        return;
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+
+    pubsub_serialization_service_entry_t* existingEntry = findEntry(handler, msgId);
+
+    bool valid = true;
+    if (existingEntry != NULL && strncmp(existingEntry->msgFqn, msgFqn, 1024*1024) != 0) {
+        L_ERROR("Msg id clash. Registered serialization service with msg id %d and msg fqn '%s' clashes with an existing serialization service using the same msg id and msg fqn '%s'. Ignoring serialization service.", msgId, msgFqn, existingEntry->msgFqn);
+        valid = false;
+    }
+
+    if (existingEntry != NULL && celix_version_compareTo(existingEntry->msgVersion, msgVersion) != 0) {
+        char* existingVersion = celix_version_toString(existingEntry->msgVersion);
+        L_ERROR("Mismatched message versions. Registered serialization service with msg '%s' with version %s, has a different version than an existing serialization service with version '%s'. Ignoring serialization service.", msgFqn, version, existingVersion);
+        free(existingVersion);
+        valid = false;
+    }
+
+    if (valid) {
+        celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
+        if (entries == NULL) {
+            entries = celix_arrayList_create();
+        }
+        pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
+        entry->svcId = svcId;
+        entry->properties = svcProperties;
+        entry->msgFqn = celix_utils_strdup(msgFqn);
+        entry->msgId = msgId;
+        entry->msgVersion = msgVersion;
+        entry->svc = svc;
+        celix_arrayList_add(entries, entry);
+        celix_arrayList_sort(entries, compareEntries);
+
+        hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+    } else {
+        celix_version_destroy(msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties) {
+    long svcId = celix_properties_getAsLong(svcProperties, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char *msgFqn = celix_properties_get(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    uint32_t msgId = (uint32_t)celix_properties_getAsLong(svcProperties, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 0L);
+    if (msgId == 0) {
+        msgId = celix_utils_stringHash(msgFqn);
+    }
+
+    celixThreadRwlock_writeLock(&handler->lock);
+    celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (entries != NULL) {
+        pubsub_serialization_service_entry_t *found = NULL;
+        for (int i = 0; i < celix_arrayList_size(entries); ++i) {
+            pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, i);
+            if (entry->svcId == svcId) {
+                found = entry;
+                celix_arrayList_removeAt(entries, i);
+                celix_arrayList_sort(entries, compareEntries);
+                break;
+            }
+        }
+        if (found != NULL) {
+            free(found->msgFqn);
+            celix_version_destroy(found->msgVersion);
+            free(found);
+        }
+        if (celix_arrayList_size(entries) == 0) {
+            hashMap_remove(handler->serializationServices, (void*)(uintptr_t)msgId);
+            celix_arrayList_destroy(entries);
+        }
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+}
+
+celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* handler, uint32_t msgId, const void* input, struct iovec** output, size_t* outputIovLen) {
+    celix_status_t status;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* entry = findEntry(handler, msgId);
+    if (entry != NULL) {
+        status = entry->svc->serialize(entry->svc->handle, input, output, outputIovLen);
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT;
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+celix_status_t pubsub_serializerHandler_freeSerializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, struct iovec* input, size_t inputIovLen) {
+    celix_status_t status = CELIX_SUCCESS;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* entry = findEntry(handler, msgId);
+    if (entry != NULL) {
+        entry->svc->freeSerializedMsg(entry->svc->handle, input, inputIovLen);
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT;
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+
+}
+
+celix_status_t pubsub_serializerHandler_deserialize(pubsub_serializer_handler_t* handler, uint32_t msgId, int serializedMajorVersion, int serializedMinorVersion, const struct iovec* input, size_t inputIovLen, void** out) {
+    celix_status_t status;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* entry = findEntry(handler, msgId);
+    bool compatible = false;
+    if (entry != NULL) {
+        compatible = isCompatible(handler, entry, serializedMajorVersion, serializedMinorVersion);
+        if (compatible) {
+            status = entry->svc->deserialize(entry->svc->handle, input, inputIovLen, out);
+        } else {
+            status = CELIX_ILLEGAL_ARGUMENT;
+            char *version = celix_version_toString(entry->msgVersion);
+            L_ERROR("Cannot deserialize for message %s version %s. The serialized input has a version of %d.%d.x and this is incompatible.", entry->msgFqn, version, serializedMajorVersion, serializedMinorVersion);
+            free(version);
+        }
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT;
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, void* msg) {
+    celix_status_t status = CELIX_SUCCESS;
+    celixThreadRwlock_readLock(&handler->lock);
+    pubsub_serialization_service_entry_t* entry = findEntry(handler, msgId);
+    if (entry != NULL) {
+        entry->svc->freeDeserializedMsg(entry->svc->handle, msg);
+    } else {
+        status = CELIX_ILLEGAL_ARGUMENT;
+        L_ERROR("Cannot find message serialization service for msg id %u.", msgId);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return status;
+}
+
+bool pubsub_serializerHandler_supportMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, int serializedMajorVersion, int serializedMinorVersion) {

Review comment:
       renamed and added doc




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org