You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@sling.apache.org by tm...@apache.org on 2022/10/03 19:29:02 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-11607 - Add metric that tracks the time a package spends in the distribution journal (#112)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new f1c03a3  SLING-11607 - Add metric that tracks the time a package spends in the distribution journal (#112)
f1c03a3 is described below

commit f1c03a3f8b48d6def025ba3a328da55e2f0fa777
Author: José Correia <37...@users.noreply.github.com>
AuthorDate: Mon Oct 3 21:28:57 2022 +0200

    SLING-11607 - Add metric that tracks the time a package spends in the distribution journal (#112)
    
    * Add timer that captures the duration in ms that a package spends in the distribution journal
    * Update tests
    
    Co-authored-by: josec <jo...@adobe.com>
---
 .../impl/subscriber/DistributionSubscriber.java    | 52 +++++++++++++---------
 .../journal/shared/DistributionMetricsService.java | 16 ++++++-
 .../journal/impl/subscriber/SubscriberTest.java    |  2 +
 .../shared/DistributionMetricsServiceTest.java     |  1 +
 4 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 5c4acfd..a7e172f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -19,6 +19,7 @@
 package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -127,14 +128,14 @@ public class DistributionSubscriber {
 
     @Reference
     private SubscriberReadyStore subscriberReadyStore;
-    
-    private volatile Closeable idleReadyCheck; //NOSONAR
-    
-    private volatile IdleCheck idleCheck; //NOSONAR
+
+    private volatile Closeable idleReadyCheck; // NOSONAR
+
+    private volatile IdleCheck idleCheck; // NOSONAR
 
     private Closeable packagePoller;
 
-    private volatile CommandPoller commandPoller; //NOSONAR
+    private volatile CommandPoller commandPoller; // NOSONAR
 
     private BookKeeper bookKeeper;
 
@@ -178,13 +179,13 @@ public class DistributionSubscriber {
         if (config.subscriberIdleCheck()) {
             // Unofficial config (currently just for test)
             AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
-            
+
             idleCheck = new SubscriberIdle(idleMillies, SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, readyHolder);
             idleReadyCheck = new SubscriberIdleCheck(context, idleCheck);
         } else {
             idleCheck = new NoopIdle();
         }
-        
+
         queueNames = getNotEmpty(config.agentNames());
         pkgType = requireNonNull(packageBuilder.getType());
 
@@ -192,33 +193,35 @@ public class DistributionSubscriber {
         Consumer<LogMessage> logSender = messagingProvider.createSender(topics.getDiscoveryTopic());
 
         String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
-        BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, subSlingId, config.editable(), config.maxRetries(), config.packageHandling(), packageNodeName);
+        BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, subSlingId, config.editable(),
+                config.maxRetries(), config.packageHandling(), packageNodeName);
         bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender);
-        
+
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;
 
         packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
                 HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage));
 
-
         queueThread = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", subAgentName));
 
         int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
-        announcer = new Announcer(subSlingId, subAgentName, queueNames, messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
+        announcer = new Announcer(subSlingId, subAgentName, queueNames,
+                messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
                 config.maxRetries(), config.editable(), announceDelay);
 
-        LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}", subAgentName, startOffset, queueNames);
+        LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}", subAgentName, startOffset,
+                queueNames);
     }
