You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2022/02/18 10:06:35 UTC

[sling-org-apache-sling-distribution-journal] 01/01: Template towards a solution that covers both (a) ensuring events are raised, and (b) introduce a mode to deduplicate distributed events in a cluster

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch ensure-event-raised-at-least-once-and-on-leader-only
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit ed95a2535d7443e27d358140d294a8ad376a3d66
Author: tmaret <tm...@adobe.com>
AuthorDate: Fri Feb 18 11:06:12 2022 +0100

    Template towards a solution that covers both (a) ensuring events are raised, and (b) introduce a mode to deduplicate distributed events in a cluster
---
 pom.xml                                            |   5 +
 .../publisher/DistributedEventNotifierManager.java | 124 +++++++++++++++++++++
 .../impl/publisher/PackageDistributedNotifier.java |  32 +++---
 .../publisher/PackageDistributedNotifierTask.java  |  58 ++++++++++
 4 files changed, 202 insertions(+), 17 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5f61d27..630b15a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,11 @@
             <artifactId>org.apache.felix.webconsole</artifactId>
             <version>4.3.16</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.discovery.api</artifactId>
+            <version>1.0.4</version>
+        </dependency>
 
         <!-- OSGi -->
         <dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
new file mode 100644
index 0000000..ef5dd21
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import java.util.Hashtable;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+import static org.apache.sling.discovery.TopologyEvent.Type;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_CHANGED;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_CHANGING;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_INIT;
+
+@Component(immediate = true, service = TopologyEventListener.class)
+@Designate(ocd = DistributedEventNotifierManager.Configuration.class)
+public class DistributedEventNotifierManager implements TopologyEventListener {
+
+    /*
+     * Register the package distributed event notifier service
+     * on all or only the leader instance in a cluster according
+     * to the configuration.
+     */
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    @Reference
+    private PubQueueProvider pubQueueCacheService;
+
+    @Reference
+    private MessagingProvider messagingProvider;
+
+    @Reference
+    private Topics topics;
+
+    private ServiceRegistration<TopologyChangeHandler> reg;
+
+    private BundleContext context;
+
+    private Configuration config;
+
+    @Activate
+    public void activate(BundleContext context, Configuration config) {
+        this.context = context;
+        this.config = config;
+        if (! config.deduplicateEvent()) {
+            registerService();
+        }
+    }
+
+    public void deactivate() {
+        unregisterService();
+    }
+
+    @Override
+    public void handleTopologyEvent(TopologyEvent event) {
+        if (config.deduplicateEvent()) {
+            Type eventType = event.getType();
+            if (eventType == TOPOLOGY_INIT || eventType == TOPOLOGY_CHANGED) {
+                if (event.getNewView().getLocalInstance().isLeader()) {
+                    registerService();
+                } else {
+                    unregisterService();
+                }
+
+            } else if (eventType == TOPOLOGY_CHANGING) {
+                unregisterService();
+            }
+        }
+    }
+
+    private void registerService() {
+        if (reg == null) {
+            TopologyChangeHandler notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics);
+            reg = context.registerService(TopologyChangeHandler.class, notifier, new Hashtable<>());
+        }
+    }
+
+    private void unregisterService() {
+        if (reg != null) {
+            reg.unregister();
+        }
+    }
+
+    @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Package Distributed Event Notifier Configuration",
+            description = "Apache Sling Content Distribution Package Distributed Event Notifier Configuration")
+    public @interface Configuration {
+
+        @AttributeDefinition(name = "Deduplicate event",
+                description = "When true the distributed event will be sent only on one instance in the cluster. " +
+                        "When false the distributed event will be sent on all instances in the cluster. Default is false")
+        boolean deduplicateEvent() default false;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index b3e54c3..47e79e8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -38,9 +38,6 @@ import org.apache.sling.distribution.journal.MessagingProvider;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -49,34 +46,28 @@ import org.slf4j.LoggerFactory;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
 
-@Component(immediate = true)
 @ParametersAreNonnullByDefault
 public class PackageDistributedNotifier implements TopologyChangeHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
 
-    @Reference
-    private EventAdmin eventAdmin;
+    private final EventAdmin eventAdmin;
 
-    @Reference
-    private PubQueueProvider pubQueueCacheService;
-
-    @Reference
-    private MessagingProvider messagingProvider;
-
-    @Reference
-    private Topics topics;
+    private final PubQueueProvider pubQueueCacheService;
 
     private Consumer<PackageDistributedMessage> sender;
 
-    private boolean sendMsg;
+    private final boolean sendMsg;
 
-    @Activate
-    public void activate() {
+    public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueProvider, MessagingProvider messagingProvider, Topics topics) {
+        this.eventAdmin = eventAdmin;
+        this.pubQueueCacheService = pubQueueProvider;
         sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
         if (sendMsg) {
             sender = messagingProvider.createSender(topics.getEventTopic());
         }
+        // TODO load the last processed offset from the store
+        //      and set the lastDistributedOffset field with the value
         LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic());
     }
 
@@ -99,6 +90,12 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
             .forEach(msg -> notifyDistributed(pubAgentName, msg));
     }
 
+    protected void storeLastDistributedOffset() {
+        // TODO load the last processed offset from the store
+        //      and compare with lastDistributedOffset in memory
+        //      store the new value if it has changed.
+    }
+
     protected void notifyDistributed(String pubAgentName, DistributionQueueItem queueItem) {
         LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId());
         sendEvt(pubAgentName, queueItem);
@@ -130,6 +127,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
         try {
             Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
             eventAdmin.sendEvent(distributed);
+            // TODO update lastProcessedOffset field in memory
         } catch (Exception e) {
             LOG.warn("Exception when sending package distributed event for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java
new file mode 100644
index 0000000..39e1da2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON;
+import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER;
+
+@Component(
+        property = {
+                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+                PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true",
+                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 60, // 1 minute
+                PROPERTY_SCHEDULER_RUN_ON + "=" +  VALUE_RUN_ON_LEADER
+        })
+@ParametersAreNonnullByDefault
+public class PackageDistributedNotifierTask implements Runnable {
+
+    /*
+     * To avoid conflicting writes, only the leader instance
+     * persists the last distributed offset in the repository.
+     *
+     * The task runs in the at minute frequency to avoid
+     * overloading the author repository with a steady stream
+     * of fast commits (approximately 10 commit per second).
+     */
+
+    @Reference
+    private PackageDistributedNotifier notifier;
+
+    @Override
+    public void run() {
+        notifier.storeLastDistributedOffset();
+    }
+}