You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2022/02/18 10:06:34 UTC

[sling-org-apache-sling-distribution-journal] branch ensure-event-raised-at-least-once-and-on-leader-only created (now ed95a25)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a change to branch ensure-event-raised-at-least-once-and-on-leader-only
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


      at ed95a25  Template towards a solution that covers both (a) ensuring events are raised, and (b) introduce a mode to deduplicate distributed events in a cluster

This branch includes the following new commits:

     new ed95a25  Template towards a solution that covers both (a) ensuring events are raised, and (b) introduce a mode to deduplicate distributed events in a cluster

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[sling-org-apache-sling-distribution-journal] 01/01: Template towards a solution that covers both (a) ensuring events are raised, and (b) introduce a mode to deduplicate distributed events in a cluster

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch ensure-event-raised-at-least-once-and-on-leader-only
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit ed95a2535d7443e27d358140d294a8ad376a3d66
Author: tmaret <tm...@adobe.com>
AuthorDate: Fri Feb 18 11:06:12 2022 +0100

    Template towards a solution that covers both (a) ensuring events are raised, and (b) introduce a mode to deduplicate distributed events in a cluster
---
 pom.xml                                            |   5 +
 .../publisher/DistributedEventNotifierManager.java | 124 +++++++++++++++++++++
 .../impl/publisher/PackageDistributedNotifier.java |  32 +++---
 .../publisher/PackageDistributedNotifierTask.java  |  58 ++++++++++
 4 files changed, 202 insertions(+), 17 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5f61d27..630b15a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,11 @@
             <artifactId>org.apache.felix.webconsole</artifactId>
             <version>4.3.16</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.discovery.api</artifactId>
+            <version>1.0.4</version>
+        </dependency>
 
         <!-- OSGi -->
         <dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
new file mode 100644
index 0000000..ef5dd21
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import java.util.Hashtable;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+import static org.apache.sling.discovery.TopologyEvent.Type;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_CHANGED;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_CHANGING;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_INIT;
+
+/**
+ * Registers the package distributed event notifier service on all
+ * instances, or only on the leader instance of a cluster, according
+ * to the configuration.
+ *
+ * When event deduplication is disabled, the notifier is registered once
+ * at activation. When it is enabled, the registration follows the
+ * topology events: the notifier is registered while the local instance
+ * is the leader and unregistered otherwise.
+ */
+@Component(immediate = true, service = TopologyEventListener.class)
+@Designate(ocd = DistributedEventNotifierManager.Configuration.class)
+public class DistributedEventNotifierManager implements TopologyEventListener {
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    @Reference
+    private PubQueueProvider pubQueueCacheService;
+
+    @Reference
+    private MessagingProvider messagingProvider;
+
+    @Reference
+    private Topics topics;
+
+    /** Registration of the notifier service, or null while it is not registered. */
+    private ServiceRegistration<TopologyChangeHandler> reg;
+
+    private BundleContext context;
+
+    private Configuration config;
+
+    /**
+     * Stores the bundle context and configuration and, when event
+     * deduplication is disabled, registers the notifier right away.
+     *
+     * @param context the bundle context used to register the notifier service
+     * @param config the component configuration
+     */
+    @Activate
+    public void activate(BundleContext context, Configuration config) {
+        this.context = context;
+        this.config = config;
+        if (! config.deduplicateEvent()) {
+            registerService();
+        }
+    }
+
+    /**
+     * Unregisters the notifier service, if registered. The annotation is
+     * required for Declarative Services to invoke this method; without it
+     * the programmatic registration would outlive the component.
+     */
+    @Deactivate
+    public void deactivate() {
+        unregisterService();
+    }
+
+    /**
+     * Aligns the notifier registration with the cluster leadership when
+     * event deduplication is enabled. Events are ignored otherwise.
+     *
+     * @param event the topology event
+     */
+    @Override
+    public void handleTopologyEvent(TopologyEvent event) {
+        if (config.deduplicateEvent()) {
+            Type eventType = event.getType();
+            if (eventType == TOPOLOGY_INIT || eventType == TOPOLOGY_CHANGED) {
+                if (event.getNewView().getLocalInstance().isLeader()) {
+                    registerService();
+                } else {
+                    unregisterService();
+                }
+
+            } else if (eventType == TOPOLOGY_CHANGING) {
+                // The leader is not known while the topology is changing
+                unregisterService();
+            }
+        }
+    }
+
+    /**
+     * Creates and registers the notifier unless it is already registered.
+     */
+    private void registerService() {
+        if (reg == null) {
+            TopologyChangeHandler notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics);
+            reg = context.registerService(TopologyChangeHandler.class, notifier, new Hashtable<>());
+        }
+    }
+
+    /**
+     * Unregisters the notifier if it is registered. The registration is
+     * cleared first so that a later call to registerService() registers a
+     * new notifier, and so that unregister() is never invoked twice on the
+     * same registration (which throws IllegalStateException).
+     */
+    private void unregisterService() {
+        ServiceRegistration<TopologyChangeHandler> current = reg;
+        if (current != null) {
+            reg = null;
+            current.unregister();
+        }
+    }
+
+    @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Package Distributed Event Notifier Configuration",
+            description = "Apache Sling Content Distribution Package Distributed Event Notifier Configuration")
+    public @interface Configuration {
+
+        @AttributeDefinition(name = "Deduplicate event",
+                description = "When true the distributed event will be sent only on one instance in the cluster. " +
+                        "When false the distributed event will be sent on all instances in the cluster. Default is false")
+        boolean deduplicateEvent() default false;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index b3e54c3..47e79e8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -38,9 +38,6 @@ import org.apache.sling.distribution.journal.MessagingProvider;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -49,34 +46,28 @@ import org.slf4j.LoggerFactory;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
 
-@Component(immediate = true)
 @ParametersAreNonnullByDefault
 public class PackageDistributedNotifier implements TopologyChangeHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
 
-    @Reference
-    private EventAdmin eventAdmin;
+    private final EventAdmin eventAdmin;
 
-    @Reference
-    private PubQueueProvider pubQueueCacheService;
-
-    @Reference
-    private MessagingProvider messagingProvider;
-
-    @Reference
-    private Topics topics;
+    private final PubQueueProvider pubQueueCacheService;
 
     private Consumer<PackageDistributedMessage> sender;
 
-    private boolean sendMsg;
+    private final boolean sendMsg;
 
-    @Activate
-    public void activate() {
+    public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueProvider, MessagingProvider messagingProvider, Topics topics) {
+        this.eventAdmin = eventAdmin;
+        this.pubQueueCacheService = pubQueueProvider;
         sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
         if (sendMsg) {
             sender = messagingProvider.createSender(topics.getEventTopic());
         }
+        // TODO load the last processed offset from the store
+        //      and set the lastDistributedOffset field with the value
         LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic());
     }
 