-    
+
     public static String escapeTopicName(URI messagingUri, String topicName) {
-        return String.format("%s%s_%s", 
+        return String.format("%s%s_%s",
                 messagingUri.getHost(),
                 escape(messagingUri.getPath()),
                 escape(topicName));
     }
-    
+
     private static String escape(String st) {
         return Text.escapeIllegalJcrChars(st.replace("/", "_"));
     }
@@ -234,10 +237,11 @@ public class DistributionSubscriber {
          * Note that we don't interrupt blocking calls using Thread.interrupt()
          * because interrupts can stop the Apache Oak repository.
          *
-         * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
+         * See SLING-9340, OAK-2609 and
+         * https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
          */
 
-        IOUtils.closeQuietly(announcer, bookKeeper, 
+        IOUtils.closeQuietly(announcer, bookKeeper,
                 packagePoller, idleReadyCheck, idleCheck, commandPoller);
         running = false;
         try {
@@ -249,7 +253,7 @@ public class DistributionSubscriber {
         LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent names {} with package builder {}",
                 subAgentName, queueNames, pkgType);
     }
-    
+
     public DistributionAgentState getState() {
         boolean isBlocked = bookKeeper.getPackageRetries().getSum() > 0;
         if (isBlocked) {
@@ -260,6 +264,8 @@ public class DistributionSubscriber {
 
     private void handlePackageMessage(MessageInfo info, PackageMessage message) {
         if (shouldEnqueue(info, message)) {
+            distributionMetricsService.getPackageJournalDistributionDuration()
+                    .update((currentTimeMillis() - info.getCreateTime()), TimeUnit.MILLISECONDS);
             enqueue(new FullMessage<>(info, message));
         } else {
             try {
@@ -322,7 +328,8 @@ public class DistributionSubscriber {
         LOG.info("Stopped Queue processor");
     }
 
-    private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException, DistributionException, ImportPostProcessException {
+    private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException,
+            DistributionException, ImportPostProcessException {
         blockingSendStoredStatus();
         FullMessage<PackageMessage> item = blockingPeekQueueItem();
         try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -361,7 +368,8 @@ public class DistributionSubscriber {
         throw new InterruptedException("Shutting down");
     }
 
-    private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException, ImportPostProcessException {
+    private void processQueueItem(FullMessage<PackageMessage> item)
+            throws PersistenceException, LoginException, DistributionException, ImportPostProcessException {
         MessageInfo info = item.getInfo();
         PackageMessage pkgMsg = item.getMessage();
         boolean skip = shouldSkip(info.getOffset());
@@ -392,7 +400,6 @@ public class DistributionSubscriber {
         return waitPrecondition(offset) == Decision.SKIP;
     }
 
-
     private Decision waitPrecondition(long offset) {
         long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT;
         while (System.currentTimeMillis() < endTime && running) {
@@ -403,7 +410,8 @@ public class DistributionSubscriber {
                 return decision;
             }
         }
-        throw new PreConditionTimeoutException("Timeout waiting for distribution package at offset=" + offset + " on status topic");
+        throw new PreConditionTimeoutException(
+                "Timeout waiting for distribution package at offset=" + offset + " on status topic");
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index 00b6a9b..febdaf0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -83,6 +83,8 @@ public class DistributionMetricsService {
 
     private Timer packageDistributedDuration;
 
+    private Timer packageJournalDistributionDuration;
+
     private Timer buildPackageDuration;
 
     private Timer enqueuePackageDuration;
@@ -120,7 +122,6 @@ public class DistributionMetricsService {
         buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
         enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
         queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
         importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
         itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
         importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
@@ -130,6 +131,7 @@ public class DistributionMetricsService {
         sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
         processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
         packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
+        packageJournalDistributionDuration = getTimer(getMetricName(SUB_COMPONENT, "package_journal_distribution_duration"));
         queueAccessErrorCount = getCounter(getMetricName(PUB_COMPONENT, "queue_access_error_count"));
         importPostProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "import_post_process_duration"));
         importPostProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "import_post_process_success_count"));
@@ -234,7 +236,7 @@ public class DistributionMetricsService {
      */
     public Counter getItemsBufferSize() {
         return itemsBufferSize;
-    }
+    } 
 
     /**
      * Timer capturing the duration in ms of successful packages import operations.
@@ -300,6 +302,16 @@ public class DistributionMetricsService {
         return packageDistributedDuration;
     }
 
+    /**
+     * Timer capturing the duration in ms that a package spent in the distribution journal.
+     * The timer starts when the package is enqueued and stops when the package is consumed.
+     *
+     * @return a Sling Metrics timer
+     */
+    public Timer getPackageJournalDistributionDuration() {
+        return packageJournalDistributionDuration;
+    }
+
     /**
      * Timer capturing the duration in ms of building a content package
      *
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index b9153de..f200da6 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -517,6 +517,8 @@ public class SubscriberTest {
                 .thenReturn(timer);
         when(distributionMetricsService.getPackageDistributedDuration())
                 .thenReturn(timer);
+        when(distributionMetricsService.getPackageJournalDistributionDuration())
+                .thenReturn(timer);
         when(distributionMetricsService.getTransientImportErrors())
                 .thenReturn(counter);
         when(distributionMetricsService.getPermanentImportErrors())
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index 818e9fc..c536844 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -99,6 +99,7 @@ public class DistributionMetricsServiceTest {
         assertNotNull(metrics.getImportedPackageSize());
         assertNotNull(metrics.getItemsBufferSize());
         assertNotNull(metrics.getPackageDistributedDuration());
+        assertNotNull(metrics.getPackageJournalDistributionDuration());
         assertNotNull(metrics.getProcessQueueItemDuration());
         assertNotNull(metrics.getQueueCacheFetchCount());
         assertNotNull(metrics.getQueueAccessErrorCount());