Posted to commits@fineract.apache.org by GitBox <gi...@apache.org> on 2023/01/17 11:26:12 UTC

[GitHub] [fineract] galovics opened a new pull request, #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

galovics opened a new pull request, #2891:
URL: https://github.com/apache/fineract/pull/2891

   ## Description
   
   Describe the changes made and why they were made.
   
   Ignore if these details are present on the associated [Apache Fineract JIRA ticket](https://github.com/apache/fineract/pull/1284).
   
   
   ## Checklist
   
   Please make sure these boxes are checked before submitting your pull request - thanks!
   
   - [ ] Write the commit message as per https://github.com/apache/fineract/#pull-requests
   
   - [ ] Acknowledge that we will not review PRs that are not passing the build _("green")_ - it is your responsibility to get a proposed PR to pass the build, not primarily the project's maintainers.
   
   - [ ] Create/update unit or integration tests for verifying the changes made.
   
   - [ ] Follow coding conventions at https://cwiki.apache.org/confluence/display/FINERACT/Coding+Conventions.
   
   - [ ] Add required Swagger annotation and update API documentation at fineract-provider/src/main/resources/static/legacy-docs/apiLive.htm with details of any API changes
   
   - [ ] Submission is not a "code dump".  (Large changes can be made "in repository" via a branch.  Ask on the developer mailing list for guidance, if required.)
   
   FYI our guidelines for code reviews are at https://cwiki.apache.org/confluence/display/FINERACT/Code+Review+Guide.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@fineract.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072285436


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -69,20 +79,66 @@ private boolean isDownstreamChannelEnabled() {
         return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled();
     }
 
-    private List<ExternalEvent> getQueuedEventsBatch() {
+    private List<ExternalEventView> getQueuedEventsBatch() {
         int readBatchSize = getBatchSize();
         Pageable batchSize = PageRequest.ofSize(readBatchSize);
-        return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+        return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),

Review Comment:
   I guess we don't need this in production :)
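
For context, the `measure(...)` call in the hunk above wraps the repository query with timing. The sketch below shows one way such a helper could look, assuming only the call shape quoted elsewhere in this thread (a supplier plus a callback that receives the result and a `Duration`). The class name `MeasureSketch` is hypothetical, and this is not Fineract's actual `MeasuringUtil`.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a timing helper; not Fineract's MeasuringUtil.
final class MeasureSketch {

    private MeasureSketch() {}

    // Runs the supplier, then hands the result and the elapsed time to the callback.
    static <T> T measure(Supplier<T> action, BiConsumer<T, Duration> onCompleted) {
        long start = System.nanoTime();
        T result = action.get();
        onCompleted.accept(result, Duration.ofNanos(System.nanoTime() - start));
        return result;
    }

    public static void main(String[] args) {
        // The lambda stands in for the repository query from the diff.
        List<String> events = measure(
                () -> List.of("event-1", "event-2"),
                (loaded, timeTaken) -> System.out.printf("Loaded %d events in %dms%n", loaded.size(), timeTaken.toMillis()));
        System.out.println(events);
    }
}
```

Because the callback only logs at debug level in the quoted diff, the remaining cost of a wrapper like this is two clock reads per call, which may be why it could stay in or be dropped without changing behavior.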



[GitHub] [fineract] galovics merged pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
galovics merged PR #2891:
URL: https://github.com/apache/fineract/pull/2891


[GitHub] [fineract] galovics commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
galovics commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1080999034


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java:
##########
@@ -60,4 +61,13 @@ public ActiveMQTopic activeMqTopic() {
     public ActiveMQQueue activeMqQueue() {
         return new ActiveMQQueue(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName());
     }
+
+    @Bean("externalEventJmsProducerExecutor")
+    public ThreadPoolTaskExecutor externalEventJmsProducerExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        threadPoolTaskExecutor.setCorePoolSize(10);

Review Comment:
   That's just the core pool size; the thread pool can grow until the max pool size (100 threads) is reached, but in my opinion nobody will use a value that high. Essentially, it would mean 100 parallel producers.
   
   Based on my initial measurements, the sweet spot is around 30-40 ActiveMQ producers.
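
To make the core versus max distinction concrete: Spring's `ThreadPoolTaskExecutor` delegates to `java.util.concurrent.ThreadPoolExecutor`, which only adds threads beyond the core size once its work queue refuses a task. The quoted hunk does not show the queue capacity, so whether this particular pool ever grows past 10 is not visible here. The sketch below uses plain `ThreadPoolExecutor` with scaled-down, made-up sizes (core 2, max 4) to show both cases; `PoolGrowthSketch` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: sizes are scaled down from the 10/100 discussed in the comment.
final class PoolGrowthSketch {

    private PoolGrowthSketch() {}

    // Submits blocking tasks and returns how many threads the pool has created.
    // taskCount must not exceed maxPoolSize + queueCapacity, or the pool rejects the task.
    static int poolSizeAfter(int corePoolSize, int maxPoolSize, int queueCapacity, int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        // Keep the worker busy so later tasks must queue or trigger a new thread.
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Roomy queue: extra tasks wait in the queue and the pool stays at its core size.
        System.out.println(poolSizeAfter(2, 4, 10, 5));
        // Tiny queue: once it is full, threads are added up to the maximum.
        System.out.println(poolSizeAfter(2, 4, 1, 5));
    }
}
```

Note that `ThreadPoolTaskExecutor` defaults to an effectively unbounded queue, so a bounded queue capacity is needed for the max pool size to have any effect.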



[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1080989821


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java:
##########
@@ -60,4 +61,13 @@ public ActiveMQTopic activeMqTopic() {
     public ActiveMQQueue activeMqQueue() {
         return new ActiveMQQueue(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName());
     }
+
+    @Bean("externalEventJmsProducerExecutor")
+    public ThreadPoolTaskExecutor externalEventJmsProducerExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        threadPoolTaskExecutor.setCorePoolSize(10);

Review Comment:
   Would it make sense to move these into application.properties?
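
As a rough illustration of this suggestion, the sketch below reads the pool sizes from a `java.util.Properties` object and falls back to the 10 and 100 mentioned in this thread. The property keys and the `ProducerPoolSettings` class are hypothetical and not taken from Fineract; in the real code base the values would more likely be bound through the existing `FineractProperties` object seen in the diff.

```java
import java.util.Properties;

// Hypothetical settings holder; the property keys below are invented for illustration.
final class ProducerPoolSettings {

    static final String CORE_KEY = "example.jms.producer.core-pool-size";
    static final String MAX_KEY = "example.jms.producer.max-pool-size";

    final int corePoolSize;
    final int maxPoolSize;

    private ProducerPoolSettings(int corePoolSize, int maxPoolSize) {
        if (corePoolSize < 0 || maxPoolSize < 1 || corePoolSize > maxPoolSize) {
            throw new IllegalArgumentException("Invalid pool sizes: core=" + corePoolSize + ", max=" + maxPoolSize);
        }
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
    }

    // Reads both sizes, using the values discussed in the review as defaults.
    static ProducerPoolSettings from(Properties properties) {
        int core = Integer.parseInt(properties.getProperty(CORE_KEY, "10"));
        int max = Integer.parseInt(properties.getProperty(MAX_KEY, "100"));
        return new ProducerPoolSettings(core, max);
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Override only the core size; the max size keeps its default.
        properties.setProperty(CORE_KEY, "30");
        ProducerPoolSettings settings = from(properties);
        System.out.println(settings.corePoolSize + " / " + settings.maxPoolSize);
    }
}
```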



[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072301485


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java:
##########
@@ -19,19 +19,25 @@
 package org.apache.fineract.infrastructure.event.external.repository;
 
 import java.time.LocalDate;
+import java.time.OffsetDateTime;
 import java.util.List;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 
 public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Long> {
 
-    List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
+    List<ExternalEventView> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
 
     @Modifying(flushAutomatically = true)
     @Query("delete from ExternalEvent e where e.status = :status and e.businessDate <= :dateForPurgeCriteria")
     void deleteOlderEventsWithSentStatus(ExternalEventStatus status, LocalDate dateForPurgeCriteria);
+
+    @Modifying

Review Comment:
   autoflush?



[GitHub] [fineract] ruchiD commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
ruchiD commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1073148174


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java:
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer.jms;
+
+import static org.apache.fineract.infrastructure.core.service.MeasuringUtil.measure;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
+import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
+public class JMSMultiExternalEventProducer implements ExternalEventProducer, InitializingBean {
+
+    @Qualifier("eventDestination")
+    private final Destination destination;
+
+    private final ActiveMQConnectionFactory connectionFactory;

Review Comment:
   Can we use the JMS ConnectionFactory here instead of ActiveMQConnectionFactory? An ActiveMQConnectionFactory is already created by default in ExternalEventJMSConfiguration, so we can rely on the JMS abstraction here.



[GitHub] [fineract] galovics commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
galovics commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1073255186


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -69,20 +79,66 @@ private boolean isDownstreamChannelEnabled() {
         return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled();
     }
 
-    private List<ExternalEvent> getQueuedEventsBatch() {
+    private List<ExternalEventView> getQueuedEventsBatch() {
         int readBatchSize = getBatchSize();
         Pageable batchSize = PageRequest.ofSize(readBatchSize);
-        return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+        return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),
+                (events, timeTaken) -> log.debug("Loaded {} events in {}ms", events.size(), timeTaken.toMillis()));
+    }
+
+    private void sendEvents(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<byte[]>> partitions = generatePartitions(queuedEvents);
+        List<Long> eventIds = queuedEvents.stream().map(ExternalEventView::getId).toList();
+        sendEventsToProducer(partitions);
+        markEventsAsSent(eventIds);
     }
 
-    private void processEvents(List<ExternalEvent> queuedEvents) throws IOException {
-        for (ExternalEvent event : queuedEvents) {
-            MessageV1 message = messageFactory.createMessage(event);
-            byte[] byteMessage = byteBufferConverter.convert(message.toByteBuffer());
-            eventProducer.sendEvent(byteMessage);
-            event.setStatus(ExternalEventStatus.SENT);
-            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
-            repository.save(event);
+    private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) {
+        eventProducer.sendEvents(partitions);
+    }
+
+    private void markEventsAsSent(List<Long> eventIds) {
+        OffsetDateTime sentAt = DateUtils.getOffsetDateTimeOfTenant();
+
+        // Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
+        List<List<Long>> partitions = Lists.partition(eventIds, 5_000);
+        partitions.forEach(partitionedEventIds -> {
+            measure(() -> {
+                repository.markEventsSent(partitionedEventIds, sentAt);
+            }, timeTaken -> {
+                log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
+            });
+        });
+    }
+
+    private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<ExternalEventView>> initialPartitions = queuedEvents.stream().collect(groupingBy(externalEvent -> {
+            Long aggregateRootId = externalEvent.getAggregateRootId();

Review Comment:
   I don't think we need the type. The aggregate root ID is only used for picking which producer to use when sending the messages to ensure the right ordering for a single aggregate root. Even if multiple aggregate types have the same aggregate root ID, they'll be sent by the same producer, but that's all.
   Does that explain it?
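
The argument above can be illustrated with a small sketch: if the producer slot is derived from the aggregate root ID alone, two aggregates of different types that share an ID simply share a producer, and each aggregate's events still leave in their original order. The `Event` shape, the modulo rule, and the class name `AggregatePartitionSketch` are assumptions for illustration; this is not the PR's `generatePartitions` implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event shape and routing rule; not the PR's generatePartitions code.
final class AggregatePartitionSketch {

    static final class Event {
        final long id;
        final String aggregateType;
        final long aggregateRootId;

        Event(long id, String aggregateType, long aggregateRootId) {
            this.id = id;
            this.aggregateType = aggregateType;
            this.aggregateRootId = aggregateRootId;
        }
    }

    private AggregatePartitionSketch() {}

    // Picks a producer slot from the aggregate root ID alone; the type is deliberately ignored.
    static int producerIndex(long aggregateRootId, int producerCount) {
        return (int) Math.floorMod(aggregateRootId, (long) producerCount);
    }

    // Groups event IDs per producer slot, keeping the input order inside each slot.
    static Map<Integer, List<Long>> partition(List<Event> events, int producerCount) {
        Map<Integer, List<Long>> partitions = new LinkedHashMap<>();
        for (Event event : events) {
            int slot = producerIndex(event.aggregateRootId, producerCount);
            partitions.computeIfAbsent(slot, key -> new ArrayList<>()).add(event.id);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1, "Loan", 7),
                new Event(2, "Client", 7), // same root ID as the loan, different type
                new Event(3, "Loan", 8),
                new Event(4, "Loan", 7));
        System.out.println(partition(events, 4));
    }
}
```

In this sketch the loan and the client with ID 7 land in the same slot, so their events are serialized behind each other. That costs a little parallelism but does not break per-aggregate ordering.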



[GitHub] [fineract] ruchiD commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
ruchiD commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1073275255


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -69,20 +79,66 @@ private boolean isDownstreamChannelEnabled() {
         return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled();
     }
 
-    private List<ExternalEvent> getQueuedEventsBatch() {
+    private List<ExternalEventView> getQueuedEventsBatch() {
         int readBatchSize = getBatchSize();
         Pageable batchSize = PageRequest.ofSize(readBatchSize);
-        return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+        return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),
+                (events, timeTaken) -> log.debug("Loaded {} events in {}ms", events.size(), timeTaken.toMillis()));
+    }
+
+    private void sendEvents(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<byte[]>> partitions = generatePartitions(queuedEvents);
+        List<Long> eventIds = queuedEvents.stream().map(ExternalEventView::getId).toList();
+        sendEventsToProducer(partitions);
+        markEventsAsSent(eventIds);
     }
 
-    private void processEvents(List<ExternalEvent> queuedEvents) throws IOException {
-        for (ExternalEvent event : queuedEvents) {
-            MessageV1 message = messageFactory.createMessage(event);
-            byte[] byteMessage = byteBufferConverter.convert(message.toByteBuffer());
-            eventProducer.sendEvent(byteMessage);
-            event.setStatus(ExternalEventStatus.SENT);
-            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
-            repository.save(event);
+    private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) {
+        eventProducer.sendEvents(partitions);
+    }
+
+    private void markEventsAsSent(List<Long> eventIds) {
+        OffsetDateTime sentAt = DateUtils.getOffsetDateTimeOfTenant();
+
+        // Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
+        List<List<Long>> partitions = Lists.partition(eventIds, 5_000);
+        partitions.forEach(partitionedEventIds -> {
+            measure(() -> {
+                repository.markEventsSent(partitionedEventIds, sentAt);
+            }, timeTaken -> {
+                log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
+            });
+        });
+    }
+
+    private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<ExternalEventView>> initialPartitions = queuedEvents.stream().collect(groupingBy(externalEvent -> {
+            Long aggregateRootId = externalEvent.getAggregateRootId();

Review Comment:
   Yes.



[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072297910


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java:
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer.jms;
+
+import static org.apache.fineract.infrastructure.core.service.MeasuringUtil.measure;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
+import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
+public class JMSMultiExternalEventProducer implements ExternalEventProducer, InitializingBean {
+
+    @Qualifier("eventDestination")
+    private final Destination destination;
+
+    private final ActiveMQConnectionFactory connectionFactory;

Review Comment:
   Will we support only ActiveMQ?



[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072301485


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java:
##########
@@ -19,19 +19,25 @@
 package org.apache.fineract.infrastructure.event.external.repository;
 
 import java.time.LocalDate;
+import java.time.OffsetDateTime;
 import java.util.List;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 
 public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Long> {
 
-    List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
+    List<ExternalEventView> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
 
     @Modifying(flushAutomatically = true)
     @Query("delete from ExternalEvent e where e.status = :status and e.businessDate <= :dateForPurgeCriteria")
     void deleteOlderEventsWithSentStatus(ExternalEventStatus status, LocalDate dateForPurgeCriteria);
+
+    @Modifying

Review Comment:
   Would autoflush make sense here?



[GitHub] [fineract] ruchiD commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
ruchiD commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1073184573


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -69,20 +79,66 @@ private boolean isDownstreamChannelEnabled() {
         return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled();
     }
 
-    private List<ExternalEvent> getQueuedEventsBatch() {
+    private List<ExternalEventView> getQueuedEventsBatch() {
         int readBatchSize = getBatchSize();
         Pageable batchSize = PageRequest.ofSize(readBatchSize);
-        return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+        return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),
+                (events, timeTaken) -> log.debug("Loaded {} events in {}ms", events.size(), timeTaken.toMillis()));
+    }
+
+    private void sendEvents(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<byte[]>> partitions = generatePartitions(queuedEvents);
+        List<Long> eventIds = queuedEvents.stream().map(ExternalEventView::getId).toList();
+        sendEventsToProducer(partitions);
+        markEventsAsSent(eventIds);
     }
 
-    private void processEvents(List<ExternalEvent> queuedEvents) throws IOException {
-        for (ExternalEvent event : queuedEvents) {
-            MessageV1 message = messageFactory.createMessage(event);
-            byte[] byteMessage = byteBufferConverter.convert(message.toByteBuffer());
-            eventProducer.sendEvent(byteMessage);
-            event.setStatus(ExternalEventStatus.SENT);
-            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
-            repository.save(event);
+    private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) {
+        eventProducer.sendEvents(partitions);
+    }
+
+    private void markEventsAsSent(List<Long> eventIds) {
+        OffsetDateTime sentAt = DateUtils.getOffsetDateTimeOfTenant();
+
+        // Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
+        List<List<Long>> partitions = Lists.partition(eventIds, 5_000);
+        partitions.forEach(partitionedEventIds -> {
+            measure(() -> {
+                repository.markEventsSent(partitionedEventIds, sentAt);
+            }, timeTaken -> {
+                log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
+            });
+        });
+    }
+
+    private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<ExternalEventView>> initialPartitions = queuedEvents.stream().collect(groupingBy(externalEvent -> {
+            Long aggregateRootId = externalEvent.getAggregateRootId();

Review Comment:
   Should we also consider the aggregate type (client, loan, savings, etc.)? Could different aggregate types have the same aggregate ID?



[GitHub] [fineract] galovics commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
galovics commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072366927


##########
fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java:
##########
@@ -39,4 +39,9 @@ public String getType() {
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override

Review Comment:
   It's for cases where a single aggregate root cannot be identified, for example the LoanAccountsStayedLockedBusinessEvent. Multiple accounts stayed locked, so there's no single aggregate to attribute the event to; hence it doesn't matter which producer picks up the message to be sent.
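
A minimal sketch of how a router might treat such events, assuming the override in question returns `null` as the aggregate root ID: events with an ID always map to the same producer slot, while events without one are spread across slots because their relative order does not matter. The class `NullAggregateRoutingSketch` and its round-robin fallback are hypothetical; the PR may pick the producer differently.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical router; the round-robin fallback is an assumption, not the PR's behavior.
final class NullAggregateRoutingSketch {

    private final AtomicLong fallbackCounter = new AtomicLong();
    private final int producerCount;

    NullAggregateRoutingSketch(int producerCount) {
        if (producerCount < 1) {
            throw new IllegalArgumentException("producerCount must be at least 1");
        }
        this.producerCount = producerCount;
    }

    // A non-null ID is sticky to one slot; a null ID rotates through the slots.
    int producerIndex(Long aggregateRootId) {
        long key = aggregateRootId != null ? aggregateRootId : fallbackCounter.getAndIncrement();
        return (int) Math.floorMod(key, (long) producerCount);
    }

    public static void main(String[] args) {
        NullAggregateRoutingSketch router = new NullAggregateRoutingSketch(3);
        System.out.println(router.producerIndex(5L));   // sticky slot for aggregate 5
        System.out.println(router.producerIndex(null)); // no aggregate root: any slot will do
        System.out.println(router.producerIndex(null));
    }
}
```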



[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072168430


##########
fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanAccountsStayedLockedBusinessEvent.java:
##########
@@ -39,4 +39,9 @@ public String getType() {
     public String getCategory() {
         return CATEGORY;
     }
+
+    @Override

Review Comment:
   Null?



[GitHub] [fineract] galovics commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
galovics commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1073253912


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java:
##########
@@ -19,19 +19,25 @@
 package org.apache.fineract.infrastructure.event.external.repository;
 
 import java.time.LocalDate;
+import java.time.OffsetDateTime;
 import java.util.List;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 
 public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Long> {
 
-    List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
+    List<ExternalEventView> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
 
     @Modifying(flushAutomatically = true)
     @Query("delete from ExternalEvent e where e.status = :status and e.businessDate <= :dateForPurgeCriteria")
     void deleteOlderEventsWithSentStatus(ExternalEventStatus status, LocalDate dateForPurgeCriteria);
+
+    @Modifying

Review Comment:
   Nope.





[GitHub] [fineract] galovics commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
galovics commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072367835


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/ActiveMQMessageFactory.java:
##########
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.config;
+package org.apache.fineract.infrastructure.core.messaging.jms;
 
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.config.EnableIntegration;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.springframework.stereotype.Component;
 
-@Configuration
-@EnableIntegration
-@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true")
-public class ExternalEventProducerConfiguration {
+@Component

Review Comment:
   Good point; I intentionally left it like this. I wanted to defer the "frameworking" part until RabbitMQ or other messaging solutions come into play.



##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java:
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer.jms;
+
+import static org.apache.fineract.infrastructure.core.service.MeasuringUtil.measure;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
+import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
+public class JMSMultiExternalEventProducer implements ExternalEventProducer, InitializingBean {
+
+    @Qualifier("eventDestination")
+    private final Destination destination;
+
+    private final ActiveMQConnectionFactory connectionFactory;

Review Comment:
   Same as above.





[GitHub] [fineract] galovics commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
galovics commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072370757


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java:
##########
@@ -19,19 +19,25 @@
 package org.apache.fineract.infrastructure.event.external.repository;
 
 import java.time.LocalDate;
+import java.time.OffsetDateTime;
 import java.util.List;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 
 public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Long> {
 
-    List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
+    List<ExternalEventView> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
 
     @Modifying(flushAutomatically = true)
     @Query("delete from ExternalEvent e where e.status = :status and e.businessDate <= :dateForPurgeCriteria")
     void deleteOlderEventsWithSentStatus(ExternalEventStatus status, LocalDate dateForPurgeCriteria);
+
+    @Modifying

Review Comment:
   I don't see the reason for it, do you?





[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072171238


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/messaging/jms/ActiveMQMessageFactory.java:
##########
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.config;
+package org.apache.fineract.infrastructure.core.messaging.jms;
 
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.config.EnableIntegration;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.springframework.stereotype.Component;
 
-@Configuration
-@EnableIntegration
-@ConditionalOnProperty(value = "fineract.events.external.enabled", havingValue = "true")
-public class ExternalEventProducerConfiguration {
+@Component

Review Comment:
   Don't we need a condition that controls whether ActiveMQMessageFactory is registered?
   Or will it be the de facto implementation, which others can only override?





[GitHub] [fineract] galovics commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
galovics commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1073338862


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java:
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.producer.jms;
+
+import static org.apache.fineract.infrastructure.core.service.MeasuringUtil.measure;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
+import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(value = "fineract.events.external.producer.jms.enabled", havingValue = "true")
+public class JMSMultiExternalEventProducer implements ExternalEventProducer, InitializingBean {
+
+    @Qualifier("eventDestination")
+    private final Destination destination;
+
+    private final ActiveMQConnectionFactory connectionFactory;

Review Comment:
   That's a good point, @ruchiD. I'll change it for sure.





[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072383085


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java:
##########
@@ -19,19 +19,25 @@
 package org.apache.fineract.infrastructure.event.external.repository;
 
 import java.time.LocalDate;
+import java.time.OffsetDateTime;
 import java.util.List;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventView;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 
 public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Long> {
 
-    List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
+    List<ExternalEventView> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
 
     @Modifying(flushAutomatically = true)
     @Query("delete from ExternalEvent e where e.status = :status and e.businessDate <= :dateForPurgeCriteria")
     void deleteOlderEventsWithSentStatus(ExternalEventStatus status, LocalDate dateForPurgeCriteria);
+
+    @Modifying

Review Comment:
   Could the "not yet sent" events be fetched from the DB again before this async sending logic has been executed for all of them?





[GitHub] [fineract] adamsaghy commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
adamsaghy commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1072285436


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -69,20 +79,66 @@ private boolean isDownstreamChannelEnabled() {
         return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled();
     }
 
-    private List<ExternalEvent> getQueuedEventsBatch() {
+    private List<ExternalEventView> getQueuedEventsBatch() {
         int readBatchSize = getBatchSize();
         Pageable batchSize = PageRequest.ofSize(readBatchSize);
-        return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+        return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),

Review Comment:
   I guess we don't need this for production :)





[GitHub] [fineract] ruchiD commented on a diff in pull request #2891: FINERACT-1724: Implemented support for multiple parallel JMS producers for external events

Posted by GitBox <gi...@apache.org>.
ruchiD commented on code in PR #2891:
URL: https://github.com/apache/fineract/pull/2891#discussion_r1073275255


##########
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java:
##########
@@ -69,20 +79,66 @@ private boolean isDownstreamChannelEnabled() {
         return fineractProperties.getEvents().getExternal().getProducer().getJms().isEnabled();
     }
 
-    private List<ExternalEvent> getQueuedEventsBatch() {
+    private List<ExternalEventView> getQueuedEventsBatch() {
         int readBatchSize = getBatchSize();
         Pageable batchSize = PageRequest.ofSize(readBatchSize);
-        return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+        return measure(() -> repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize),
+                (events, timeTaken) -> log.debug("Loaded {} events in {}ms", events.size(), timeTaken.toMillis()));
+    }
+
+    private void sendEvents(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<byte[]>> partitions = generatePartitions(queuedEvents);
+        List<Long> eventIds = queuedEvents.stream().map(ExternalEventView::getId).toList();
+        sendEventsToProducer(partitions);
+        markEventsAsSent(eventIds);
     }
 
-    private void processEvents(List<ExternalEvent> queuedEvents) throws IOException {
-        for (ExternalEvent event : queuedEvents) {
-            MessageV1 message = messageFactory.createMessage(event);
-            byte[] byteMessage = byteBufferConverter.convert(message.toByteBuffer());
-            eventProducer.sendEvent(byteMessage);
-            event.setStatus(ExternalEventStatus.SENT);
-            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
-            repository.save(event);
+    private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) {
+        eventProducer.sendEvents(partitions);
+    }
+
+    private void markEventsAsSent(List<Long> eventIds) {
+        OffsetDateTime sentAt = DateUtils.getOffsetDateTimeOfTenant();
+
+        // Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
+        List<List<Long>> partitions = Lists.partition(eventIds, 5_000);
+        partitions.forEach(partitionedEventIds -> {
+            measure(() -> {
+                repository.markEventsSent(partitionedEventIds, sentAt);
+            }, timeTaken -> {
+                log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
+            });
+        });
+    }
+
+    private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
+        Map<Long, List<ExternalEventView>> initialPartitions = queuedEvents.stream().collect(groupingBy(externalEvent -> {
+            Long aggregateRootId = externalEvent.getAggregateRootId();

Review Comment:
   Yes, makes sense.
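
   For reference, the chunking step in `markEventsAsSent` in the hunk above can be sketched with plain JDK collections. This is only an illustration, not the PR's code: the PR uses `Lists.partition(eventIds, 5_000)`, and the class name `IdChunker` and the method `chunk` below are hypothetical. It assumes the goal is simply to keep each update statement's ID list well under the prepared-statement parameter limit mentioned in the code comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Fineract: splits a list of IDs into
// consecutive chunks so that each bulk update binds a bounded number of parameters.
public class IdChunker {

    // Returns consecutive sublists of at most chunkSize elements, preserving order.
    static <T> List<List<T>> chunk(List<T> ids, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, ids.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(ids.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Long> eventIds = new ArrayList<>();
        for (long id = 1; id <= 12_000; id++) {
            eventIds.add(id);
        }
        // 12,000 IDs with a chunk size of 5,000 give chunks of 5000, 5000 and 2000.
        for (List<Long> part : chunk(eventIds, 5_000)) {
            System.out.println(part.size());
        }
    }
}
```

   Each chunk would then be passed to one bulk update call, in the same way the hunk calls `repository.markEventsSent(partitionedEventIds, sentAt)` per partition.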


