You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by ar...@apache.org on 2022/09/12 12:45:41 UTC
[fineract] branch develop updated: FINERACT-1694: Bulk event support
This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new 7558b09de FINERACT-1694: Bulk event support
7558b09de is described below
commit 7558b09deff71939a663e8d8211d013bd3638f67
Author: Arnold Galovics <ga...@gmail.com>
AuthorDate: Mon Sep 12 09:46:12 2022 +0200
FINERACT-1694: Bulk event support
---
.../src/main/avro/BulkMessageItemV1.avsc | 32 +++++++++
.../src/main/avro/BulkMessagePayloadV1.avsc | 13 ++++
.../src/main/avro/BulkMessageV1.avsc | 42 ++++++++++++
.../external/service/ExternalEventService.java | 50 +++++++++++---
.../BulkMessageItemFactory.java} | 29 +++++----
.../external/service/message/MessageFactory.java | 76 ++++++++++++++++++++++
.../domain/BulkMessageData.java} | 20 +++---
.../domain/MessageCategory.java} | 19 +++---
.../domain/MessageData.java} | 18 ++---
.../domain/MessageDataSchema.java} | 19 +++---
.../domain/MessageId.java} | 18 ++---
.../domain/MessageIdempotencyKey.java} | 19 +++---
.../domain/MessageSource.java} | 19 +++---
.../domain/MessageType.java} | 19 +++---
.../BusinessEventSerializerFactory.java | 3 +-
.../service/support/ByteBufferConverter.java | 4 ++
...lientWritePlatformServiceJpaRepositoryImpl.java | 2 +
.../external/service/ExternalEventServiceTest.java | 45 ++++++++++++-
18 files changed, 342 insertions(+), 105 deletions(-)
diff --git a/fineract-avro-schemas/src/main/avro/BulkMessageItemV1.avsc b/fineract-avro-schemas/src/main/avro/BulkMessageItemV1.avsc
new file mode 100644
index 000000000..e094fe8b4
--- /dev/null
+++ b/fineract-avro-schemas/src/main/avro/BulkMessageItemV1.avsc
@@ -0,0 +1,32 @@
+{
+ "name": "BulkMessageItemV1",
+ "namespace": "org.apache.fineract.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name": "id",
+ "doc": "The ID of the message to be sent",
+ "type": "int"
+ },
+ {
+ "name": "type",
+ "doc": "The type of event the payload refers to. For example LoanApprovedBusinessEvent",
+ "type": "string"
+ },
+ {
+ "name": "category",
+ "doc": "The category of event the payload refers to. For example LOAN",
+ "type": "string"
+ },
+ {
+ "name": "dataschema",
+ "doc": "The fully qualified name of the schema of the event payload. For example org.apache.fineract.avro.loan.v1.LoanAccountDataV1",
+ "type": "string"
+ },
+ {
+ "name": "data",
+ "doc": "The payload data serialized into Avro bytes",
+ "type": "bytes"
+ }
+ ]
+}
diff --git a/fineract-avro-schemas/src/main/avro/BulkMessagePayloadV1.avsc b/fineract-avro-schemas/src/main/avro/BulkMessagePayloadV1.avsc
new file mode 100644
index 000000000..451edbc5d
--- /dev/null
+++ b/fineract-avro-schemas/src/main/avro/BulkMessagePayloadV1.avsc
@@ -0,0 +1,13 @@
+{
+ "name": "BulkMessagePayloadV1",
+ "namespace": "org.apache.fineract.avro",
+ "type": "record",
+ "fields": [{
+ "name": "datas",
+ "doc": "The individual messages within this bulk message",
+ "type": {
+ "type": "array",
+ "items": "org.apache.fineract.avro.BulkMessageItemV1"
+ }
+ }]
+}
diff --git a/fineract-avro-schemas/src/main/avro/BulkMessageV1.avsc b/fineract-avro-schemas/src/main/avro/BulkMessageV1.avsc
new file mode 100644
index 000000000..f8ed90813
--- /dev/null
+++ b/fineract-avro-schemas/src/main/avro/BulkMessageV1.avsc
@@ -0,0 +1,42 @@
+{
+ "name": "BulkMessageV1",
+ "namespace": "org.apache.fineract.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name": "id",
+ "doc": "The ID of the message to be sent",
+ "type": "int"
+ },
+ {
+ "name": "source",
+ "doc": "A unique identifier of the source service",
+ "type": "string"
+ },
+ {
+ "name": "type",
+ "doc": "The type of event the payload refers to. For example LoanApprovedBusinessEvent",
+ "type": "string"
+ },
+ {
+ "name": "createdAt",
+ "doc": "The UTC time of when the event has been raised; in ISO_LOCAL_DATE_TIME format. For example 2011-12-03T10:15:30",
+ "type": "string"
+ },
+ {
+ "name": "tenantId",
+ "doc": "The tenantId that the event has been sent from. For example default",
+ "type": "string"
+ },
+ {
+ "name": "idempotencyKey",
+ "doc": "The idempotency key for this particular event for consumer de-duplication",
+ "type": "string"
+ },
+ {
+ "name": "data",
+ "doc": "The payload for this bulk message",
+ "type": "org.apache.fineract.avro.BulkMessagePayloadV1"
+ }
+ ]
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
index 3b56a0314..cede39a7a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventService.java
@@ -19,15 +19,22 @@
package org.apache.fineract.infrastructure.event.external.service;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import lombok.RequiredArgsConstructor;
+import org.apache.fineract.avro.BulkMessageItemV1;
+import org.apache.fineract.avro.BulkMessagePayloadV1;
+import org.apache.fineract.infrastructure.event.business.domain.BulkBusinessEvent;
import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
import org.apache.fineract.infrastructure.event.external.service.idempotency.ExternalEventIdempotencyKeyGenerator;
-import org.apache.fineract.infrastructure.event.external.service.serialization.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.message.BulkMessageItemFactory;
import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -39,6 +46,8 @@ public class ExternalEventService {
private final ExternalEventRepository repository;
private final ExternalEventIdempotencyKeyGenerator idempotencyKeyGenerator;
private final BusinessEventSerializerFactory serializerFactory;
+ private final ByteBufferConverter byteBufferConverter;
+ private final BulkMessageItemFactory bulkMessageItemFactory;
private EntityManager entityManager;
@@ -47,15 +56,14 @@ public class ExternalEventService {
throw new IllegalArgumentException("event cannot be null");
}
- String eventType = event.getType();
- String idempotencyKey = idempotencyKeyGenerator.generate(event);
try {
- BusinessEventSerializer serializer = serializerFactory.create(event);
- String schema = serializer.getSupportedSchema().getName();
flushChangesBeforeSerialization();
- byte[] data = serializer.serialize(event);
- ExternalEvent externalEvent = new ExternalEvent(eventType, schema, data, idempotencyKey);
-
+ ExternalEvent externalEvent;
+ if (event instanceof BulkBusinessEvent) {
+ externalEvent = handleBulkBusinessEvent((BulkBusinessEvent) event);
+ } else {
+ externalEvent = handleRegularBusinessEvent(event);
+ }
repository.save(externalEvent);
} catch (IOException e) {
throw new RuntimeException("Error while serializing event " + event.getClass().getSimpleName(), e);
@@ -63,6 +71,32 @@ public class ExternalEventService {
}
+ private ExternalEvent handleBulkBusinessEvent(BulkBusinessEvent bulkBusinessEvent) throws IOException {
+ List<BulkMessageItemV1> messages = new ArrayList<>();
+ List<BusinessEvent<?>> events = bulkBusinessEvent.get();
+ for (int i = 0; i < events.size(); i++) {
+ BusinessEvent<?> event = events.get(i);
+ int id = i + 1;
+ BulkMessageItemV1 message = bulkMessageItemFactory.createBulkMessageItem(id, event);
+ messages.add(message);
+ }
+ String idempotencyKey = idempotencyKeyGenerator.generate(bulkBusinessEvent);
+ BulkMessagePayloadV1 avroDto = new BulkMessagePayloadV1(messages);
+ byte[] data = byteBufferConverter.convert(avroDto.toByteBuffer());
+
+ return new ExternalEvent(bulkBusinessEvent.getType(), BulkMessagePayloadV1.class.getName(), data, idempotencyKey);
+ }
+
+ private <T> ExternalEvent handleRegularBusinessEvent(BusinessEvent<T> event) throws IOException {
+ String eventType = event.getType();
+ String idempotencyKey = idempotencyKeyGenerator.generate(event);
+ BusinessEventSerializer serializer = serializerFactory.create(event);
+ String schema = serializer.getSupportedSchema().getName();
+ byte[] data = serializer.serialize(event);
+
+ return new ExternalEvent(eventType, schema, data, idempotencyKey);
+ }
+
private void flushChangesBeforeSerialization() {
entityManager.flush();
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/BulkMessageItemFactory.java
similarity index 51%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/BulkMessageItemFactory.java
index 30fabca15..a01baa18a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/BulkMessageItemFactory.java
@@ -16,27 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.serialization;
+package org.apache.fineract.infrastructure.event.external.service.message;
-import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import lombok.RequiredArgsConstructor;
+import org.apache.fineract.avro.BulkMessageItemV1;
import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
-public class BusinessEventSerializerFactory {
+public class BulkMessageItemFactory {
- private final List<BusinessEventSerializer> serializers;
+ private final BusinessEventSerializerFactory serializerFactory;
+ private final ByteBufferConverter byteBufferConverter;
- public <T> BusinessEventSerializer create(BusinessEvent<T> event) {
- for (BusinessEventSerializer serializer : serializers) {
- if (serializer.canSerialize(event)) {
- return serializer;
- }
- }
- throw new IllegalStateException("There's no serializer that's capable of serializing a " + event.getClass().getSimpleName());
+ public BulkMessageItemV1 createBulkMessageItem(int id, BusinessEvent<?> event) throws IOException {
+ BusinessEventSerializer eventSerializer = serializerFactory.create(event);
+ byte[] serializedContent = eventSerializer.serialize(event);
+ String type = event.getType();
+ String category = "nocategory"; // TODO: switch this to the actual category when implemented
+ String schema = eventSerializer.getSupportedSchema().getName();
+ ByteBuffer data = byteBufferConverter.convert(serializedContent);
+ return new BulkMessageItemV1(id, type, category, schema, data);
}
-
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
new file mode 100644
index 000000000..0701dcb24
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.service.message;
+
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import org.apache.fineract.avro.BulkMessageV1;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.BulkMessageData;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCategory;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageData;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageDataSchema;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageId;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageIdempotencyKey;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageSource;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageType;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageFactory {
+
+ public MessageV1 createMessage(MessageId id, MessageSource source, MessageType type, MessageCategory category,
+ MessageIdempotencyKey idempotencyKey, MessageDataSchema dataSchema, MessageData data) {
+ MessageV1 result = new MessageV1();
+ result.setId(id.getId());
+ result.setSource(source.getSource());
+ result.setType(type.getType());
+ result.setCategory(category.getCategory());
+ result.setCreatedAt(getMessageCreatedAt());
+ result.setTenantId(getTenantId());
+ result.setIdempotencyKey(idempotencyKey.getIdempotencyKey());
+ result.setDataschema(dataSchema.getDataSchema());
+ result.setData(data.getData());
+ return result;
+ }
+
+ public BulkMessageV1 createBulkMessage(MessageId id, MessageSource source, MessageType type, BulkMessageData data) {
+
+ BulkMessageV1 result = new BulkMessageV1();
+ result.setId(id.getId());
+ result.setSource(source.getSource());
+ result.setType(type.getType());
+ result.setCreatedAt(getMessageCreatedAt());
+ result.setTenantId(getTenantId());
+ result.setData(data.getData());
+ return result;
+ }
+
+ private String getTenantId() {
+ return ThreadLocalContextUtil.getTenant().getName();
+ }
+
+ private String getMessageCreatedAt() {
+ OffsetDateTime createdAt = DateUtils.getOffsetDateTimeOfTenant();
+ return createdAt.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+ }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/BulkMessageData.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/BulkMessageData.java
index c5f4ddcbf..8fbed39e6 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/BulkMessageData.java
@@ -16,18 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
+import org.apache.fineract.avro.BulkMessagePayloadV1;
-@Component
-public class ByteBufferConverter {
+@Getter
+public class BulkMessageData {
- public byte[] convert(ByteBuffer buffer) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- buffer.position(buffer.position() - bytes.length);
- return bytes;
+ private final BulkMessagePayloadV1 data;
+
+ public BulkMessageData(BulkMessagePayloadV1 data) {
+ this.data = Objects.requireNonNull(data, "data cannot be null");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageCategory.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageCategory.java
index c5f4ddcbf..187e7b8b5 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageCategory.java
@@ -16,18 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageCategory {
- public byte[] convert(ByteBuffer buffer) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- buffer.position(buffer.position() - bytes.length);
- return bytes;
+ private final String category;
+
+ public MessageCategory(String category) {
+ this.category = Objects.requireNonNull(category, "category cannot be null");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageData.java
similarity index 74%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageData.java
index c5f4ddcbf..0be00a860 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageData.java
@@ -16,18 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageData {
- public byte[] convert(ByteBuffer buffer) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- buffer.position(buffer.position() - bytes.length);
- return bytes;
+ private final ByteBuffer data;
+
+ public MessageData(ByteBuffer data) {
+ this.data = Objects.requireNonNull(data, "data cannot be null");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageDataSchema.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageDataSchema.java
index c5f4ddcbf..5d47c77c4 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageDataSchema.java
@@ -16,18 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageDataSchema {
- public byte[] convert(ByteBuffer buffer) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- buffer.position(buffer.position() - bytes.length);
- return bytes;
+ private final String dataSchema;
+
+ public MessageDataSchema(String dataSchema) {
+ this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema cannot be null");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageId.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageId.java
index c5f4ddcbf..63a593c72 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageId.java
@@ -16,18 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
-@Component
-public class ByteBufferConverter {
+@RequiredArgsConstructor
+@Getter
+public class MessageId {
- public byte[] convert(ByteBuffer buffer) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- buffer.position(buffer.position() - bytes.length);
- return bytes;
- }
+ private final int id;
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageIdempotencyKey.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageIdempotencyKey.java
index c5f4ddcbf..ae9e7332e 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageIdempotencyKey.java
@@ -16,18 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageIdempotencyKey {
- public byte[] convert(ByteBuffer buffer) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- buffer.position(buffer.position() - bytes.length);
- return bytes;
+ private final String idempotencyKey;
+
+ public MessageIdempotencyKey(String idempotencyKey) {
+ this.idempotencyKey = Objects.requireNonNull(idempotencyKey, "idempotencyKey cannot be null");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageSource.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageSource.java
index c5f4ddcbf..9c096db58 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageSource.java
@@ -16,18 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageSource {
- public byte[] convert(ByteBuffer buffer) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- buffer.position(buffer.position() - bytes.length);
- return bytes;
+ private final String source;
+
+ public MessageSource(String source) {
+ this.source = Objects.requireNonNull(source, "source cannot be null");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageType.java
similarity index 71%
copy from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageType.java
index c5f4ddcbf..577f845b9 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/domain/MessageType.java
@@ -16,18 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.support;
+package org.apache.fineract.infrastructure.event.external.service.message.domain;
-import java.nio.ByteBuffer;
-import org.springframework.stereotype.Component;
+import java.util.Objects;
+import lombok.Getter;
-@Component
-public class ByteBufferConverter {
+@Getter
+public class MessageType {
- public byte[] convert(ByteBuffer buffer) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- buffer.position(buffer.position() - bytes.length);
- return bytes;
+ private final String type;
+
+ public MessageType(String type) {
+ this.type = Objects.requireNonNull(type, "type cannot be null");
}
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/BusinessEventSerializerFactory.java
similarity index 92%
rename from fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java
rename to fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/BusinessEventSerializerFactory.java
index 30fabca15..cdfd137d0 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/BusinessEventSerializerFactory.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/serialization/serializer/BusinessEventSerializerFactory.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.service.serialization;
+package org.apache.fineract.infrastructure.event.external.service.serialization.serializer;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
-import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
import org.springframework.stereotype.Component;
@Component
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
index c5f4ddcbf..6e7ecfe8a 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/support/ByteBufferConverter.java
@@ -30,4 +30,8 @@ public class ByteBufferConverter {
buffer.position(buffer.position() - bytes.length);
return bytes;
}
+
+ public ByteBuffer convert(byte[] buffer) {
+ return ByteBuffer.wrap(buffer);
+ }
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/client/service/ClientWritePlatformServiceJpaRepositoryImpl.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/client/service/ClientWritePlatformServiceJpaRepositoryImpl.java
index 124ffd6e7..02d9d2ce8 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/client/service/ClientWritePlatformServiceJpaRepositoryImpl.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/client/service/ClientWritePlatformServiceJpaRepositoryImpl.java
@@ -223,6 +223,7 @@ public class ClientWritePlatformServiceJpaRepositoryImpl implements ClientWriteP
public CommandProcessingResult createClient(final JsonCommand command) {
try {
+ businessEventNotifierService.startExternalEventRecording();
final AppUser currentUser = this.context.authenticatedUser();
this.fromApiJsonDeserializer.validateForCreate(command.json());
@@ -339,6 +340,7 @@ public class ClientWritePlatformServiceJpaRepositoryImpl implements ClientWriteP
entityDatatableChecksWritePlatformService.runTheCheck(newClient.getId(), EntityTables.CLIENT.getName(),
StatusEnum.CREATE.getCode().longValue(), EntityTables.CLIENT.getForeignKeyColumnNameOnDatatable());
+ businessEventNotifierService.stopExternalEventRecording();
return new CommandProcessingResultBuilder() //
.withCommandId(command.commandId()) //
.withOfficeId(clientOffice.getId()) //
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventServiceTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventServiceTest.java
index 32b998ca1..37d919484 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventServiceTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/service/ExternalEventServiceTest.java
@@ -20,26 +20,33 @@ package org.apache.fineract.infrastructure.event.external.service;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.persistence.EntityManager;
+import org.apache.fineract.avro.BulkMessageItemV1;
import org.apache.fineract.avro.loan.v1.LoanAccountDataV1;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.apache.fineract.infrastructure.event.business.domain.BulkBusinessEvent;
import org.apache.fineract.infrastructure.event.business.domain.BusinessEvent;
import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
import org.apache.fineract.infrastructure.event.external.service.idempotency.ExternalEventIdempotencyKeyGenerator;
-import org.apache.fineract.infrastructure.event.external.service.serialization.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.message.BulkMessageItemFactory;
import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializer;
+import org.apache.fineract.infrastructure.event.external.service.serialization.serializer.BusinessEventSerializerFactory;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -58,13 +65,18 @@ class ExternalEventServiceTest {
@Mock
private BusinessEventSerializerFactory serializerFactory;
@Mock
+ private ByteBufferConverter byteBufferConverter;
+ @Mock
+ private BulkMessageItemFactory bulkMessageItemFactory;
+ @Mock
private EntityManager entityManager;
private ExternalEventService underTest;
@BeforeEach
public void setUp() {
- underTest = new ExternalEventService(repository, idempotencyKeyGenerator, serializerFactory);
+ underTest = new ExternalEventService(repository, idempotencyKeyGenerator, serializerFactory, byteBufferConverter,
+ bulkMessageItemFactory);
underTest.setEntityManager(entityManager);
FineractPlatformTenant tenant = new FineractPlatformTenant(1L, "default", "Default Tenant", "Europe/Budapest", null);
ThreadLocalContextUtil.setTenant(tenant);
@@ -94,7 +106,7 @@ class ExternalEventServiceTest {
}
@Test
- public void testPostEventShouldWork() throws IOException {
+ public void testPostEventShouldWorkWithRegularEvent() throws IOException {
// given
ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
@@ -120,4 +132,31 @@ class ExternalEventServiceTest {
assertThat(externalEvent.getType()).isEqualTo(eventType);
assertThat(externalEvent.getSchema()).isEqualTo(eventSchema);
}
+
+ @Test
+ public void testPostEventShouldWorkWithBulkEvent() throws IOException {
+ // given
+ ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
+ String eventType = "BulkBusinessEvent";
+ String schema = "org.apache.fineract.avro.BulkMessagePayloadV1";
+
+ String idempotencyKey = "key";
+ BusinessEvent event = mock(BusinessEvent.class);
+ BulkMessageItemV1 messageItem = new BulkMessageItemV1(1, "", "", "", ByteBuffer.wrap(new byte[0]));
+ BulkBusinessEvent bulkEvent = new BulkBusinessEvent(List.of(event));
+ byte[] data = new byte[0];
+
+ given(bulkMessageItemFactory.createBulkMessageItem(1, event)).willReturn(messageItem);
+ given(idempotencyKeyGenerator.generate(bulkEvent)).willReturn(idempotencyKey);
+ given(byteBufferConverter.convert(any(ByteBuffer.class))).willReturn(data);
+ // when
+ underTest.postEvent(bulkEvent);
+ // then
+ verify(repository).save(externalEventArgumentCaptor.capture());
+ ExternalEvent externalEvent = externalEventArgumentCaptor.getValue();
+ assertThat(externalEvent.getIdempotencyKey()).isEqualTo(idempotencyKey);
+ assertThat(externalEvent.getData()).isEqualTo(data);
+ assertThat(externalEvent.getType()).isEqualTo(eventType);
+ assertThat(externalEvent.getSchema()).isEqualTo(schema);
+ }
}