You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/01/17 09:11:56 UTC

[sling-org-apache-sling-distribution-journal] 02/02: SLING-9009 - Only store every nth skipped package offset

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 80306acc7de17f926300970c57e2c157019349fa
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri Jan 17 10:03:43 2020 +0100

    SLING-9009 - Only store every nth skipped package offset
---
 .../journal/impl/subscriber/BookKeeper.java        | 20 +++++-
 .../journal/impl/subscriber/BookKeeperTest.java    | 82 ++++++++++++++++++++++
 2 files changed, 99 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index d56713c..558dbf4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -81,6 +81,7 @@ public class BookKeeper implements Closeable {
     private static final String SUBSERVICE_IMPORTER = "importer";
     private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
     private static final int RETRY_SEND_DELAY = 1000;
+    private static final int COMMIT_AFTER_NUM_SKIPPED = 10;
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     private final ResourceResolverFactory resolverFactory;
@@ -98,6 +99,7 @@ public class BookKeeper implements Closeable {
     private final String subAgentName;
     private final String subSlingId;
     private GaugeService<Integer> retriesGauge;
+    private int skippedCounter = 0;
 
     public BookKeeper(ResourceResolverFactory resolverFactory, 
             DistributionMetricsService distributionMetricsService,
@@ -225,9 +227,21 @@ public class BookKeeper implements Closeable {
     
     public void skipPackage(long offset) throws LoginException, PersistenceException {
         log.info("Skipping package at offset {}", offset);
-        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
-            storeOffset(resolver, offset);
-            resolver.commit();
+        if (shouldCommitSkipped()) {
+            try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+                storeOffset(resolver, offset);
+                resolver.commit();
+            }
+        }
+    }
+
+    public synchronized boolean shouldCommitSkipped() {
+        skippedCounter ++;
+        if (skippedCounter > COMMIT_AFTER_NUM_SKIPPED) {
+            skippedCounter = 1;
+            return true;
+        } else {
+            return false;
         }
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
new file mode 100644
index 0000000..fd4c761
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.function.Consumer;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.service.event.EventAdmin;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BookKeeperTest {
+
+    private static final int COMMIT_AFTER_NUM_SKIPPED = 10;
+
+    private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
+    @Mock
+    private DistributionMetricsService distributionMetricsService;
+
+    @Mock
+    private PackageHandler packageHandler;
+
+    @Mock
+    private EventAdmin eventAdmin;
+
+    @Mock
+    private Consumer<PackageStatusMessage> sender;
+
+    private BookKeeper bookKeeper;
+
+    @Before
+    public void before() {
+        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
+                "subAgentName", "subSlingId", true, 10);
+    }
+
+    @Test
+    public void testOnlyEveryTenthSkippedPackageOffsetStored() throws InterruptedException, PersistenceException, LoginException {
+        for (int c = 0; c < COMMIT_AFTER_NUM_SKIPPED; c++) {
+            bookKeeper.skipPackage(c);
+            assertThat(bookKeeper.loadOffset(), equalTo(-1l));
+        }
+        for (int c = COMMIT_AFTER_NUM_SKIPPED; c < COMMIT_AFTER_NUM_SKIPPED * 2; c++) {
+            bookKeeper.skipPackage(c);
+            assertThat(bookKeeper.loadOffset(), equalTo(10l));
+        }
+        for (int c = COMMIT_AFTER_NUM_SKIPPED * 2; c < COMMIT_AFTER_NUM_SKIPPED * 3; c++) {
+            bookKeeper.skipPackage(c);
+            assertThat(bookKeeper.loadOffset(), equalTo(20l));
+        }
+    }
+
+}