You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/29 15:15:45 UTC
[sling-org-apache-sling-distribution-journal] 01/01: SLING-9593 -
Move binary store, require config
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-9593-3
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 6d342e7fabacad837811566c7b1bc9e55331dc87
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jul 29 17:15:24 2020 +0200
SLING-9593 - Move binary store, require config
---
.../jcr/JcrBinaryStore.java} | 97 +++++++++++++------
.../publisher => binary/jcr}/PackageCleaner.java | 5 +-
.../jcr}/PackageCleanupTask.java | 6 +-
.../impl/publisher/PackageMessageFactory.java | 5 +-
.../journal/shared/JcrBinaryStore.java | 103 ---------------------
.../jcr/JcrBinaryStoreTest.java} | 9 +-
6 files changed, 82 insertions(+), 143 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java
similarity index 68%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java
index 6f2791b..532d083 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStore.java
@@ -16,17 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.binary.jcr;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
import javax.annotation.Nonnull;
-import javax.annotation.ParametersAreNonnullByDefault;
import javax.jcr.Binary;
import javax.jcr.Node;
import javax.jcr.Property;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
import javax.jcr.nodetype.NodeType;
import org.apache.jackrabbit.api.ReferenceBinary;
import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
@@ -35,41 +44,77 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.BinaryStore;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.serviceusermapping.ServiceUserMapped;
import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.Collections.singletonMap;
-import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
-
-import java.io.InputStream;
-import java.util.Objects;
-
-/**
- * Manages the binary content of DistributionPackages. If they are too big to fit in a journal message then they
- * are written to the blob store. It also offers cleanup functionality to remove the data when it is not needed anymore.
- */
-@Component(service = PackageRepo.class)
-@ParametersAreNonnullByDefault
-public class PackageRepo {
-
+@Component(
+ property = {
+ "type=jcr"
+ },
+ configurationPolicy = ConfigurationPolicy.REQUIRE
+)
+public class JcrBinaryStore implements BinaryStore {
+ private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024;
private static final String SLING_FOLDER = "sling:Folder";
+ static final String PACKAGES_ROOT_PATH = "/var/sling/distribution/journal/packagebinaries";
+
+ private static final Logger LOG = LoggerFactory.getLogger(JcrBinaryStore.class);
- @Reference
- private ResourceResolverFactory resolverFactory;
-
@Reference
private ServiceUserMapped mapped;
@Reference
private DistributionMetricsService distributionMetricsService;
- private static final Logger LOG = LoggerFactory.getLogger(PackageRepo.class);
- static final String PACKAGES_ROOT_PATH = "/var/sling/distribution/journal/packagebinaries";
+ @Reference
+ private ResourceResolverFactory resolverFactory;
+
+ @Override public InputStream get(String reference) throws IOException {
+ try (ResourceResolver resolver = createResourceResolver()) {
+ Session session = resolver.adaptTo(Session.class);
+ if (session == null) {
+ throw new IOException("Unable to get Oak session");
+ }
+ ValueFactory factory = session.getValueFactory();
+ Binary binary = factory.createValue(new SimpleReferenceBinary(reference)).getBinary();
+ return binary.getStream();
+ } catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public String put(String id, InputStream stream, long length) throws IOException {
+ if (length > MAX_INLINE_PKG_BINARY_SIZE) {
+
+ /*
+ * Rather than pro-actively (and somewhat arbitrarily)
+ * decide to avoid sending a package inline based on
+ * its size, we could simply try to send packages of
+ * any size and only avoiding to inline as a fallback.
+ * However, this approach requires the messaging
+ * implementation to offer a mean to distinguish
+ * size issues when sending messages, which is not
+ * always the case.
+ */
+
+ LOG.info("Package {} too large ({}B) to be sent inline", id, length);
+ try {
+ return store(id, stream);
+ } catch (DistributionException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+ return null;
+ }
+
@Nonnull
public String store(String id, InputStream binaryStream)throws DistributionException {
try (ResourceResolver resolver = createResourceResolver()) {
@@ -110,14 +155,14 @@ public class PackageRepo {
context.stop();
}
}
-
- private ResourceResolver createResourceResolver() throws LoginException {
- return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper"));
- }
-
+
@Nonnull
private Resource getRoot(ResourceResolver resolver)
throws PersistenceException {
return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, SLING_FOLDER, SLING_FOLDER, true);
}
+
+ private ResourceResolver createResourceResolver() throws LoginException {
+ return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper"));
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java
rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java
index d272904..c706f8c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleaner.java
+++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleaner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.binary.jcr;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
@@ -25,7 +25,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PackageCleaner {
- private static final Logger LOG = LoggerFactory.getLogger(PackageRepo.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PackageCleaner.class);
+
private ResourceResolver resolver;
private long deleteOlderThanTime;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java
similarity index 95%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java
rename to src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java
index b534195..56c5b1f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageCleanupTask.java
+++ b/src/main/java/org/apache/sling/distribution/journal/binary/jcr/PackageCleanupTask.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.binary.jcr;
import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
@@ -55,7 +55,7 @@ public class PackageCleanupTask implements Runnable {
private static final long PKG_MAX_LIFETIME_MS = 30 * 24 * 60 * 60 * 1000;
@Reference
- private PackageRepo packageRepo;
+ private JcrBinaryStore binaryStore;
/**
* The task runs only when at least one DistributionSubscriber agent is configured.
@@ -67,7 +67,7 @@ public class PackageCleanupTask implements Runnable {
public void run() {
LOG.info("Starting Package Cleanup Task");
long deleteOlderThanTime = System.currentTimeMillis() - PKG_MAX_LIFETIME_MS;
- packageRepo.cleanup(deleteOlderThanTime);
+ binaryStore.cleanup(deleteOlderThanTime);
LOG.info("Finished Package Cleanup Task");
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index ce69e3c..0ee6e65 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -46,7 +46,6 @@ import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,12 +55,10 @@ public class PackageMessageFactory {
private static final Logger LOG = LoggerFactory.getLogger(PackageMessageFactory.class);
- private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024;
-
@Reference
private SlingSettingsService slingSettings;
- @Reference(policyOption = ReferencePolicyOption.GREEDY)
+ @Reference
private BinaryStore binaryStore;
private String pubSlingId;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java b/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java
deleted file mode 100644
index 20c3d86..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JcrBinaryStore.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.shared;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.jcr.Binary;
-import javax.jcr.Session;
-import javax.jcr.ValueFactory;
-
-import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.BinaryStore;
-import org.apache.sling.distribution.journal.impl.publisher.PackageRepo;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Collections.singletonMap;
-import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
-
-@Component(
- property = {
- "type=jcr",
- "service.ranking:Integer=10"
- }
-)
-public class JcrBinaryStore implements BinaryStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(JcrBinaryStore.class);
-
- private static final long MAX_INLINE_PKG_BINARY_SIZE = 800L * 1024;
-
- @Reference
- private PackageRepo packageRepo;
-
- @Reference
- private ResourceResolverFactory resolverFactory;
-
- @Override public InputStream get(String reference) throws IOException {
- try (ResourceResolver resolver = createResourceResolver()) {
- Session session = resolver.adaptTo(Session.class);
- if (session == null) {
- throw new IOException("Unable to get Oak session");
- }
- ValueFactory factory = session.getValueFactory();
- Binary binary = factory.createValue(new SimpleReferenceBinary(reference)).getBinary();
- return binary.getStream();
- } catch (Exception e) {
- throw new IOException(e.getMessage(), e);
- }
- }
-
- @Override
- public String put(String id, InputStream stream, long length) throws IOException {
- if (length > MAX_INLINE_PKG_BINARY_SIZE) {
-
- /*
- * Rather than pro-actively (and somewhat arbitrarily)
- * decide to avoid sending a package inline based on
- * its size, we could simply try to send packages of
- * any size and only avoiding to inline as a fallback.
- * However, this approach requires the messaging
- * implementation to offer a mean to distinguish
- * size issues when sending messages, which is not
- * always the case.
- */
-
- LOG.info("Package {} too large ({}B) to be sent inline", id, length);
- try {
- return packageRepo.store(id, stream);
- } catch (DistributionException e) {
- throw new IOException(e.getMessage(), e);
- }
- }
- return null;
- }
-
- private ResourceResolver createResourceResolver() throws LoginException {
- return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, "bookkeeper"));
- }
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
rename to src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java
index c6200c8..4e83c32 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/binary/jcr/JcrBinaryStoreTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.binary.jcr;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -54,7 +54,7 @@ import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.osgi.framework.BundleContext;
-public class PackageRepoTest {
+public class JcrBinaryStoreTest {
@Spy
private BundleContext bundleContext = MockOsgi.newBundleContext();
@@ -84,8 +84,7 @@ public class PackageRepoTest {
private Topics topics = new Topics();
@InjectMocks
- private PackageRepo packageRepo;
-
+ private JcrBinaryStore packageRepo;
@Before
public void before() {
@@ -125,7 +124,7 @@ public class PackageRepoTest {
private List<Resource> getPackageNodes(ResourceResolver resolver) throws LoginException {
List<Resource> result = new ArrayList<>();
- Resource root = resolver.getResource(PackageRepo.PACKAGES_ROOT_PATH);
+ Resource root = resolver.getResource(JcrBinaryStore.PACKAGES_ROOT_PATH);
for (Resource pkg : root.getChildren()) {
result.add(pkg);
}