You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/29 15:15:45 UTC

[sling-org-apache-sling-distribution-journal] 01/01: SLING-9593 - Move binary store, require config

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9593-3
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 6d342e7fabacad837811566c7b1bc9e55331dc87
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jul 29 17:15:24 2020 +0200

    SLING-9593 - Move binary store, require config
---
 .../jcr/JcrBinaryStore.java}                       |  97 +++++++++++++------
 .../publisher => binary/jcr}/PackageCleaner.java   |   5 +-
 .../jcr}/PackageCleanupTask.java                   |   6 +-
 .../impl/publisher/PackageMessageFactory.java      |   5 +-
 .../journal/shared/JcrBinaryStore.java             | 103 ---------------------
 .../jcr/JcrBinaryStoreTest.java}                   |   9 +-
 6 files changed, 82 insertions(+), 143 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java
similarity index 68%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java
index 6f2791b..532d083 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java
@@ -16,17 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.binary.jcr;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
 
 import javax.annotation.Nonnull;
-import javax.annotation.ParametersAreNonnullByDefault;
 import javax.jcr.Binary;
 import javax.jcr.Node;
 import javax.jcr.Property;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
 import javax.jcr.nodetype.NodeType;
 
 import org.apache.jackrabbit.api.ReferenceBinary;
 import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
@@ -35,41 +44,77 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.BinaryStore;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.serviceusermapping.ServiceUserMapped;
 import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
 import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.Collections.singletonMap;
