You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fineract.apache.org by GitBox <gi...@apache.org> on 2022/09/15 19:41:51 UTC

[GitHub] [fineract] galovics commented on a diff in pull request #2603: FINERACT-1694 Send Asynchronous Events

galovics commented on code in PR #2603:
URL: https://github.com/apache/fineract/pull/2603#discussion_r972353210


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import java.sql.Types;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.RoutingDataSourceServiceFactory;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseSpecificSQLGenerator;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.serializer.ExternalEventDataSerializer;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@RequiredArgsConstructor
+@Component
+public class SendAsynchronousEventsTasklet implements Tasklet {
+
+    private final RoutingDataSourceServiceFactory dataSourceServiceFactory;
+    private final FineractProperties fineractProperties;
+    private final ExternalEventRepository repository;
+    private final ExternalEventProducer eventProducer;
+    private final ExternalEventDataSerializer eventDataSerializer;
+    private final DatabaseSpecificSQLGenerator sqlGenerator;
+
+    @Override
+    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+
+        try {
+            int readBatchSize = getBatchSize();
+            List<Long> externalEventIds = getTobeSentEventsBatch(readBatchSize);
+            sendEventAndUpdateEventSentStatus(externalEventIds);
+        } catch (Exception e) {
+            log.info(e.getMessage());

Review Comment:
   I'd probably log a custom message and pass the exception itself as the second parameter. That way the stack trace is logged automatically as well.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import java.sql.Types;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.RoutingDataSourceServiceFactory;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseSpecificSQLGenerator;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.serializer.ExternalEventDataSerializer;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@RequiredArgsConstructor
+@Component
+public class SendAsynchronousEventsTasklet implements Tasklet {
+
+    private final RoutingDataSourceServiceFactory dataSourceServiceFactory;
+    private final FineractProperties fineractProperties;
+    private final ExternalEventRepository repository;
+    private final ExternalEventProducer eventProducer;
+    private final ExternalEventDataSerializer eventDataSerializer;
+    private final DatabaseSpecificSQLGenerator sqlGenerator;
+
+    @Override
+    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+
+        try {
+            int readBatchSize = getBatchSize();
+            List<Long> externalEventIds = getTobeSentEventsBatch(readBatchSize);
+            sendEventAndUpdateEventSentStatus(externalEventIds);
+        } catch (Exception e) {
+            log.info(e.getMessage());
+        }
+        return RepeatStatus.FINISHED;
+    }
+
+    private void sendEventAndUpdateEventSentStatus(List<Long> externalEventIds) throws Exception {
+        for (Long eventId : externalEventIds) {
+            ExternalEvent event = repository.findById(eventId).get();
+            MessageV1 eventData = eventDataSerializer.serialize(event);
+            eventProducer.sendEvent(eventData);
+            event.setStatus(ExternalEventStatus.SENT);
+            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
+            repository.save(event);
+
+        }
+    }
+
+    private List<Long> getTobeSentEventsBatch(int readBatchSize) {

Review Comment:
   The method name `getTobeSentEventsBatch` does not follow the Java naming conventions (camelCase).
   Also, you could rename this method to getQueuedEvents or something similar.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import java.sql.Types;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.RoutingDataSourceServiceFactory;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseSpecificSQLGenerator;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.serializer.ExternalEventDataSerializer;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@RequiredArgsConstructor
+@Component
+public class SendAsynchronousEventsTasklet implements Tasklet {
+
+    private final RoutingDataSourceServiceFactory dataSourceServiceFactory;
+    private final FineractProperties fineractProperties;
+    private final ExternalEventRepository repository;
+    private final ExternalEventProducer eventProducer;
+    private final ExternalEventDataSerializer eventDataSerializer;
+    private final DatabaseSpecificSQLGenerator sqlGenerator;
+
+    @Override
+    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+
+        try {
+            int readBatchSize = getBatchSize();
+            List<Long> externalEventIds = getTobeSentEventsBatch(readBatchSize);
+            sendEventAndUpdateEventSentStatus(externalEventIds);
+        } catch (Exception e) {
+            log.info(e.getMessage());
+        }
+        return RepeatStatus.FINISHED;
+    }
+
+    private void sendEventAndUpdateEventSentStatus(List<Long> externalEventIds) throws Exception {
+        for (Long eventId : externalEventIds) {
+            ExternalEvent event = repository.findById(eventId).get();
+            MessageV1 eventData = eventDataSerializer.serialize(event);
+            eventProducer.sendEvent(eventData);
+            event.setStatus(ExternalEventStatus.SENT);
+            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
+            repository.save(event);
+
+        }
+    }
+
+    private List<Long> getTobeSentEventsBatch(int readBatchSize) {

Review Comment:
   No need to pass the batch size as a parameter here; you could call `getBatchSize()` from within the method.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import java.sql.Types;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.RoutingDataSourceServiceFactory;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseSpecificSQLGenerator;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.serializer.ExternalEventDataSerializer;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@RequiredArgsConstructor
+@Component
+public class SendAsynchronousEventsTasklet implements Tasklet {
+
+    private final RoutingDataSourceServiceFactory dataSourceServiceFactory;
+    private final FineractProperties fineractProperties;
+    private final ExternalEventRepository repository;
+    private final ExternalEventProducer eventProducer;
+    private final ExternalEventDataSerializer eventDataSerializer;
+    private final DatabaseSpecificSQLGenerator sqlGenerator;
+
+    @Override
+    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+
+        try {
+            int readBatchSize = getBatchSize();
+            List<Long> externalEventIds = getTobeSentEventsBatch(readBatchSize);
+            sendEventAndUpdateEventSentStatus(externalEventIds);
+        } catch (Exception e) {
+            log.info(e.getMessage());
+        }
+        return RepeatStatus.FINISHED;
+    }
+
+    private void sendEventAndUpdateEventSentStatus(List<Long> externalEventIds) throws Exception {
+        for (Long eventId : externalEventIds) {
+            ExternalEvent event = repository.findById(eventId).get();
+            MessageV1 eventData = eventDataSerializer.serialize(event);
+            eventProducer.sendEvent(eventData);
+            event.setStatus(ExternalEventStatus.SENT);
+            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
+            repository.save(event);
+
+        }
+    }
+
+    private List<Long> getTobeSentEventsBatch(int readBatchSize) {
+        final JdbcTemplate jdbcTemplate = new JdbcTemplate(this.dataSourceServiceFactory.determineDataSourceService().retrieveDataSource());
+        final StringBuilder readEventsForBatchSqlBuilder = new StringBuilder(500);
+        readEventsForBatchSqlBuilder.append("SELECT ee.id FROM m_external_event as ee WHERE ee.status = ? ORDER BY ee.id ");

Review Comment:
   I thought we agreed you were going to use JPA entities directly here instead of the plain IDs.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/serializer/ExternalEventDataSerializerImpl.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.serializer;
+
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCategory;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageData;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageDataSchema;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageId;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageIdempotencyKey;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageSource;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageType;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class ExternalEventDataSerializerImpl implements ExternalEventDataSerializer {
+
+    private final ByteBufferConverter byteBufferConverter;
+    private final MessageFactory messageFactory;
+    private static final String sourceUUID = UUID.randomUUID().toString();

Review Comment:
   Let's not generate a new source for every single message.
   Rather generate one at startup (upon bean creation) and reuse that source.
   
   Also, make sure that the generated source is logged at least once upon application startup.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import java.sql.Types;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.RoutingDataSourceServiceFactory;
+import org.apache.fineract.infrastructure.core.service.database.DatabaseSpecificSQLGenerator;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.serializer.ExternalEventDataSerializer;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@RequiredArgsConstructor
+@Component
+public class SendAsynchronousEventsTasklet implements Tasklet {
+
+    private final RoutingDataSourceServiceFactory dataSourceServiceFactory;
+    private final FineractProperties fineractProperties;
+    private final ExternalEventRepository repository;
+    private final ExternalEventProducer eventProducer;
+    private final ExternalEventDataSerializer eventDataSerializer;
+    private final DatabaseSpecificSQLGenerator sqlGenerator;
+
+    @Override
+    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+
+        try {
+            int readBatchSize = getBatchSize();
+            List<Long> externalEventIds = getTobeSentEventsBatch(readBatchSize);
+            sendEventAndUpdateEventSentStatus(externalEventIds);
+        } catch (Exception e) {
+            log.info(e.getMessage());
+        }
+        return RepeatStatus.FINISHED;
+    }
+
+    private void sendEventAndUpdateEventSentStatus(List<Long> externalEventIds) throws Exception {

Review Comment:
   Maybe rename the method to "processEvents"



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java:
##########
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer;
+
+import org.apache.fineract.avro.MessageV1;
+
+public interface ExternalEventProducer {
+
+    void sendEvent(MessageV1 eventData) throws Exception;

Review Comment:
   The parameter should be named `message`.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java:
##########
@@ -56,7 +56,7 @@ public enum JobName {
     INCREASE_BUSINESS_DATE_BY_1_DAY("Increase Business Date by 1 day"), //
     INCREASE_COB_DATE_BY_1_DAY("Increase COB Date by 1 day"), //
     LOAN_COB("Loan COB"), //
-    LOAN_DELINQUENCY_CLASSIFICATION("Loan Delinquency Classification");
+    LOAN_DELINQUENCY_CLASSIFICATION("Loan Delinquency Classification"), SEND_ASYNCHRONOUS_EVENTS("Send Asynchronous Events");

Review Comment:
   Please put `//` at the end of every enum value, otherwise the Spotless formatting will screw up the file.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java:
##########
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer;
+
+import org.apache.fineract.avro.MessageV1;
+
+public interface ExternalEventProducer {
+
+    void sendEvent(MessageV1 eventData) throws Exception;

Review Comment:
   Let's be more conscious about the exception we're throwing here. I'd say let's create a custom exception type like "AcknowledgementException" or something.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/serializer/ExternalEventDataSerializerImpl.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.serializer;
+
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCategory;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageData;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageDataSchema;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageId;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageIdempotencyKey;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageSource;
+import org.apache.fineract.infrastructure.event.external.service.message.domain.MessageType;
+import org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class ExternalEventDataSerializerImpl implements ExternalEventDataSerializer {
+
+    private final ByteBufferConverter byteBufferConverter;
+    private final MessageFactory messageFactory;
+    private static final String sourceUUID = UUID.randomUUID().toString();
+
+    @Override
+    public MessageV1 serialize(ExternalEvent event) {

Review Comment:
   How about moving this behavior (constructing a MessageV1 from an ExternalEvent) into the MessageFactory class?
   
   I mean, technically we're calling this method and class a serializer, but it doesn't serialize anything; it just constructs the MessageV1 DTO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@fineract.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org