You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/11/26 16:22:22 UTC

[sling-org-apache-sling-distribution-journal] branch master updated (16eda73 -> 45dfea3)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


    from 16eda73  SLING-8859 - Fix double JournalAvailable service
     new 1c4cbc9  SLING-8861 - Longer delay in error case
     new 45dfea3  SLING-8862 - Extract JournalAvailableServiceMarker

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../journal/impl/queue/impl/PubQueueCache.java     | 17 +++++---
 .../impl/shared/JournalAvailableChecker.java       | 31 +++++----------
 .../shared/JournalAvailableServiceMarker.java}     | 45 ++++++++++------------
 3 files changed, 41 insertions(+), 52 deletions(-)
 copy src/{test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java => main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java} (53%)


[sling-org-apache-sling-distribution-journal] 02/02: SLING-8862 - Extract JournalAvailableServiceMarker

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 45dfea39c842b64e5aeac9d623c8d0acd15a638e
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Nov 26 17:12:27 2019 +0100

    SLING-8862 - Extract JournalAvailableServiceMarker
---
 .../impl/shared/JournalAvailableChecker.java       | 31 ++++----------
 .../impl/shared/JournalAvailableServiceMarker.java | 50 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 5a401c3..18c7f5c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -27,11 +27,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -47,7 +45,7 @@ import org.slf4j.LoggerFactory;
         service = EventHandler.class,
         property = EventConstants.EVENT_TOPIC + "=" + ExceptionEventSender.ERROR_TOPIC
 )
-public class JournalAvailableChecker implements JournalAvailable, EventHandler {
+public class JournalAvailableChecker implements EventHandler {
     
     private static final Duration INITIAL_RETRY_DELAY = Duration.of(1, SECONDS);
     private static final Duration MAX_RETRY_DELAY = Duration.of(5, MINUTES);
@@ -70,9 +68,7 @@ public class JournalAvailableChecker implements JournalAvailable, EventHandler {
     @Reference
     DistributionMetricsService metrics;
     
-    private BundleContext context;
-
-    private volatile ServiceRegistration<JournalAvailable> reg;
+    private JournalAvailableServiceMarker marker;
 
     private GaugeService<Boolean> gauge;
 
@@ -85,7 +81,7 @@ public class JournalAvailableChecker implements JournalAvailable, EventHandler {
     public void activate(BundleContext context) {
         requireNonNull(provider);
         requireNonNull(topics);
-        this.context = context;
+        this.marker = new JournalAvailableServiceMarker(context);
         this.backoffRetry.startChecks();
         this.gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
         LOG.info("Started Journal availability checker service");
@@ -94,7 +90,7 @@ public class JournalAvailableChecker implements JournalAvailable, EventHandler {
     @Deactivate
     public void deactivate() {
         gauge.close();
-        unRegister();
+        this.marker.unRegister();
         IOUtils.closeQuietly(this.backoffRetry);
         LOG.info("Stopped Journal availability checker service");
     }
@@ -108,11 +104,9 @@ public class JournalAvailableChecker implements JournalAvailable, EventHandler {
 
     private void available() {
         LOG.info("Journal is available");
-        if (this.reg == null) {
-            this.reg = context.registerService(JournalAvailable.class, this, null);
-        }
+        this.marker.register();
     }
-    
+
     private void stillUnAvailable(Exception e) {
         String msg = "Journal is still unavailable: " + e.getMessage();
         if (LOG.isDebugEnabled()) {
@@ -120,11 +114,11 @@ public class JournalAvailableChecker implements JournalAvailable, EventHandler {
         } else {
             LOG.warn(msg);
         }
-        unRegister();
+        this.marker.unRegister();
     }
     
     public boolean isAvailable() {
-        return reg != null;
+        return this.marker.isRegistered();
     }
 
     public void run() {
@@ -138,20 +132,13 @@ public class JournalAvailableChecker implements JournalAvailable, EventHandler {
         }
     }
 
-    private void unRegister() {
-        if (this.reg != null) {
-            this.reg.unregister();
-            this.reg = null;
-        }
-    }
-
     @Override
     public synchronized void handleEvent(Event event) {
         String type = (String) event.getProperty(ExceptionEventSender.KEY_TYPE);
         int curNumErrors = this.numErrors.incrementAndGet();
         if (curNumErrors >= MIN_ERRORS) {
             LOG.warn("Received exception event {}. Journal is considered unavailable.", type);
-            unRegister();
+            this.marker.unRegister();
             this.numErrors.set(0);
             this.backoffRetry.startChecks(); 
         } else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
new file mode 100644
index 0000000..c6ac778
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+class JournalAvailableServiceMarker implements JournalAvailable {
+
+    private BundleContext context;
+    private ServiceRegistration<JournalAvailable> reg;
+    
+    JournalAvailableServiceMarker(BundleContext context) {
+        this.context = context;
+    }
+    
+    synchronized void register() {
+        if (this.reg == null) {
+            this.reg = context.registerService(JournalAvailable.class, this, null);
+        }
+    }
+
+    synchronized void unRegister() {
+        if (this.reg != null) {
+            this.reg.unregister();
+            this.reg = null;
+        }
+    }
+
+    synchronized boolean isRegistered() {
+        return this.reg != null;
+    }
+}


[sling-org-apache-sling-distribution-journal] 01/02: SLING-8861 - Longer delay in error case

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 1c4cbc9ba060d834194529efe2f8a0532a006333
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Nov 26 17:08:13 2019 +0100

    SLING-8861 - Longer delay in error case
---
 .../journal/impl/queue/impl/PubQueueCache.java          | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 5128dc6..438c765 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -85,6 +85,7 @@ public class PubQueueCache {
      * Interval in millisecond between two seeding messages.
      */
     private static final long SEEDING_DELAY_MS = 1000;
+    private static final long SEEDING_ERROR_DELAY_MS = 10000;
 
     /**
      * Blocks the threads awaiting until the agentQueues
@@ -126,7 +127,6 @@ public class PubQueueCache {
 
     private final Thread seeder;
 
-
     public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
@@ -166,18 +166,23 @@ public class PubQueueCache {
             LOG.debug("Send seeding message");
             try {
                 sender.send(topic, pkgMsg);
+                sleep(SEEDING_DELAY_MS);
             } catch (MessagingException e) {
                 LOG.warn(e.getMessage(), e);
-            }
-            try {
-                Thread.sleep(SEEDING_DELAY_MS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+                sleep(SEEDING_ERROR_DELAY_MS);
             }
         }
         LOG.info("Stop message seeder");
     }
 
+    private void sleep(long sleepMs) {
+        try {
+            Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private PackageMessage createTestMessage() {
         String pkgId = UUID.randomUUID().toString();
         return PackageMessage.newBuilder()