You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2022/09/12 12:45:41 UTC

[fineract] branch develop updated: FINERACT-1694: Bulk event support

This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7558b09de FINERACT-1694: Bulk event support
7558b09de is described below

commit 7558b09deff71939a663e8d8211d013bd3638f67
Author: Arnold Galovics <ga...@gmail.com>
AuthorDate: Mon Sep 12 09:46:12 2022 +0200

    FINERACT-1694: Bulk event support
---
 .../src/main/avro/BulkMessageItemV1.avsc           | 32 +++++++++
 .../src/main/avro/BulkMessagePayloadV1.avsc        | 13 ++++
 .../src/main/avro/BulkMessageV1.avsc               | 42 ++++++++++++
 .../external/service/ExternalEventService.java     | 50 +++++++++++---
 .../BulkMessageItemFactory.java}                   | 29 +++++----
 .../external/service/message/MessageFactory.java   | 76 ++++++++++++++++++++++
 .../domain/BulkMessageData.java}                   | 20 +++---
 .../domain/MessageCategory.java}                   | 19 +++---
 .../domain/MessageData.java}                       | 18 ++---
 .../domain/MessageDataSchema.java}                 | 19 +++---
 .../domain/MessageId.java}                         | 18 ++---
 .../domain/MessageIdempotencyKey.java}             | 19 +++---
 .../domain/MessageSource.java}                     | 19 +++---
 .../domain/MessageType.java}                       | 19 +++---
 .../BusinessEventSerializerFactory.java            |  3 +-
 .../service/support/ByteBufferConverter.java       |  4 ++
 ...lientWritePlatformServiceJpaRepositoryImpl.java |  2 +
 .../external/service/ExternalEventServiceTest.java | 45 ++++++++++++-
 18 files changed, 342 insertions(+), 105 deletions(-)

diff --git a/fineract-avro-schemas/src/main/avro/BulkMessageItemV1.avsc b/fineract-avro-schemas/src/main/avro/BulkMessageItemV1.avsc
new file mode 100644
index 000000000..e094fe8b4
--- /dev/null
+++ b/fineract-avro-schemas/src/main/avro/BulkMessageItemV1.avsc
@@ -0,0 +1,32 @@
+{
+    "name": "BulkMessageItemV1",
+    "namespace": "org.apache.fineract.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "The ID of the message to be sent",
+            "type": "int"
+        },
+        {
+            "name": "type",
+            "doc": "The type of event the payload refers to. For example LoanApprovedBusinessEvent",
+            "type": "string"
+        },
+        {
+            "name": "category",
+            "doc": "The category of event the payload refers to. For example LOAN",
+            "type": "string"
+        },
+        {
+            "name": "dataschema",
+            "doc": "The fully qualified name of the schema of the event payload. For example org.apache.fineract.avro.loan.v1.LoanAccountDataV1",
+            "type": "string"
+        },
+        {
+            "name": "data",
+            "doc": "The payload data serialized into Avro bytes",
+            "type": "bytes"
+        }
+    ]
+}
diff --git a/fineract-avro-schemas/src/main/avro/BulkMessagePayloadV1.avsc b/fineract-avro-schemas/src/main/avro/BulkMessagePayloadV1.avsc
new file mode 100644
index 000000000..451edbc5d
--- /dev/null
+++ b/fineract-avro-schemas/src/main/avro/BulkMessagePayloadV1.avsc
@@ -0,0 +1,13 @@
+{
+    "name": "BulkMessagePayloadV1",
+    "namespace": "org.apache.fineract.avro",
+    "type": "record",
+    "fields": [{
+        "name": "datas",
+        "doc": "The individual messages within this bulk message",
+        "type": {
+            "type": "array",
+            "items": "org.apache.fineract.avro.BulkMessageItemV1"
+        }
+    }]
+}
diff --git a/fineract-avro-schemas/src/main/avro/BulkMessageV1.avsc b/fineract-avro-schemas/src/main/avro/BulkMessageV1.avsc
new file mode 100644
index 000000000..f8ed90813
--- /dev/null
+++ b/fineract-avro-schemas/src/main/avro/BulkMessageV1.avsc
@@ -0,0 +1,42 @@
+{
+    "name": "BulkMessageV1",
+    "namespace": "org.apache.fineract.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "id",
+            "doc": "The ID of the message to be sent",
+            "type": "int"
+        },
+        {
+            "name": "source",
+            "doc": "A unique identifier of the source service",
+            "type": "string"
+        },
+        {
+            "name": "type",
+            "doc": "The type of event the payload refers to. For example LoanApprovedBusinessEvent",
+            "type": "string"
+        },
+        {
+            "name": "createdAt",
+            "doc": "The UTC time of when the event has been raised; in ISO_LOCAL_DATE_TIME format. For example 2011-12-03T10:15:30",
+            "type": "string"
+        },
+        {
+            "name": "tenantId",
+            "doc": "The tenantId that the event has been sent from. For example default",
+            "type": "string"
+        },
+        {
+            "name": "idempotencyKey",
+            "doc": "The idempotency key for this particular event for consumer de-duplication",
+            "type": "string"
+        },
+        {
+            "name": "data",
+            "doc": "The payload for this bulk message",
+            "type": "org.apache.fineract.avro.BulkMessagePayloadV1"
+        }
+    ]
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
index 3b56a0314..cede39a7a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
@@ -19,15 +19,22 @@
 package org.apache.fineract.infrastructure.event.external.service;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import javax.persistence.EntityManager;
 import javax.persistence.PersistenceContext;
 import lombok.RequiredArgsConstructor;
