You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by GitBox <gi...@apache.org> on 2022/02/13 22:19:25 UTC

[GitHub] [sling-org-apache-sling-distribution-journal] tmaret commented on a change in pull request #97: SLING-10583 - Ensure o/a/s/d/a/p/distributed events are raised at least once

tmaret commented on a change in pull request #97:
URL: https://github.com/apache/sling-org-apache-sling-distribution-journal/pull/97#discussion_r805421130



##########
File path: src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
##########
@@ -46,15 +50,20 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber.escapeTopicName;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
 
 @Component(immediate = true)
 @ParametersAreNonnullByDefault
 public class PackageDistributedNotifier implements TopologyChangeHandler {
 
+    public static final String STORE_TYPE_OFFSETS = "lastRaisedEventOffset";
+
     private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
 
+    private final Map<String, LocalStore> map = new HashMap<>();

Review comment:
       It's a detail, we may rename that field `localStores` to capture what the map contains.

##########
File path: src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
##########
@@ -130,6 +150,9 @@ private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
         try {
             Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
             eventAdmin.sendEvent(distributed);
+            LocalStore localStore = map.get(pubAgentName);
+            localStore.store(STORE_TYPE_OFFSETS, queueItem.get(QueueItemFactory.RECORD_OFFSET));

Review comment:
       Agreed. To reduce the load on the repository without compromising processing guarantees, we should store the `lastRaisedEventOffset` either (a) on a periodical basis if it has changed, or (b) after a given number of consecutive changes.

##########
File path: src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
##########
@@ -130,6 +150,9 @@ private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
         try {
             Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
             eventAdmin.sendEvent(distributed);
+            LocalStore localStore = map.get(pubAgentName);
+            localStore.store(STORE_TYPE_OFFSETS, queueItem.get(QueueItemFactory.RECORD_OFFSET));
+            map.put(pubAgentName, localStore);

Review comment:
       Putting the entry back into the map is not necessary since the code obtains a reference to the entry and update the entry directly.

##########
File path: src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
##########
@@ -91,6 +103,14 @@ public void changed(TopologyViewDiff diffView) {
      */
     private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
         long minOffset = offsets.get().findFirst().getAsLong();
+        if (!map.containsKey(pubAgentName)) {

Review comment:
       The `containsKey` and `put` methods could be replaced with a single invocation to the `computeIfAbsent` method which would simplify the code.
   
   ```
   LocalStore store = map.computeIfAbsent(pubAgentName, this::newLocalStore);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@sling.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org