You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/09/30 16:34:48 UTC

[sling-org-apache-sling-distribution-journal] 01/01: SLING-8751 - Webconsole plugin to download content packages

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-8751
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 22d179681c97ae8cd01eef2ba36bf0f91af56af7
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Sep 30 18:34:31 2019 +0200

    SLING-8751 - Webconsole plugin to download content packages
---
 pom.xml                                            |   7 +-
 .../journal/impl/queue/impl/RangePoller.java       |  55 ++++--
 .../journal/impl/shared/PackageViewerPlugin.java   | 173 +++++++++++++++++++
 .../impl/shared/PackageViewerPluginTest.java       | 184 +++++++++++++++++++++
 4 files changed, 400 insertions(+), 19 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3dac69a..edf6d2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,18 +137,21 @@
             <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
             <version>0.1.2</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.webconsole</artifactId>
+            <version>4.3.16</version>
+        </dependency>
 
         <!-- OSGi -->
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>osgi.core</artifactId>
-            <version>6.0.0</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>osgi.cmpn</artifactId>
-            <version>6.0.0</version>
             <scope>provided</scope>
         </dependency>
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
index 850d4d6..58a23e1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
@@ -23,20 +23,21 @@ import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @ParametersAreNonnullByDefault
 public class RangePoller {
@@ -49,29 +50,47 @@ public class RangePoller {
 
     private final Closeable headPoller;
 
-    private final CountDownLatch fetched = new CountDownLatch(1);
+    private final List<FullMessage<PackageMessage>> messages;
 
-    private List<FullMessage<PackageMessage>> messages;
+    private final Semaphore nextMessage;
+    private final AtomicLong lastMessageTime;
+    private final AtomicLong lastOffset;
+    private final AtomicLong numMessages;
     
     public RangePoller(MessagingProvider messagingProvider,
                           String packageTopic,
                           long minOffset,
-                          long maxOffset) {
-        this.maxOffset = maxOffset;
+                          long maxOffsetExclusive) {
+        this.maxOffset = maxOffsetExclusive;
         this.minOffset = minOffset;
         this.messages = new ArrayList<>();
+        this.nextMessage = new Semaphore(0);
+        this.lastMessageTime = new AtomicLong(System.currentTimeMillis());
+        this.lastOffset = new AtomicLong();
+        this.numMessages = new AtomicLong();
         String assign = messagingProvider.assignTo(minOffset);
-        LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset);
+        LOG.info("Fetching offsets [{},{}[", minOffset, maxOffsetExclusive);
         headPoller = messagingProvider.createPoller(
                 packageTopic, Reset.earliest, assign,
                 create(Messages.PackageMessage.class, this::handlePackage));
     }
 
     public List<FullMessage<PackageMessage>> fetchRange() throws InterruptedException {
+        return fetchRange(Integer.MAX_VALUE, Integer.MAX_VALUE);
+    }
+    
+    public List<FullMessage<PackageMessage>> fetchRange(int maxMessages, int timeOutMs) throws InterruptedException {
         try {
-            fetched.await();
-            LOG.info("Fetched offsets [{},{}[", minOffset, maxOffset);
-            return messages;
+            boolean timeout = false;
+            while (lastOffset.get() < maxOffset && !timeout && this.numMessages.get() < maxMessages) {
+                timeout = !nextMessage.tryAcquire(timeOutMs, TimeUnit.MILLISECONDS);
+            }
+            if (timeout) {
+                LOG.info("Timeout fetching messages. Got messages from {} to {}. Number of messages: {}", minOffset, lastOffset,messages.size());
+            } else {
+                LOG.info("Fetched offsets [{},{}[. Number of messages: {}", minOffset, maxOffset, messages.size());
+            }
+            return new ArrayList<>(messages);
         } finally {
             IOUtils.closeQuietly(headPoller);
         }
@@ -79,12 +98,14 @@ public class RangePoller {
 
     private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
         long offset = info.getOffset();
-        LOG.debug(String.format("Reading offset %s", offset));
+        LOG.debug("Reading offset {}", offset);
+        this.lastMessageTime.set(System.currentTimeMillis());
+        this.lastOffset.set(offset);
         if (offset < maxOffset) {
+            this.numMessages.incrementAndGet();
             messages.add(new FullMessage<>(info, message));
-        } else {
-            fetched.countDown();
         }
+        nextMessage.release();
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
new file mode 100644
index 0000000..991c35d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.felix.webconsole.AbstractWebConsolePlugin;
+import org.apache.felix.webconsole.WebConsoleConstants;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.queue.impl.RangePoller;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.osgi.framework.Constants;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+@Component(service = Servlet.class,
+        property = {
+        Constants.SERVICE_DESCRIPTION + "=" + "Package viewer for content distribution",
+        WebConsoleConstants.PLUGIN_LABEL + "=" + PackageViewerPlugin.LABEL,
+        WebConsoleConstants.PLUGIN_TITLE + "=" + PackageViewerPlugin.TITLE
+})
+public class PackageViewerPlugin extends AbstractWebConsolePlugin {
+    private static final int NOT_FOUND = 404;
+    private static final int TIMEOUT = 1000;
+    private static final int MAX_NUM_MESSAGES = 100;
+    private static final long serialVersionUID = -3113699912185558439L;
+    protected static final String LABEL = "distpackages";
+    protected static final String TITLE = "Distribution Package Viewer";
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Reference
+    private JournalAvailable journalAvailable;
+    
+    @Reference
+    private MessagingProvider messagingProvider;
+    
+    @Reference
+    private Topics topics;
+
+    @Override
+    public String getLabel() {
+        return LABEL;
+    }
+
+    @Override
+    public String getTitle() {
+        return TITLE;
+    }
+
+    @Override
+    public String getCategory() {
+        return "Sling";
+    }
+    
+    @Override
+    protected void renderContent(HttpServletRequest req, HttpServletResponse res)
+            throws ServletException, IOException {
+        Optional<Long> offset = getOffset(req);
+        if (!offset.isPresent()) {
+            String startOffsetSt = req.getParameter("startOffset");
+            long startOffset = startOffsetSt != null ? new Long(startOffsetSt) : 0;
+            renderPackageList(startOffset, res.getWriter());
+        } else {
+            writePackage(offset.get(), res);
+        }
+    }
+
+    private void renderPackageList(long startOffset, PrintWriter writer) {
+        writer.println("<table class=\"tablesorter nicetable noauto\">");
+        writer.println("<tr><th>Id</th><th>Offset</th><th>Type</th><th>Paths</th></tr>");
+        List<FullMessage<PackageMessage>> msgs = getMessages(startOffset, Integer.MAX_VALUE);
+        msgs.stream().filter(this::notTestMessage).map(this::writeMsg).forEach(writer::println);
+        writer.println("</table>");
+    }
+
+    private String writeMsg(FullMessage<PackageMessage> msg) {
+        return String.format("<tr><td><a href=\"%s/%d\">%s</a></td><td>%d</td><td>%s</td><td>%s</td></tr>",
+                LABEL,
+                msg.getInfo().getOffset(),
+                msg.getMessage().getPkgId(),
+                msg.getInfo().getOffset(),
+                msg.getMessage().getReqType(),
+                msg.getMessage().getPathsList().toString());
+    }
+
+    private void writePackage(Long offset, HttpServletResponse res) throws IOException {
+        log.info("Retrieving package with offset " + offset);
+        Optional<PackageMessage> msg = getPackage(offset);
+        if (msg.isPresent()) {
+            res.setHeader("Content-Type", "application/octet-stream");
+            String filename = msg.get().getPkgId() + ".zip";
+            res.setHeader("Content-Disposition" , "inline; filename=\"" + filename + "\"");
+            msg.get().getPkgBinary().writeTo(res.getOutputStream());
+        } else {
+            res.setStatus(NOT_FOUND);
+        }
+    }
+    
+    @Override
+    protected boolean isHtmlRequest(HttpServletRequest request) {
+        return !getOffset(request).isPresent();
+    }
+
+    private Optional<Long> getOffset(HttpServletRequest req) {
+        int startIndex = LABEL.length() + 2;
+        if (startIndex <= req.getPathInfo().length()) {
+            String offsetSt = req.getPathInfo().substring(startIndex);
+            return Optional.of(new Long(offsetSt));
+        } else {
+            return Optional.absent();
+        }
+    }
+
+    private boolean notTestMessage(FullMessage<PackageMessage> msg) {
+        return msg.getMessage().getReqType() != ReqType.TEST;
+    }
+    
+    private Optional<PackageMessage> getPackage(long offset) {
+        List<FullMessage<PackageMessage>> messages = getMessages(offset, offset + 1);
+        if (messages.isEmpty()) {
+            return Optional.absent();
+        } else {
+            FullMessage<PackageMessage> fullMsg = messages.iterator().next();
+            PackageMessage msg = fullMsg.getMessage();
+            log.info("Retrieved package with id: {}, offset: {}, type: {}, paths: {}",
+                    msg.getPkgId(), fullMsg.getInfo().getOffset(), msg.getReqType(),
+                    msg.getPathsList().toString());
+            return Optional.of(fullMsg.getMessage());
+        }
+    }
+
+    protected List<FullMessage<PackageMessage>> getMessages(long startOffset, long endOffset) {
+        try {
+            RangePoller poller = new RangePoller(messagingProvider, topics.getPackageTopic(), startOffset, endOffset);
+            return poller.fetchRange(MAX_NUM_MESSAGES, TIMEOUT);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
new file mode 100644
index 0000000..4d40bbd
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.protobuf.ByteString;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PackageViewerPluginTest {
+    @Spy
+    Topics topics = new Topics();
+
+    @Mock
+    private HttpServletRequest req;
+    
+    @Mock
+    private HttpServletResponse res;
+    
+    @Mock
+    MessagingProvider messagingProvider;
+    
+    @Mock
+    ServletOutputStream servletOutputStream;
+    
+    @InjectMocks
+    @Spy
+    PackageViewerPlugin viewer;
+
+    private StringWriter outWriter;
+    
+    @Before
+    public void before() throws IOException {
+        List<FullMessage<PackageMessage>> messages  = Arrays.asList(createPackageMsg(1l));
+        doReturn(messages).when(viewer).getMessages(Mockito.eq(1l), Mockito.anyLong());
+        doReturn(messages).when(viewer).getMessages(Mockito.eq(0l), Mockito.anyLong());
+        doReturn(Collections.emptyList()).when(viewer).getMessages(Mockito.eq(2l), Mockito.anyLong());
+
+        outWriter = new StringWriter();
+        when(res.getWriter()).thenReturn(new PrintWriter(outWriter));
+        when(res.getOutputStream()).thenReturn(servletOutputStream);
+    }
+
+    @Test
+    public void testSimple() {
+        assertThat(viewer.getLabel(), equalTo("distpackages"));
+        assertThat(viewer.getTitle(), equalTo("Distribution Package Viewer"));
+        assertThat(viewer.getCategory(), equalTo("Sling"));
+    }
+    
+    @Test
+    public void testPackageList() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages");
+        
+        viewer.renderContent(req, res);
+
+        String outString = outWriter.getBuffer().toString();
+        System.out.println(outString);
+        assertThat(outString, 
+                containsString("<tr><td><a href=\"distpackages/1\">pkgid</a></td><td>1</td><td>ADD</td><td>[/content]</td></tr>"));
+    }
+
+    @Test
+    public void testGetPackage() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/1");
+        viewer.renderContent(req, res);
+        
+        verify(viewer).getMessages(Mockito.eq(1l), Mockito.eq(2l));
+    }
+    
+    @Test
+    public void testGetPackageNotFound() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/2");
+        viewer.renderContent(req, res);
+        verify(res).setStatus(Mockito.eq(404));
+        verify(viewer).getMessages(Mockito.eq(2l), Mockito.eq(3l));
+    }
+    
+    @Test
+    public void testIsHtmlPackage() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages/1");
+
+        assertThat(viewer.isHtmlRequest(req), equalTo(false));
+    }
+
+    @Test
+    public void testIsHtmlMain() throws ServletException, IOException {
+        when(req.getPathInfo()).thenReturn("/distpackages");
+
+        assertThat(viewer.isHtmlRequest(req), equalTo(true));
+    }
+
+    private FullMessage<PackageMessage> createPackageMsg(long offset) {
+        MessageInfo info = new TestMessageInfo(offset);
+        PackageMessage message = PackageMessage.newBuilder()
+                .setPubSlingId("")
+                .setReqType(ReqType.ADD)
+                .addPaths("/content")
+                .setPkgId("pkgid")
+                .setPkgType("some_type")
+                .setPkgBinary(ByteString.copyFrom("package content", Charset.defaultCharset()))
+                .build();
+        FullMessage<PackageMessage> msg = new FullMessage<PackageMessage>(info, message);
+        return msg;
+    }
+    
+    class TestMessageInfo implements MessageInfo {
+        
+        private long offset;
+
+        public TestMessageInfo(long offset) {
+            this.offset = offset;
+        }
+
+        @Override
+        public String getTopic() {
+            return topics.getPackageTopic();
+        }
+
+        @Override
+        public int getPartition() {
+            return 0;
+        }
+
+        @Override
+        public long getOffset() {
+            return this.offset;
+        }
+
+        @Override
+        public long getCreateTime() {
+            return 0;
+        }
+        
+    }
+}