You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ol...@apache.org on 2019/06/02 08:21:43 UTC

[sling-org-apache-sling-clam] 06/09: SLING-8258 Provide HTTP API for Sling Clam

This is an automated email from the ASF dual-hosted git repository.

olli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-clam.git

commit 5de5f1b6b6622c51c6385c8c47898581dd77c0ee
Author: Oliver Lietz <ol...@apache.org>
AuthorDate: Sat Jun 1 18:47:12 2019 +0200

    SLING-8258 Provide HTTP API for Sling Clam
    
    * add events servlet
---
 .../clam/http/internal/ClamEventsServlet.java      | 192 +++++++++++++++++++++
 .../sling/clam/http/internal/ResponseUtil.java     |  20 +++
 2 files changed, 212 insertions(+)

diff --git a/src/main/java/org/apache/sling/clam/http/internal/ClamEventsServlet.java b/src/main/java/org/apache/sling/clam/http/internal/ClamEventsServlet.java
new file mode 100644
index 0000000..9123f10
--- /dev/null
+++ b/src/main/java/org/apache/sling/clam/http/internal/ClamEventsServlet.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.clam.http.internal;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.sling.auth.core.AuthConstants;
+import org.apache.sling.clam.result.JcrPropertyScanResultHandler;
+import org.apache.sling.commons.clam.ScanResult;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.osgi.framework.Constants;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.http.whiteboard.HttpWhiteboardConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.clam.http.internal.ResponseUtil.json;
+
+@Component(
+    service = {
+        Servlet.class,
+        JcrPropertyScanResultHandler.class
+    },
+    property = {
+        Constants.SERVICE_DESCRIPTION + "=Apache Sling Clam Events Servlet",
+        Constants.SERVICE_VENDOR + "=The Apache Software Foundation",
+        HttpWhiteboardConstants.HTTP_WHITEBOARD_CONTEXT_SELECT + "=(osgi.http.whiteboard.context.name=org.osgi.service.http)",
+        HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_PATTERN + "=/system/clam-events",
+        HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_ASYNC_SUPPORTED + "=true",
+        AuthConstants.AUTH_REQUIREMENTS + "=/system/clam-events"
+    }
+)
+public class ClamEventsServlet extends HttpServlet implements JcrPropertyScanResultHandler {
+
+    private List<Client> clients = Collections.synchronizedList(new ArrayList<>());
+
+    private final AtomicLong counter = new AtomicLong(0);
+
+    private static final String JCR_RESULT_EVENT_TYPE = "sling/clam/jcr/result";
+
+    private final Logger logger = LoggerFactory.getLogger(ClamEventsServlet.class);
+
+    @Override
+    protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
+        response.setCharacterEncoding(StandardCharsets.UTF_8.name());
+        response.setContentType("text/event-stream");
+        response.addHeader("Connection", "close");
+        final AsyncContext context = request.startAsync();
+        context.setTimeout(30000);
+        final Client client = new Client(context);
+        context.getResponse().getOutputStream().setWriteListener(client);
+        clients.add(client);
+    }
+
+    @Override
+    public void handleJcrPropertyScanResult(final @NotNull ScanResult scanResult, final @NotNull String path, final int propertyType, final @Nullable String userId) {
+        final String data = json(scanResult, path, null, propertyType, userId);
+        addEvent(JCR_RESULT_EVENT_TYPE, data);
+    }
+
+    @Override
+    public void handleJcrPropertyScanResult(final @NotNull ScanResult scanResult, final @NotNull String path, final int index, final int propertyType, final @Nullable String userId) {
+        final String data = json(scanResult, path, index, propertyType, userId);
+        addEvent(JCR_RESULT_EVENT_TYPE, data);
+    }
+
+    private void addEvent(final String type, final String data) {
+        final Event event = new Event(type, data);
+        clients.iterator().forEachRemaining(client -> client.addEvent(event));
+    }
+
+    private class Event {
+
+        final String type;
+
+        final String data;
+
+        public Event(final String type, final String data) {
+            this.type = type;
+            this.data = data;
+        }
+
+    }
+
+    private class Client implements AsyncListener, WriteListener {
+
+        private final AsyncContext context;
+
+        private final Queue<Event> events = new ConcurrentLinkedQueue<>();
+
+        private Client(final AsyncContext context) {
+            this.context = context;
+            context.addListener(this);
+        }
+
+        @Override
+        public void onComplete(final AsyncEvent event) throws IOException {
+            logger.debug("on complete: {}", event.getAsyncContext());
+            clients.remove(this);
+        }
+
+        @Override
+        public void onTimeout(final AsyncEvent event) throws IOException {
+            logger.debug("on timeout: {}", event.getAsyncContext());
+            clients.remove(this);
+        }
+
+        @Override
+        public void onError(final AsyncEvent event) throws IOException {
+            logger.debug("on error: {}", event.getAsyncContext());
+            clients.remove(this);
+        }
+
+        @Override
+        public void onStartAsync(final AsyncEvent event) throws IOException {
+            logger.debug("on start async: {}", event.getAsyncContext());
+        }
+
+        @Override
+        public void onWritePossible() throws IOException {
+            final ServletOutputStream outputStream = context.getResponse().getOutputStream();
+            while (outputStream.isReady() && events.peek() != null) {
+                final Event event = events.poll();
+                final String data = String.format("event: %s\ndata: %s\n\n", event.type, event.data);
+                outputStream.write(data.getBytes(StandardCharsets.UTF_8));
+                flushIfReady(outputStream);
+            }
+            flushIfReady(outputStream);
+        }
+
+        @Override
+        public void onError(final Throwable t) {
+            logger.error("on error: {}", t.getMessage(), t);
+            clients.remove(this);
+            context.complete();
+        }
+
+        private void flushIfReady(final ServletOutputStream outputStream) throws IOException {
+            if (outputStream.isReady()) {
+                outputStream.flush();
+            }
+        }
+
+        private void addEvent(final Event event) {
+            final long count = counter.incrementAndGet();
+            logger.info("adding event: {}", count);
+            events.add(event);
+            try {
+                onWritePossible();
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/clam/http/internal/ResponseUtil.java b/src/main/java/org/apache/sling/clam/http/internal/ResponseUtil.java
index c0d8723..de8a09c 100644
--- a/src/main/java/org/apache/sling/clam/http/internal/ResponseUtil.java
+++ b/src/main/java/org/apache/sling/clam/http/internal/ResponseUtil.java
@@ -21,12 +21,14 @@ package org.apache.sling.clam.http.internal;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
+import javax.jcr.PropertyType;
 import javax.json.Json;
 import javax.json.JsonException;
 import javax.json.JsonObjectBuilder;
 import javax.servlet.ServletException;
 
 import org.apache.sling.api.SlingHttpServletResponse;
+import org.apache.sling.commons.clam.ScanResult;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -50,4 +52,22 @@ public class ResponseUtil {
         }
     }
 
+    /**
+     * Serializes the scan result of a JCR property into a JSON object string.
+     *
+     * <p>The object always contains <code>timestamp</code>, <code>status</code>,
+     * <code>started</code>, <code>size</code>, <code>path</code> and
+     * <code>propertyType</code> (as type name). <code>message</code>,
+     * <code>index</code> and <code>userId</code> are only added when present.</p>
+     *
+     * @param scanResult the scan result
+     * @param path the path of the scanned property
+     * @param index the index of the scanned value, <code>null</code> if there is none
+     * @param propertyType the JCR property type
+     * @param userId the user id, may be <code>null</code>
+     * @return the JSON object as string
+     */
+    static String json(@NotNull ScanResult scanResult, @NotNull String path, @Nullable Integer index, int propertyType, @Nullable String userId) {
+        final JsonObjectBuilder event = Json.createObjectBuilder();
+        event.add("timestamp", scanResult.getTimestamp());
+        event.add("status", scanResult.getStatus().name());
+        // JsonObjectBuilder.add(String, String) rejects null values with a NullPointerException.
+        // NOTE(review): nullability of ScanResult.getMessage() is not visible here - guarded like
+        // the other optional values; confirm against ScanResult.
+        final String message = scanResult.getMessage();
+        if (message != null) {
+            event.add("message", message);
+        }
+        event.add("started", scanResult.getStarted());
+        event.add("size", scanResult.getSize());
+        event.add("path", path);
+        if (index != null) {
+            event.add("index", index);
+        }
+        event.add("propertyType", PropertyType.nameFromValue(propertyType));
+        if (userId != null) {
+            event.add("userId", userId);
+        }
+        return event.build().toString();
+    }
+
 }