+import org.apache.fineract.avro.BulkMessageItemV1;
+import org.apache.fineract.avro.BulkMessagePayloadV1;
+import org.apache.fineract.infrastructure.event.business.domain.BulkBusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.service.idempotency.ExternalEventIdempotencyKeyGenerator;
-import org.apache.fineract.infrastructure.event.external.service.serialization.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.message.BulkMessageItemFactory;
 import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -39,6 +46,8 @@ public class ExternalEventService {
     private final ExternalEventRepository repository;
     private final ExternalEventIdempotencyKeyGenerator idempotencyKeyGenerator;
     private final BusinessEventSerializerFactory serializerFactory;
+    private final ByteBufferConverter byteBufferConverter;
+    private final BulkMessageItemFactory bulkMessageItemFactory;
 
     private EntityManager entityManager;
 
@@ -47,15 +56,14 @@ public class ExternalEventService {
             throw new IllegalArgumentException("event cannot be null");
         }
 
-        String eventType = event.getType();
-        String idempotencyKey = idempotencyKeyGenerator.generate(event);
         try {
-            BusinessEventSerializer serializer = serializerFactory.create(event);
-            String schema = serializer.getSupportedSchema().getName();
             flushChangesBeforeSerialization();
-            byte[] data = serializer.serialize(event);
-            ExternalEvent externalEvent = new ExternalEvent(eventType, schema, data, idempotencyKey);
-
+            ExternalEvent externalEvent;
+            if (event instanceof BulkBusinessEvent) {
+                externalEvent = handleBulkBusinessEvent((BulkBusinessEvent) event);
+            } else {
+                externalEvent = handleRegularBusinessEvent(event);
+            }
             repository.save(externalEvent);
         } catch (IOException e) {
             throw new RuntimeException("Error while serializing event " + event.getClass().getSimpleName(), e);
@@ -63,6 +71,32 @@ public class ExternalEventService {
 
     }
 
+    /** Turns each wrapped event into a bulk item numbered from 1 and packs the items into one external event. */
+    private ExternalEvent handleBulkBusinessEvent(BulkBusinessEvent bulkBusinessEvent) throws IOException {
+        List<BusinessEvent<?>> wrappedEvents = bulkBusinessEvent.get();
+        List<BulkMessageItemV1> items = new ArrayList<>();
+        int nextItemId = 1;
+        for (BusinessEvent<?> wrappedEvent : wrappedEvents) {
+            items.add(bulkMessageItemFactory.createBulkMessageItem(nextItemId, wrappedEvent));
+            nextItemId++;
+        }
+        // The idempotency key is generated for the bulk event itself, after all items have been created.
+        String idempotencyKey = idempotencyKeyGenerator.generate(bulkBusinessEvent);
+        byte[] payloadBytes = byteBufferConverter.convert(new BulkMessagePayloadV1(items).toByteBuffer());
+        String payloadSchema = BulkMessagePayloadV1.class.getName();
+        return new ExternalEvent(bulkBusinessEvent.getType(), payloadSchema, payloadBytes, idempotencyKey);
+    }
+
+    /** Serializes a single event with the serializer chosen by the factory and wraps the bytes into an external event. */
+    private <T> ExternalEvent handleRegularBusinessEvent(BusinessEvent<T> event) throws IOException {
+        String type = event.getType();
+        String key = idempotencyKeyGenerator.generate(event);
+        BusinessEventSerializer eventSerializer = serializerFactory.create(event);
+        String schemaName = eventSerializer.getSupportedSchema().getName();
+        byte[] payload = eventSerializer.serialize(event);
+        return new ExternalEvent(type, schemaName, payload, key);
+    }
+
     private void flushChangesBeforeSerialization() {
         entityManager.flush();
     }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/BulkMessageItemFactory.java
similarity index 51%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/BulkMessageItemFactory.java
index 30fabca15..a01baa18a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/BulkMessageItemFactory.java
@@ -16,27 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.serialization;
+package org.apache.fineract.infrastructure.event.external.service.message;
 
-import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import lombok.RequiredArgsConstructor;
+import org.apache.fineract.avro.BulkMessageItemV1;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
 import org.springframework.stereotype.Component;
 
 @Component
 @RequiredArgsConstructor
-public class BusinessEventSerializerFactory {
+public class BulkMessageItemFactory {
 
-    private final List<BusinessEventSerializer> serializers;
+    private final BusinessEventSerializerFactory serializerFactory;
+    private final ByteBufferConverter byteBufferConverter;
 
-    public <T> BusinessEventSerializer create(BusinessEvent<T> event) {
-        for (BusinessEventSerializer serializer : serializers) {
-            if (serializer.canSerialize(event)) {
-                return serializer;
-            }
-        }
-        throw new IllegalStateException("There's no serializer that's capable of serializing a " + event.getClass().getSimpleName());
+    /** Converts one business event into a bulk message item carrying the given id and the event's serialized payload. */
+    public BulkMessageItemV1 createBulkMessageItem(int id, BusinessEvent<?> event) throws IOException {
+        BusinessEventSerializer serializer = serializerFactory.create(event);
+        byte[] payloadBytes = serializer.serialize(event);
+        String eventType = event.getType();
+        String schemaName = serializer.getSupportedSchema().getName();
+        // TODO: switch this to the actual category when implemented
+        return new BulkMessageItemV1(id, eventType, "nocategory", schemaName, byteBufferConverter.convert(payloadBytes));
     }
-
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
new file mode 100644
index 000000000..0701dcb24
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.service.message;
+
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import org.apache.fineract.avro.BulkMessageV1;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.BulkMessageData;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCategory;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageData;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageDataSchema;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageId;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageIdempotencyKey;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageSource;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageType;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageFactory {
+
+    /**
+     * Assembles the Avro envelope of a single-event message from the given value objects.
+     * <p>
+     * The creation timestamp and the tenant id are not supplied by the caller; they are obtained through
+     * {@code DateUtils} and {@code ThreadLocalContextUtil} at the time of the call.
+     */
+    public MessageV1 createMessage(MessageId id, MessageSource source, MessageType type, MessageCategory category,
+            MessageIdempotencyKey idempotencyKey, MessageDataSchema dataSchema, MessageData data) {
+        MessageV1 result = new MessageV1();
+        result.setId(id.getId());
+        result.setSource(source.getSource());
+        result.setType(type.getType());
+        result.setCategory(category.getCategory());
+        result.setCreatedAt(getMessageCreatedAt());
+        result.setTenantId(getTenantId());
+        result.setIdempotencyKey(idempotencyKey.getIdempotencyKey());
+        result.setDataschema(dataSchema.getDataSchema());
+        result.setData(data.getData());
+        return result;
+    }
+
+    /**
+     * Assembles the Avro envelope of a bulk message, including its idempotency key.
+     * <p>
+     * The {@code BulkMessageV1} schema declares {@code idempotencyKey} as a plain, non-nullable string, so a bulk
+     * message is only complete once the key has been set.
+     */
+    public BulkMessageV1 createBulkMessage(MessageId id, MessageSource source, MessageType type,
+            MessageIdempotencyKey idempotencyKey, BulkMessageData data) {
+        BulkMessageV1 result = createBulkMessage(id, source, type, data);
+        result.setIdempotencyKey(idempotencyKey.getIdempotencyKey());
+        return result;
+    }
+
+    /**
+     * Assembles the Avro envelope of a bulk message without an idempotency key.
+     * <p>
+     * Kept for compatibility. The returned record leaves {@code idempotencyKey} unset, so the caller has to set it
+     * before the message is serialized; prefer the overload that takes a {@link MessageIdempotencyKey}.
+     */
+    public BulkMessageV1 createBulkMessage(MessageId id, MessageSource source, MessageType type, BulkMessageData data) {
+        BulkMessageV1 result = new BulkMessageV1();
+        result.setId(id.getId());
+        result.setSource(source.getSource());
+        result.setType(type.getType());
+        result.setCreatedAt(getMessageCreatedAt());
+        result.setTenantId(getTenantId());
+        result.setData(data.getData());
+        return result;
+    }
+
+    // NOTE(review): this returns the tenant's name, while the schema documents tenantId with the example "default" -
+    // confirm whether the tenant identifier was intended here.
+    private String getTenantId() {
+        return ThreadLocalContextUtil.getTenant().getName();
+    }
+
+    // Shifts the date-time returned by DateUtils.getOffsetDateTimeOfTenant() to UTC and formats it as ISO_LOCAL_DATE_TIME.
+    private String getMessageCreatedAt() {
+        OffsetDateTime createdAt = DateUtils.getOffsetDateTimeOfTenant();
+        return createdAt.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+    }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/BulkMessageData.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/BulkMessageData.java
index c5f4ddcbf..8fbed39e6 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/BulkMessageData.java
@@ -16,18 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
 
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
+import org.apache.fineract.avro.BulkMessagePayloadV1;
 
-@Component
-public class ByteBufferConverter {
+@Getter
+public class BulkMessageData {
 
-    public byte[] convert(ByteBuffer buffer) {
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        buffer.position(buffer.position() - bytes.length);
-        return bytes;
+    private final BulkMessagePayloadV1 data;
+
+    public BulkMessageData(BulkMessagePayloadV1 data) {
+        this.data = Objects.requireNonNull(data, "data cannot be null");
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageCategory.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageCategory.java
index c5f4ddcbf..187e7b8b5 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageCategory.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
 
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
 
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageCategory {
 
-    public byte[] convert(ByteBuffer buffer) {
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        buffer.position(buffer.position() - bytes.length);
-        return bytes;
+    private final String category;
+
+    public MessageCategory(String category) {
+        this.category = Objects.requireNonNull(category, "category cannot be null");
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageData.java
similarity index 74%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageData.java
index c5f4ddcbf..0be00a860 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageData.java
@@ -16,18 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
 
 import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
 
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageData {
 
-    public byte[] convert(ByteBuffer buffer) {
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        buffer.position(buffer.position() - bytes.length);
-        return bytes;
+    private final ByteBuffer data;
+
+    public MessageData(ByteBuffer data) {
+        this.data = Objects.requireNonNull(data, "data cannot be null");
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageDataSchema.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageDataSchema.java
index c5f4ddcbf..5d47c77c4 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageDataSchema.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
 
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
 
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageDataSchema {
 
-    public byte[] convert(ByteBuffer buffer) {
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        buffer.position(buffer.position() - bytes.length);
-        return bytes;
+    private final String dataSchema;
+
+    public MessageDataSchema(String dataSchema) {
+        this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema cannot be null");
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageId.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageId.java
index c5f4ddcbf..63a593c72 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageId.java
@@ -16,18 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
 
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
-@Component
-public class ByteBufferConverter {
+@RequiredArgsConstructor
+@Getter
+public class MessageId {
 
-    public byte[] convert(ByteBuffer buffer) {
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        buffer.position(buffer.position() - bytes.length);
-        return bytes;
-    }
+    private final int id;
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageIdempotencyKey.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageIdempotencyKey.java
index c5f4ddcbf..ae9e7332e 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageIdempotencyKey.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
 
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
 
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageIdempotencyKey {
 
-    public byte[] convert(ByteBuffer buffer) {
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        buffer.position(buffer.position() - bytes.length);
-        return bytes;
+    private final String idempotencyKey;
+
+    public MessageIdempotencyKey(String idempotencyKey) {
+        this.idempotencyKey = Objects.requireNonNull(idempotencyKey, "idempotencyKey cannot be null");
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageSource.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageSource.java
index c5f4ddcbf..9c096db58 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageSource.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
 
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
 
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageSource {
 
-    public byte[] convert(ByteBuffer buffer) {
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        buffer.position(buffer.position() - bytes.length);
-        return bytes;
+    private final String source;
+
+    public MessageSource(String source) {
+        this.source = Objects.requireNonNull(source, "source cannot be null");
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageType.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageType.java
index c5f4ddcbf..577f845b9 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageType.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
 
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
 
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageType {
 
-    public byte[] convert(ByteBuffer buffer) {
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        buffer.position(buffer.position() - bytes.length);
-        return bytes;
+    private final String type;
+
+    public MessageType(String type) {
+        this.type = Objects.requireNonNull(type, "type cannot be null");
     }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/BusinessEventSerializerFactory.java
similarity index 92%
rename from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java
rename to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/BusinessEventSerializerFactory.java
index 30fabca15..cdfd137d0 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/BusinessEventSerializerFactory.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.service.serialization;
+package org.apache.fineract.infrastructure.event.external.service.serialization.serializer;
 
 import java.util.List;
 import lombok.RequiredArgsConstructor;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
 import org.springframework.stereotype.Component;
 
 @Component
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
index c5f4ddcbf..6e7ecfe8a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
@@ -30,4 +30,8 @@ public class ByteBufferConverter {
         buffer.position(buffer.position() - bytes.length);
         return bytes;
     }
+
+    public ByteBuffer convert(byte[] buffer) {
+        return ByteBuffer.wrap(buffer);
+    }
 }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/client/service/ClientWritePlatformServiceJpaRepositoryImpl.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/client/service/ClientWritePlatformServiceJpaRepositoryImpl.java
index 124ffd6e7..02d9d2ce8 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/client/service/ClientWritePlatformServiceJpaRepositoryImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/client/service/ClientWritePlatformServiceJpaRepositoryImpl.java
@@ -223,6 +223,7 @@ public class ClientWritePlatformServiceJpaRepositoryImpl implements ClientWriteP
     public CommandProcessingResult createClient(final JsonCommand command) {
 
         try {
+            businessEventNotifierService.startExternalEventRecording();
             final AppUser currentUser = this.context.authenticatedUser();
 
             this.fromApiJsonDeserializer.validateForCreate(command.json());
@@ -339,6 +340,7 @@ public class ClientWritePlatformServiceJpaRepositoryImpl implements ClientWriteP
 
             entityDatatableChecksWritePlatformService.runTheCheck(newClient.getId(), EntityTables.CLIENT.getName(),
                     StatusEnum.CREATE.getCode().longValue(), EntityTables.CLIENT.getForeignKeyColumnNameOnDatatable());
+            businessEventNotifierService.stopExternalEventRecording();
             return new CommandProcessingResultBuilder() //
                     .withCommandId(command.commandId()) //
                     .withOfficeId(clientOffice.getId()) //
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventServiceTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventServiceTest.java
index 32b998ca1..37d919484 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventServiceTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventServiceTest.java
@@ -20,26 +20,33 @@ package org.apache.fineract.infrastructure.event.external.service;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javax.persistence.EntityManager;
+import org.apache.fineract.avro.BulkMessageItemV1;
 import org.apache.fineract.avro.loan.v1.LoanAccountDataV1;
 import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
 import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.apache.fineract.infrastructure.event.business.domain.BulkBusinessEvent;
 import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
 import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.service.idempotency.ExternalEventIdempotencyKeyGenerator;
-import org.apache.fineract.infrastructure.event.external.service.serialization.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.message.BulkMessageItemFactory;
 import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -58,13 +65,18 @@ class ExternalEventServiceTest {
     @Mock
     private BusinessEventSerializerFactory serializerFactory;
     @Mock
+    private ByteBufferConverter byteBufferConverter;
+    @Mock
+    private BulkMessageItemFactory bulkMessageItemFactory;
+    @Mock
     private EntityManager entityManager;
 
     private ExternalEventService underTest;
 
     @BeforeEach
     public void setUp() {
-        underTest = new ExternalEventService(repository, idempotencyKeyGenerator, serializerFactory);
+        underTest = new ExternalEventService(repository, idempotencyKeyGenerator, serializerFactory, byteBufferConverter,
+                bulkMessageItemFactory);
         underTest.setEntityManager(entityManager);
         FineractPlatformTenant tenant = new FineractPlatformTenant(1L, "default", "Default Tenant", "Europe/Budapest", null);
         ThreadLocalContextUtil.setTenant(tenant);
@@ -94,7 +106,7 @@ class ExternalEventServiceTest {
     }
 
     @Test
-    public void testPostEventShouldWork() throws IOException {
+    public void testPostEventShouldWorkWithRegularEvent() throws IOException {
         // given
         ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
 
@@ -120,4 +132,31 @@ class ExternalEventServiceTest {
         assertThat(externalEvent.getType()).isEqualTo(eventType);
         assertThat(externalEvent.getSchema()).isEqualTo(eventSchema);
     }
+
+    @Test
+    public void testPostEventShouldWorkWithBulkEvent() throws IOException {
+        // given
+        ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
+        String eventType = "BulkBusinessEvent";
+        String schema = "org.apache.fineract.avro.BulkMessagePayloadV1";
+
+        String idempotencyKey = "key";
+        BusinessEvent event = mock(BusinessEvent.class);
+        BulkMessageItemV1 messageItem = new BulkMessageItemV1(1, "", "", "", ByteBuffer.wrap(new byte[0]));
+        BulkBusinessEvent bulkEvent = new BulkBusinessEvent(List.of(event));
+        byte[] data = new byte[0];
+
+        given(bulkMessageItemFactory.createBulkMessageItem(1, event)).willReturn(messageItem);
+        given(idempotencyKeyGenerator.generate(bulkEvent)).willReturn(idempotencyKey);
+        given(byteBufferConverter.convert(any(ByteBuffer.class))).willReturn(data);
+        // when
+        underTest.postEvent(bulkEvent);
+        // then
+        verify(repository).save(externalEventArgumentCaptor.capture());
+        ExternalEvent externalEvent = externalEventArgumentCaptor.getValue();
+        assertThat(externalEvent.getIdempotencyKey()).isEqualTo(idempotencyKey);
+        assertThat(externalEvent.getData()).isEqualTo(data);
+        assertThat(externalEvent.getType()).isEqualTo(eventType);
+        assertThat(externalEvent.getSchema()).isEqualTo(schema);
+    }
 }