@@ -99,6 +90,12 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
             .forEach(msg -> notifyDistributed(pubAgentName, msg));
     }
 
+    protected void storeLastDistributedOffset() {
+        // TODO load the last processed offset from the store
+        //      and compare with lastDistributedOffset in memory
+        //      store the new value if it has changed.
+    }
+
     protected void notifyDistributed(String pubAgentName, DistributionQueueItem queueItem) {
         LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId());
         sendEvt(pubAgentName, queueItem);
@@ -130,6 +127,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
         try {
             Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
             eventAdmin.sendEvent(distributed);
+            // TODO update the lastDistributedOffset field in memory
         } catch (Exception e) {
             LOG.warn("Exception when sending package distributed event for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java
new file mode 100644
index 0000000..39e1da2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON;
+import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER;
+
+@Component(
+        property = {
+                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+                PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true",
+                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 60, // 1 minute
+                PROPERTY_SCHEDULER_RUN_ON + "=" +  VALUE_RUN_ON_LEADER
+        })
+@ParametersAreNonnullByDefault
+public class PackageDistributedNotifierTask implements Runnable {
+
+    /*
+     * Scheduled task that asks the notifier to persist the last
+     * distributed offset.
+     *
+     * To avoid conflicting writes, only the leader instance
+     * persists the last distributed offset in the repository.
+     *
+     * The task runs once per minute to avoid overloading the
+     * author repository with a steady stream of fast commits
+     * (approximately 10 commits per second).
+     *
+     * NOTE(review): PackageDistributedNotifier is no longer declared
+     * as a component in this change and is registered only under the
+     * TopologyChangeHandler interface, so the reference below may
+     * never be satisfied. Confirm how the notifier is to be obtained.
+     */
+
+    @Reference
+    private PackageDistributedNotifier notifier;
+
+    @Override
+    public void run() {
+        notifier.storeLastDistributedOffset();
+    }
+}