-import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
-
-import java.io.InputStream;
-import java.util.Objects;
-
-/**
- * Manages the binary content of DistributionPackages. If they are too big to fit in a journal message then they
- * are written to the blob store. It also offers cleanup functionality to remove the data when it is not needed anymore.
- */
-@Component(service = PackageRepo.class)
-@ParametersAreNonnullByDefault
-public class PackageRepo {
-
+@Component(
+    property = {
+        "type=jcr"
+    },
+    configurationPolicy = ConfigurationPolicy.REQUIRE
+)
+public class JcrBinaryStore implements BinaryStore {
+    private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024;
     private static final String SLING_FOLDER = "sling:Folder";
+    static final String PACKAGES_ROOT_PATH = "/var/sling/distribution/journal/packagebinaries";
+
+    private static final Logger LOG = LoggerFactory.getLogger(JcrBinaryStore.class);
 
-    @Reference
-    private ResourceResolverFactory resolverFactory;
-    
     @Reference
     private ServiceUserMapped mapped;
 
     @Reference
     private DistributionMetricsService distributionMetricsService;
 
-    private static final Logger LOG = LoggerFactory.getLogger(PackageRepo.class);
-    static final String PACKAGES_ROOT_PATH = "/var/sling/distribution/journal/packagebinaries";
 
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+
+    @Override public InputStream get(String reference) throws IOException {
+        try (ResourceResolver resolver = createResourceResolver()) {
+            Session session = resolver.adaptTo(Session.class);
+            if (session == null) {
+                throw new IOException("Unable to get Oak session");
+            }
+            ValueFactory factory = session.getValueFactory();
+            Binary binary = factory.createValue(new SimpleReferenceBinary(reference)).getBinary();
+            return binary.getStream();
+        } catch (Exception e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public String put(String id, InputStream stream, long length) throws IOException {
+        if (length > MAX_INLINE_PKG_BINARY_SIZE) {
+
+            /*
+             * Rather than pro-actively (and somewhat arbitrarily)
+             * decide to avoid sending a package inline based on
+             * its size, we could simply try to send packages of
+             * any size and only avoiding to inline as a fallback.
+             * However, this approach requires the messaging
+             * implementation to offer a mean to distinguish
+             * size issues when sending messages, which is not
+             * always the case.
+             */
+
+            LOG.info("Package {} too large ({}B) to be sent inline", id, length);
+            try {
+                return store(id, stream);
+            } catch (DistributionException e) {
+                throw new IOException(e.getMessage(), e);
+            }
+        }
+        return null;
+    }
+    
     @Nonnull
     public String store(String id, InputStream binaryStream)throws DistributionException {
         try (ResourceResolver resolver = createResourceResolver()) {
@@ -110,14 +155,14 @@ public class PackageRepo {
             context.stop();
         }
     }
-
-    private ResourceResolver createResourceResolver() throws LoginException {
-        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper"));
-    }
-
+    
     @Nonnull
     private Resource getRoot(ResourceResolver resolver)
             throws PersistenceException {
         return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, SLING_FOLDER, SLING_FOLDER, true);
     }
+
+    private ResourceResolver createResourceResolver() throws LoginException {
+        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper"));
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java
rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java
index d272904..c706f8c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java
+++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.binary.jcr;
 
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
@@ -25,7 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PackageCleaner {
-    private static final Logger LOG = LoggerFactory.getLogger(PackageRepo.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PackageCleaner.class);
+    
     private ResourceResolver resolver;
     private long deleteOlderThanTime;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java
similarity index 95%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java
rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java
index b534195..56c5b1f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java
+++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.binary.jcr;
 
 import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
 import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
@@ -55,7 +55,7 @@ public class PackageCleanupTask implements Runnable {
     private static final long PKG_MAX_LIFETIME_MS = 30 * 24 * 60 * 60 * 1000;
 
     @Reference
-    private PackageRepo packageRepo;
+    private JcrBinaryStore binaryStore;
 
     /**
      * The task runs only when at least one DistributionSubscriber agent is configured.
@@ -67,7 +67,7 @@ public class PackageCleanupTask implements Runnable {
     public void run() {
         LOG.info("Starting Package Cleanup Task");
         long deleteOlderThanTime = System.currentTimeMillis() - PKG_MAX_LIFETIME_MS;
-        packageRepo.cleanup(deleteOlderThanTime);
+        binaryStore.cleanup(deleteOlderThanTime);
         LOG.info("Finished Package Cleanup Task");
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index ce69e3c..0ee6e65 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -46,7 +46,6 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferencePolicyOption;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,12 +55,10 @@ public class PackageMessageFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(PackageMessageFactory.class);
 
-    private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024;
-
     @Reference
     private SlingSettingsService slingSettings;
 
-    @Reference(policyOption = ReferencePolicyOption.GREEDY)
+    @Reference
     private BinaryStore binaryStore;
 
     private String pubSlingId;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java b/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java
deleted file mode 100644
index 20c3d86..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.shared;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.jcr.Binary;
-import javax.jcr.Session;
-import javax.jcr.ValueFactory;
-
-import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.BinaryStore;
-import org.apache.sling.distribution.journal.impl.publisher.PackageRepo;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Collections.singletonMap;
-import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
-
-@Component(
-    property = {
-        "type=jcr",
-        "service.ranking:Integer=10"
-    }
-)
-public class JcrBinaryStore implements BinaryStore {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JcrBinaryStore.class);
-
-    private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024;
-
-    @Reference
-    private PackageRepo packageRepo;
-
-    @Reference
-    private ResourceResolverFactory resolverFactory;
-
-    @Override public InputStream get(String reference) throws IOException {
-        try (ResourceResolver resolver = createResourceResolver()) {
-            Session session = resolver.adaptTo(Session.class);
-            if (session == null) {
-                throw new IOException("Unable to get Oak session");
-            }
-            ValueFactory factory = session.getValueFactory();
-            Binary binary = factory.createValue(new SimpleReferenceBinary(reference)).getBinary();
-            return binary.getStream();
-        } catch (Exception e) {
-            throw new IOException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public String put(String id, InputStream stream, long length) throws IOException {
-        if (length > MAX_INLINE_PKG_BINARY_SIZE) {
-
-            /*
-             * Rather than pro-actively (and somewhat arbitrarily)
-             * decide to avoid sending a package inline based on
-             * its size, we could simply try to send packages of
-             * any size and only avoiding to inline as a fallback.
-             * However, this approach requires the messaging
-             * implementation to offer a mean to distinguish
-             * size issues when sending messages, which is not
-             * always the case.
-             */
-
-            LOG.info("Package {} too large ({}B) to be sent inline", id, length);
-            try {
-                return packageRepo.store(id, stream);
-            } catch (DistributionException e) {
-                throw new IOException(e.getMessage(), e);
-            }
-        }
-        return null;
-    }
-
-    private ResourceResolver createResourceResolver() throws LoginException {
-        return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper"));
-    }
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
rename to src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java
index c6200c8..4e83c32 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.binary.jcr;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -54,7 +54,7 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
 import org.osgi.framework.BundleContext;
 
-public class PackageRepoTest {
+public class JcrBinaryStoreTest {
     
     @Spy
     private BundleContext bundleContext = MockOsgi.newBundleContext();
@@ -84,8 +84,7 @@ public class PackageRepoTest {
     private Topics topics = new Topics();
     
     @InjectMocks
-    private PackageRepo packageRepo;
-
+    private JcrBinaryStore packageRepo;
 
     @Before
     public void before() {
@@ -125,7 +124,7 @@ public class PackageRepoTest {
 
     private List<Resource> getPackageNodes(ResourceResolver resolver) throws LoginException {
         List<Resource> result = new ArrayList<>();
-        Resource root = resolver.getResource(PackageRepo.PACKAGES_ROOT_PATH);
+        Resource root = resolver.getResource(JcrBinaryStore.PACKAGES_ROOT_PATH);
         for (Resource pkg : root.getChildren()) {
             result.add(pkg);
         }