You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@sling.apache.org by tm...@apache.org on 2022/10/03 19:29:02 UTC
[sling-org-apache-sling-distribution-journal] branch master updated: SLING-11607 - Add metric that tracks the time a package spends in the distribution journal (#112)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new f1c03a3 SLING-11607 - Add metric that tracks the time a package spends in the distribution journal (#112)
f1c03a3 is described below
commit f1c03a3f8b48d6def025ba3a328da55e2f0fa777
Author: José Correia <37...@users.noreply.github.com>
AuthorDate: Mon Oct 3 21:28:57 2022 +0200
SLING-11607 - Add metric that tracks the time a package spends in the distribution journal (#112)
* Add a timer that captures the duration (in ms) that a package spends in the distribution journal
* Update tests
Co-authored-by: josec <jo...@adobe.com>
---
.../impl/subscriber/DistributionSubscriber.java | 52 +++++++++++++---------
.../journal/shared/DistributionMetricsService.java | 16 ++++++-
.../journal/impl/subscriber/SubscriberTest.java | 2 +
.../shared/DistributionMetricsServiceTest.java | 1 +
4 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 5c4acfd..a7e172f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -19,6 +19,7 @@
package org.apache.sling.distribution.journal.impl.subscriber;
import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -127,14 +128,14 @@ public class DistributionSubscriber {
@Reference
private SubscriberReadyStore subscriberReadyStore;
-
- private volatile Closeable idleReadyCheck; //NOSONAR
-
- private volatile IdleCheck idleCheck; //NOSONAR
+
+ private volatile Closeable idleReadyCheck; // NOSONAR
+
+ private volatile IdleCheck idleCheck; // NOSONAR
private Closeable packagePoller;
- private volatile CommandPoller commandPoller; //NOSONAR
+ private volatile CommandPoller commandPoller; // NOSONAR
private BookKeeper bookKeeper;
@@ -178,13 +179,13 @@ public class DistributionSubscriber {
if (config.subscriberIdleCheck()) {
// Unofficial config (currently just for test)
AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
-
+
idleCheck = new SubscriberIdle(idleMillies, SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, readyHolder);
idleReadyCheck = new SubscriberIdleCheck(context, idleCheck);
} else {
idleCheck = new NoopIdle();
}
-
+
queueNames = getNotEmpty(config.agentNames());
pkgType = requireNonNull(packageBuilder.getType());
@@ -192,33 +193,35 @@ public class DistributionSubscriber {
Consumer<LogMessage> logSender = messagingProvider.createSender(topics.getDiscoveryTopic());
String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
- BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, subSlingId, config.editable(), config.maxRetries(), config.packageHandling(), packageNodeName);
+ BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, subSlingId, config.editable(),
+ config.maxRetries(), config.packageHandling(), packageNodeName);
bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender);
-
+
long startOffset = bookKeeper.loadOffset() + 1;
String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;
packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage));
-
queueThread = startBackgroundThread(this::processQueue,
format("Queue Processor for Subscriber agent %s", subAgentName));
int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
- announcer = new Announcer(subSlingId, subAgentName, queueNames, messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
+ announcer = new Announcer(subSlingId, subAgentName, queueNames,
+ messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
config.maxRetries(), config.editable(), announceDelay);
- LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}", subAgentName, startOffset, queueNames);
+ LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}", subAgentName, startOffset,
+ queueNames);
}
-
+
public static String escapeTopicName(URI messagingUri, String topicName) {
- return String.format("%s%s_%s",
+ return String.format("%s%s_%s",
messagingUri.getHost(),
escape(messagingUri.getPath()),
escape(topicName));
}
-
+
private static String escape(String st) {
return Text.escapeIllegalJcrChars(st.replace("/", "_"));
}
@@ -234,10 +237,11 @@ public class DistributionSubscriber {
* Note that we don't interrupt blocking calls using Thread.interrupt()
* because interrupts can stop the Apache Oak repository.
*
- * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
+ * See SLING-9340, OAK-2609 and
+ * https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
*/
- IOUtils.closeQuietly(announcer, bookKeeper,
+ IOUtils.closeQuietly(announcer, bookKeeper,
packagePoller, idleReadyCheck, idleCheck, commandPoller);
running = false;
try {
@@ -249,7 +253,7 @@ public class DistributionSubscriber {
LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent names {} with package builder {}",
subAgentName, queueNames, pkgType);
}
-
+
public DistributionAgentState getState() {
boolean isBlocked = bookKeeper.getPackageRetries().getSum() > 0;
if (isBlocked) {
@@ -260,6 +264,8 @@ public class DistributionSubscriber {
private void handlePackageMessage(MessageInfo info, PackageMessage message) {
if (shouldEnqueue(info, message)) {
+ distributionMetricsService.getPackageJournalDistributionDuration()
+ .update((currentTimeMillis() - info.getCreateTime()), TimeUnit.MILLISECONDS);
enqueue(new FullMessage<>(info, message));
} else {
try {
@@ -322,7 +328,8 @@ public class DistributionSubscriber {
LOG.info("Stopped Queue processor");
}
- private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException, DistributionException, ImportPostProcessException {
+ private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException,
+ DistributionException, ImportPostProcessException {
blockingSendStoredStatus();
FullMessage<PackageMessage> item = blockingPeekQueueItem();
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -361,7 +368,8 @@ public class DistributionSubscriber {
throw new InterruptedException("Shutting down");
}
- private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException, ImportPostProcessException {
+ private void processQueueItem(FullMessage<PackageMessage> item)
+ throws PersistenceException, LoginException, DistributionException, ImportPostProcessException {
MessageInfo info = item.getInfo();
PackageMessage pkgMsg = item.getMessage();
boolean skip = shouldSkip(info.getOffset());
@@ -392,7 +400,6 @@ public class DistributionSubscriber {
return waitPrecondition(offset) == Decision.SKIP;
}
-
private Decision waitPrecondition(long offset) {
long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT;
while (System.currentTimeMillis() < endTime && running) {
@@ -403,7 +410,8 @@ public class DistributionSubscriber {
return decision;
}
}
- throw new PreConditionTimeoutException("Timeout waiting for distribution package at offset=" + offset + " on status topic");
+ throw new PreConditionTimeoutException(
+ "Timeout waiting for distribution package at offset=" + offset + " on status topic");
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index 00b6a9b..febdaf0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -83,6 +83,8 @@ public class DistributionMetricsService {
private Timer packageDistributedDuration;
+ private Timer packageJournalDistributionDuration;
+
private Timer buildPackageDuration;
private Timer enqueuePackageDuration;
@@ -120,7 +122,6 @@ public class DistributionMetricsService {
buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
@@ -130,6 +131,7 @@ public class DistributionMetricsService {
sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
+ packageJournalDistributionDuration = getTimer(getMetricName(SUB_COMPONENT, "package_journal_distribution_duration"));
queueAccessErrorCount = getCounter(getMetricName(PUB_COMPONENT, "queue_access_error_count"));
importPostProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "import_post_process_duration"));
importPostProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "import_post_process_success_count"));
@@ -234,7 +236,7 @@ public class DistributionMetricsService {
*/
public Counter getItemsBufferSize() {
return itemsBufferSize;
- }
+ }
/**
* Timer capturing the duration in ms of successful packages import operations.
@@ -300,6 +302,16 @@ public class DistributionMetricsService {
return packageDistributedDuration;
}
+ /**
+ * Timer capturing the duration in ms that a package spent in the distribution journal.
+ * The timer starts when the package is enqueued and stops when the package is consumed.
+ *
+ * @return a Sling Metrics timer
+ */
+ public Timer getPackageJournalDistributionDuration() {
+ return packageJournalDistributionDuration;
+ }
+
/**
* Timer capturing the duration in ms of building a content package
*
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index b9153de..f200da6 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -517,6 +517,8 @@ public class SubscriberTest {
.thenReturn(timer);
when(distributionMetricsService.getPackageDistributedDuration())
.thenReturn(timer);
+ when(distributionMetricsService.getPackageJournalDistributionDuration())
+ .thenReturn(timer);
when(distributionMetricsService.getTransientImportErrors())
.thenReturn(counter);
when(distributionMetricsService.getPermanentImportErrors())
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index 818e9fc..c536844 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -99,6 +99,7 @@ public class DistributionMetricsServiceTest {
assertNotNull(metrics.getImportedPackageSize());
assertNotNull(metrics.getItemsBufferSize());
assertNotNull(metrics.getPackageDistributedDuration());
+ assertNotNull(metrics.getPackageJournalDistributionDuration());
assertNotNull(metrics.getProcessQueueItemDuration());
assertNotNull(metrics.getQueueCacheFetchCount());
assertNotNull(metrics.getQueueAccessErrorCount());