You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/09/30 16:34:48 UTC
[sling-org-apache-sling-distribution-journal] 01/01: SLING-8751 -
Webconsole plugin to download content packages
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-8751
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 22d179681c97ae8cd01eef2ba36bf0f91af56af7
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Sep 30 18:34:31 2019 +0200
SLING-8751 - Webconsole plugin to download content packages
---
pom.xml | 7 +-
.../journal/impl/queue/impl/RangePoller.java | 55 ++++--
.../journal/impl/shared/PackageViewerPlugin.java | 173 +++++++++++++++++++
.../impl/shared/PackageViewerPluginTest.java | 184 +++++++++++++++++++++
4 files changed, 400 insertions(+), 19 deletions(-)
diff --git a/pom.xml b/pom.xml
index 3dac69a..edf6d2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,18 +137,21 @@
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
<version>0.1.2</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.webconsole</artifactId>
+ <version>4.3.16</version>
+ </dependency>
<!-- OSGi -->
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.core</artifactId>
- <version>6.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.cmpn</artifactId>
- <version>6.0.0</version>
<scope>provided</scope>
</dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
index 850d4d6..58a23e1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
@@ -23,20 +23,21 @@ import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@ParametersAreNonnullByDefault
public class RangePoller {
@@ -49,29 +50,47 @@ public class RangePoller {
private final Closeable headPoller;
- private final CountDownLatch fetched = new CountDownLatch(1);
+ private final List<FullMessage<PackageMessage>> messages;
- private List<FullMessage<PackageMessage>> messages;
+ private final Semaphore nextMessage;
+ private final AtomicLong lastMessageTime;
+ private final AtomicLong lastOffset;
+ private final AtomicLong numMessages;
public RangePoller(MessagingProvider messagingProvider,
String packageTopic,
long minOffset,
- long maxOffset) {
- this.maxOffset = maxOffset;
+ long maxOffsetExclusive) {
+ this.maxOffset = maxOffsetExclusive;
this.minOffset = minOffset;
this.messages = new ArrayList<>();
+ this.nextMessage = new Semaphore(0);
+ this.lastMessageTime = new AtomicLong(System.currentTimeMillis());
+ this.lastOffset = new AtomicLong();
+ this.numMessages = new AtomicLong();
String assign = messagingProvider.assignTo(minOffset);
- LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset);
+ LOG.info("Fetching offsets [{},{}[", minOffset, maxOffsetExclusive);
headPoller = messagingProvider.createPoller(
packageTopic, Reset.earliest, assign,
create(Messages.PackageMessage.class, this::handlePackage));
}
public List<FullMessage<PackageMessage>> fetchRange() throws InterruptedException {
+ return fetchRange(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ }
+
+ public List<FullMessage<PackageMessage>> fetchRange(int maxMessages, int timeOutMs) throws InterruptedException {
try {
- fetched.await();
- LOG.info("Fetched offsets [{},{}[", minOffset, maxOffset);
- return messages;
+ boolean timeout = false;
+ while (lastOffset.get() < maxOffset && !timeout && this.numMessages.get() < maxMessages) {
+ timeout = !nextMessage.tryAcquire(timeOutMs, TimeUnit.MILLISECONDS);
+ }
+ if (timeout) {
+ LOG.info("Timeout fetching messages. Got messages from {} to {}. Number of messages: {}", minOffset, lastOffset,messages.size());
+ } else {
+ LOG.info("Fetched offsets [{},{}[. Number of messages: {}", minOffset, maxOffset, messages.size());
+ }
+ return new ArrayList<>(messages);
} finally {
IOUtils.closeQuietly(headPoller);
}
@@ -79,12 +98,14 @@ public class RangePoller {
private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
long offset = info.getOffset();
- LOG.debug(String.format("Reading offset %s", offset));
+ LOG.debug("Reading offset {}", offset);
+ this.lastMessageTime.set(System.currentTimeMillis());
+ this.lastOffset.set(offset);
if (offset < maxOffset) {
+ this.numMessages.incrementAndGet();
messages.add(new FullMessage<>(info, message));
- } else {
- fetched.countDown();
}
+ nextMessage.release();
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
new file mode 100644
index 0000000..991c35d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.felix.webconsole.AbstractWebConsolePlugin;
+import org.apache.felix.webconsole.WebConsoleConstants;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.queue.impl.RangePoller;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.osgi.framework.Constants;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * Felix web console plugin that lists the distribution packages found on the
+ * package topic of the journal and offers each package binary for download.
+ * <p>
+ * A request to {@code /distpackages} renders an HTML table of packages; the
+ * optional request parameter {@code startOffset} selects the first offset to
+ * list. A request to {@code /distpackages/<offset>} streams the binary of the
+ * package stored at that offset, or answers with status 404 if none is found.
+ */
+@Component(service = Servlet.class,
+    property = {
+        Constants.SERVICE_DESCRIPTION + "=" + "Package viewer for content distribution",
+        WebConsoleConstants.PLUGIN_LABEL + "=" + PackageViewerPlugin.LABEL,
+        WebConsoleConstants.PLUGIN_TITLE + "=" + PackageViewerPlugin.TITLE
+})
+public class PackageViewerPlugin extends AbstractWebConsolePlugin {
+    private static final long serialVersionUID = -3113699912185558439L;
+
+    // Static so that the logger is not part of the (serializable) servlet state.
+    private static final Logger LOG = LoggerFactory.getLogger(PackageViewerPlugin.class);
+
+    private static final int NOT_FOUND = HttpServletResponse.SC_NOT_FOUND;
+    /** Time in milliseconds the poller waits for each next message before giving up. */
+    private static final int TIMEOUT = 1000;
+    /** Upper bound for the number of messages fetched for a single request. */
+    private static final int MAX_NUM_MESSAGES = 100;
+    protected static final String LABEL = "distpackages";
+    protected static final String TITLE = "Distribution Package Viewer";
+
+    // Never read in this class; presumably it only delays activation until the
+    // journal is available - TODO confirm.
+    @Reference
+    private JournalAvailable journalAvailable;
+
+    @Reference
+    private MessagingProvider messagingProvider;
+
+    @Reference
+    private Topics topics;
+
+    @Override
+    public String getLabel() {
+        return LABEL;
+    }
+
+    @Override
+    public String getTitle() {
+        return TITLE;
+    }
+
+    @Override
+    public String getCategory() {
+        return "Sling";
+    }
+
+    /**
+     * Streams a single package if the request path names an offset, otherwise
+     * renders the package list starting at the {@code startOffset} parameter.
+     */
+    @Override
+    protected void renderContent(HttpServletRequest req, HttpServletResponse res)
+            throws ServletException, IOException {
+        Optional<Long> offset = getOffset(req);
+        if (offset.isPresent()) {
+            writePackage(offset.get(), res);
+        } else {
+            long startOffset = parseStartOffset(req.getParameter("startOffset"));
+            renderPackageList(startOffset, res.getWriter());
+        }
+    }
+
+    /**
+     * Parses the {@code startOffset} request parameter.
+     *
+     * @param value raw parameter value, may be {@code null}
+     * @return the parsed offset, or 0 if the parameter is missing or not a number
+     */
+    private static long parseStartOffset(String value) {
+        if (value == null) {
+            return 0;
+        }
+        try {
+            return Long.parseLong(value.trim());
+        } catch (NumberFormatException e) {
+            // A malformed parameter must not fail the whole console page.
+            LOG.debug("Ignoring malformed startOffset parameter", e);
+            return 0;
+        }
+    }
+
+    /**
+     * Writes an HTML table with one row per non-test package message, starting
+     * at the given offset. The number of rows is limited by what
+     * {@link #getMessages(long, long)} returns.
+     */
+    private void renderPackageList(long startOffset, PrintWriter writer) {
+        writer.println("<table class=\"tablesorter nicetable noauto\">");
+        writer.println("<tr><th>Id</th><th>Offset</th><th>Type</th><th>Paths</th></tr>");
+        List<FullMessage<PackageMessage>> msgs = getMessages(startOffset, Integer.MAX_VALUE);
+        msgs.stream().filter(this::notTestMessage).map(this::writeMsg).forEach(writer::println);
+        writer.println("</table>");
+    }
+
+    /**
+     * Formats one package message as a table row whose first cell links to the
+     * download URL of the package. All message content is HTML escaped because
+     * it originates from the journal and not from this plugin.
+     */
+    private String writeMsg(FullMessage<PackageMessage> msg) {
+        long offset = msg.getInfo().getOffset();
+        PackageMessage pkg = msg.getMessage();
+        return String.format("<tr><td><a href=\"%s/%d\">%s</a></td><td>%d</td><td>%s</td><td>%s</td></tr>",
+            LABEL,
+            offset,
+            escapeHtml(pkg.getPkgId()),
+            offset,
+            escapeHtml(String.valueOf(pkg.getReqType())),
+            escapeHtml(pkg.getPathsList().toString()));
+    }
+
+    /**
+     * Replaces the characters that are significant in HTML text and attribute
+     * values with their entity references.
+     *
+     * @param text text to escape, may be {@code null}
+     * @return the escaped text, or an empty string for {@code null}
+     */
+    private static String escapeHtml(String text) {
+        if (text == null) {
+            return "";
+        }
+        StringBuilder escaped = new StringBuilder(text.length() + 16);
+        for (int i = 0; i < text.length(); i++) {
+            char c = text.charAt(i);
+            switch (c) {
+                case '&':
+                    escaped.append("&amp;");
+                    break;
+                case '<':
+                    escaped.append("&lt;");
+                    break;
+                case '>':
+                    escaped.append("&gt;");
+                    break;
+                case '"':
+                    escaped.append("&quot;");
+                    break;
+                case '\'':
+                    escaped.append("&#39;");
+                    break;
+                default:
+                    escaped.append(c);
+            }
+        }
+        return escaped.toString();
+    }
+
+    /**
+     * Streams the binary of the package at the given offset to the response,
+     * or sets status 404 if no package is found at that offset.
+     */
+    private void writePackage(Long offset, HttpServletResponse res) throws IOException {
+        LOG.info("Retrieving package with offset {}", offset);
+        Optional<PackageMessage> msg = getPackage(offset);
+        if (!msg.isPresent()) {
+            res.setStatus(NOT_FOUND);
+            return;
+        }
+        PackageMessage pkg = msg.get();
+        // The package id ends up inside a quoted header value, so anything that
+        // could break the quoting or inject a header line is replaced.
+        String filename = pkg.getPkgId().replaceAll("[^A-Za-z0-9._-]", "_") + ".zip";
+        res.setHeader("Content-Type", "application/octet-stream");
+        res.setHeader("Content-Disposition" , "inline; filename=\"" + filename + "\"");
+        pkg.getPkgBinary().writeTo(res.getOutputStream());
+    }
+
+    /**
+     * Only the package list is HTML; a request that names an offset is answered
+     * with the binary package content.
+     */
+    @Override
+    protected boolean isHtmlRequest(HttpServletRequest request) {
+        return !getOffset(request).isPresent();
+    }
+
+    /**
+     * Extracts the package offset from a path of the form
+     * {@code /distpackages/<offset>}.
+     *
+     * @return the offset, or absent if the path has no offset segment or the
+     *         segment is not a valid number
+     */
+    private Optional<Long> getOffset(HttpServletRequest req) {
+        String pathInfo = req.getPathInfo();
+        // Skips the leading slash, the label and the slash that follows it.
+        int startIndex = LABEL.length() + 2;
+        if (pathInfo == null || pathInfo.length() <= startIndex) {
+            return Optional.absent();
+        }
+        try {
+            return Optional.of(Long.valueOf(pathInfo.substring(startIndex)));
+        } catch (NumberFormatException e) {
+            LOG.debug("Ignoring malformed package offset in request path", e);
+            return Optional.absent();
+        }
+    }
+
+    private boolean notTestMessage(FullMessage<PackageMessage> msg) {
+        return msg.getMessage().getReqType() != ReqType.TEST;
+    }
+
+    /**
+     * Fetches the package message stored at exactly the given offset.
+     *
+     * @return the message, or absent if the poller returned nothing for the
+     *         range {@code [offset, offset + 1[}
+     */
+    private Optional<PackageMessage> getPackage(long offset) {
+        List<FullMessage<PackageMessage>> messages = getMessages(offset, offset + 1);
+        if (messages.isEmpty()) {
+            return Optional.absent();
+        }
+        FullMessage<PackageMessage> fullMsg = messages.get(0);
+        PackageMessage msg = fullMsg.getMessage();
+        LOG.info("Retrieved package with id: {}, offset: {}, type: {}, paths: {}",
+            msg.getPkgId(), fullMsg.getInfo().getOffset(), msg.getReqType(),
+            msg.getPathsList());
+        return Optional.of(msg);
+    }
+
+    /**
+     * Polls the package topic for the messages in {@code [startOffset, endOffset[}.
+     * At most {@link #MAX_NUM_MESSAGES} messages are requested and the poller
+     * waits at most {@link #TIMEOUT} milliseconds for each next message.
+     * Protected so that tests can stub the journal access.
+     *
+     * @throws RuntimeException if the calling thread is interrupted while
+     *         waiting; the interrupt flag is restored before throwing
+     */
+    protected List<FullMessage<PackageMessage>> getMessages(long startOffset, long endOffset) {
+        try {
+            RangePoller poller = new RangePoller(messagingProvider, topics.getPackageTopic(), startOffset, endOffset);
+            return poller.fetchRange(MAX_NUM_MESSAGES, TIMEOUT);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while fetching packages starting at offset " + startOffset, e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
new file mode 100644
index 0000000..4d40bbd
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Tests the request routing and rendering of {@link PackageViewerPlugin}.
+ * Journal access is replaced by stubbing {@code getMessages} on a spy of the
+ * plugin, so no messaging provider is contacted.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PackageViewerPluginTest {
+    @Spy
+    Topics topics = new Topics();
+
+    @Mock
+    private HttpServletRequest req;
+
+    @Mock
+    private HttpServletResponse res;
+
+    @Mock
+    MessagingProvider messagingProvider;
+
+    @Mock
+    ServletOutputStream servletOutputStream;
+
+    @InjectMocks
+    @Spy
+    PackageViewerPlugin viewer;
+
+    private StringWriter outWriter;
+
+    @Before
+    public void before() throws IOException {
+        // Offsets 0 and 1 yield the single package at offset 1, offset 2 yields nothing.
+        List<FullMessage<PackageMessage>> messages = Arrays.asList(createPackageMsg(1L));
+        doReturn(messages).when(viewer).getMessages(Mockito.eq(1L), Mockito.anyLong());
+        doReturn(messages).when(viewer).getMessages(Mockito.eq(0L), Mockito.anyLong());
+        doReturn(Collections.emptyList()).when(viewer).getMessages(Mockito.eq(2L), Mockito.anyLong());
+
+        outWriter = new StringWriter();
+        when(res.getWriter()).thenReturn(new PrintWriter(outWriter));
+        when(res.getOutputStream()).thenReturn(servletOutputStream);
+    }
+
+    @Test
+    public void testSimple() {
+        assertThat(viewer.getLabel(), equalTo("distpackages"));
+        assertThat(viewer.getTitle(), equalTo("Distribution Package Viewer"));
+        assertThat(viewer.getCategory(), equalTo("Sling"));
+    }
+
+    @Test
+    public void testPackageList() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages");
+
+        viewer.renderContent(req, res);
+
+        String outString = outWriter.getBuffer().toString();
+        assertThat(outString,
+            containsString("<tr><td><a href=\"distpackages/1\">pkgid</a></td><td>1</td><td>ADD</td><td>[/content]</td></tr>"));
+    }
+
+    @Test
+    public void testGetPackage() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/1");
+
+        viewer.renderContent(req, res);
+
+        // The package is requested as the half open range [1,2[ and served as a binary.
+        verify(viewer).getMessages(Mockito.eq(1L), Mockito.eq(2L));
+        verify(res).setHeader("Content-Type", "application/octet-stream");
+    }
+
+    @Test
+    public void testGetPackageNotFound() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/2");
+
+        viewer.renderContent(req, res);
+
+        verify(res).setStatus(Mockito.eq(404));
+        verify(viewer).getMessages(Mockito.eq(2L), Mockito.eq(3L));
+    }
+
+    @Test
+    public void testIsHtmlPackage() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/1");
+
+        assertThat(viewer.isHtmlRequest(req), equalTo(false));
+    }
+
+    @Test
+    public void testIsHtmlMain() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages");
+
+        assertThat(viewer.isHtmlRequest(req), equalTo(true));
+    }
+
+    /** Builds an ADD package message for path {@code /content} at the given offset. */
+    private FullMessage<PackageMessage> createPackageMsg(long offset) {
+        MessageInfo info = new TestMessageInfo(offset);
+        PackageMessage message = PackageMessage.newBuilder()
+            .setPubSlingId("")
+            .setReqType(ReqType.ADD)
+            .addPaths("/content")
+            .setPkgId("pkgid")
+            .setPkgType("some_type")
+            .setPkgBinary(ByteString.copyFrom("package content", Charset.defaultCharset()))
+            .build();
+        return new FullMessage<>(info, message);
+    }
+
+    /** Minimal message info that only carries an offset; inner class because it reads {@code topics}. */
+    class TestMessageInfo implements MessageInfo {
+
+        private final long offset;
+
+        public TestMessageInfo(long offset) {
+            this.offset = offset;
+        }
+
+        @Override
+        public String getTopic() {
+            return topics.getPackageTopic();
+        }
+
+        @Override
+        public int getPartition() {
+            return 0;
+        }
+
+        @Override
+        public long getOffset() {
+            return this.offset;
+        }
+
+        @Override
+        public long getCreateTime() {
+            return 0;
+        }
+
+    }
+}