You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2022/02/18 10:06:35 UTC
[sling-org-apache-sling-distribution-journal] 01/01: Template towards a solution that covers both (a) ensuring events are raised, and (b) introduce a mode to deduplicate distributed events in a cluster
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch ensure-event-raised-at-least-once-and-on-leader-only
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit ed95a2535d7443e27d358140d294a8ad376a3d66
Author: tmaret <tm...@adobe.com>
AuthorDate: Fri Feb 18 11:06:12 2022 +0100
Template towards a solution that covers both (a) ensuring events are raised, and (b) introduce a mode to deduplicate distributed events in a cluster
---
pom.xml | 5 +
.../publisher/DistributedEventNotifierManager.java | 124 +++++++++++++++++++++
.../impl/publisher/PackageDistributedNotifier.java | 32 +++---
.../publisher/PackageDistributedNotifierTask.java | 58 ++++++++++
4 files changed, 202 insertions(+), 17 deletions(-)
diff --git a/pom.xml b/pom.xml
index 5f61d27..630b15a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,11 @@
<artifactId>org.apache.felix.webconsole</artifactId>
<version>4.3.16</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.api</artifactId>
+ <version>1.0.4</version>
+ </dependency>
<!-- OSGi -->
<dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
new file mode 100644
index 0000000..ef5dd21
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import java.util.Hashtable;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+import static org.apache.sling.discovery.TopologyEvent.Type;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_CHANGED;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_CHANGING;
+import static org.apache.sling.discovery.TopologyEvent.Type.TOPOLOGY_INIT;
+
+@Component(immediate = true, service = TopologyEventListener.class)
+@Designate(ocd = DistributedEventNotifierManager.Configuration.class)
+public class DistributedEventNotifierManager implements TopologyEventListener {
+
+ /*
+ * Register the package distributed event notifier service
+ * on all or only the leader instance in a cluster according
+ * to the configuration.
+ */
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+ @Reference
+ private PubQueueProvider pubQueueCacheService;
+
+ @Reference
+ private MessagingProvider messagingProvider;
+
+ @Reference
+ private Topics topics;
+
+ private ServiceRegistration<TopologyChangeHandler> reg;
+
+ private BundleContext context;
+
+ private Configuration config;
+
+ @Activate
+ public void activate(BundleContext context, Configuration config) {
+ this.context = context;
+ this.config = config;
+ if (! config.deduplicateEvent()) {
+ registerService();
+ }
+ }
+
+ public void deactivate() {
+ unregisterService();
+ }
+
+ @Override
+ public void handleTopologyEvent(TopologyEvent event) {
+ if (config.deduplicateEvent()) {
+ Type eventType = event.getType();
+ if (eventType == TOPOLOGY_INIT || eventType == TOPOLOGY_CHANGED) {
+ if (event.getNewView().getLocalInstance().isLeader()) {
+ registerService();
+ } else {
+ unregisterService();
+ }
+
+ } else if (eventType == TOPOLOGY_CHANGING) {
+ unregisterService();
+ }
+ }
+ }
+
+ private void registerService() {
+ if (reg == null) {
+ TopologyChangeHandler notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics);
+ reg = context.registerService(TopologyChangeHandler.class, notifier, new Hashtable<>());
+ }
+ }
+
+ private void unregisterService() {
+ if (reg != null) {
+ reg.unregister();
+ }
+ }
+
+ @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Package Distributed Event Notifier Configuration",
+ description = "Apache Sling Content Distribution Package Distributed Event Notifier Configuration")
+ public @interface Configuration {
+
+ @AttributeDefinition(name = "Deduplicate event",
+ description = "When true the distributed event will be sent only on one instance in the cluster. " +
+ "When false the distributed event will be sent on all instances in the cluster. Default is false")
+ boolean deduplicateEvent() default false;
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index b3e54c3..47e79e8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -38,9 +38,6 @@ import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -49,34 +46,28 @@ import org.slf4j.LoggerFactory;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
-@Component(immediate = true)
@ParametersAreNonnullByDefault
public class PackageDistributedNotifier implements TopologyChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
- @Reference
- private EventAdmin eventAdmin;
+ private final EventAdmin eventAdmin;
- @Reference
- private PubQueueProvider pubQueueCacheService;
-
- @Reference
- private MessagingProvider messagingProvider;
-
- @Reference
- private Topics topics;
+ private final PubQueueProvider pubQueueCacheService;
private Consumer<PackageDistributedMessage> sender;
- private boolean sendMsg;
+ private final boolean sendMsg;
- @Activate
- public void activate() {
+ public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueProvider, MessagingProvider messagingProvider, Topics topics) {
+ this.eventAdmin = eventAdmin;
+ this.pubQueueCacheService = pubQueueProvider;
sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
if (sendMsg) {
sender = messagingProvider.createSender(topics.getEventTopic());
}
+ // TODO load the last processed offset from the store
+ // and set the lastDistributedOffset field with the value
LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic());
}
@@ -99,6 +90,12 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
.forEach(msg -> notifyDistributed(pubAgentName, msg));
}
+ protected void storeLastDistributedOffset() {
+ // TODO load the last processed offset from the store
+ // and compare with lastDistributedOffset in memory
+ // store the new value if it has changed.
+ }
+
protected void notifyDistributed(String pubAgentName, DistributionQueueItem queueItem) {
LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId());
sendEvt(pubAgentName, queueItem);
@@ -130,6 +127,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
try {
Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
eventAdmin.sendEvent(distributed);
+ // TODO update lastProcessedOffset field in memory
} catch (Exception e) {
LOG.warn("Exception when sending package distributed event for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java
new file mode 100644
index 0000000..39e1da2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON;
+import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER;
+
+@Component(
+ property = {
+ PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+ PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true",
+ PROPERTY_SCHEDULER_PERIOD + ":Long=" + 60, // 1 minute
+ PROPERTY_SCHEDULER_RUN_ON + "=" + VALUE_RUN_ON_LEADER
+ })
+@ParametersAreNonnullByDefault
+public class PackageDistributedNotifierTask implements Runnable {
+
+ /*
+ * To avoid conflicting writes, only the leader instance
+ * persists the last distributed offset in the repository.
+ *
+ * The task runs in the at minute frequency to avoid
+ * overloading the author repository with a steady stream
+ * of fast commits (approximately 10 commit per second).
+ */
+
+ @Reference
+ private PackageDistributedNotifier notifier;
+
+ @Override
+ public void run() {
+ notifier.storeLastDistributedOffset();
+ }
+}