You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ol...@apache.org on 2019/06/02 08:21:43 UTC
[sling-org-apache-sling-clam] 06/09: SLING-8258 Provide HTTP API
for Sling Clam
This is an automated email from the ASF dual-hosted git repository.
olli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-clam.git
commit 5de5f1b6b6622c51c6385c8c47898581dd77c0ee
Author: Oliver Lietz <ol...@apache.org>
AuthorDate: Sat Jun 1 18:47:12 2019 +0200
SLING-8258 Provide HTTP API for Sling Clam
* add events servlet
---
.../clam/http/internal/ClamEventsServlet.java | 192 +++++++++++++++++++++
.../sling/clam/http/internal/ResponseUtil.java | 20 +++
2 files changed, 212 insertions(+)
diff --git a/src/main/java/org/apache/sling/clam/http/internal/ClamEventsServlet.java b/src/main/java/org/apache/sling/clam/http/internal/ClamEventsServlet.java
new file mode 100644
index 0000000..9123f10
--- /dev/null
+++ b/src/main/java/org/apache/sling/clam/http/internal/ClamEventsServlet.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.clam.http.internal;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.sling.auth.core.AuthConstants;
+import org.apache.sling.clam.result.JcrPropertyScanResultHandler;
+import org.apache.sling.commons.clam.ScanResult;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.osgi.framework.Constants;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.http.whiteboard.HttpWhiteboardConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.clam.http.internal.ResponseUtil.json;
+
+@Component(
+ service = {
+ Servlet.class,
+ JcrPropertyScanResultHandler.class
+ },
+ property = {
+ Constants.SERVICE_DESCRIPTION + "=Apache Sling Clam Events Servlet",
+ Constants.SERVICE_VENDOR + "=The Apache Software Foundation",
+ HttpWhiteboardConstants.HTTP_WHITEBOARD_CONTEXT_SELECT + "=(osgi.http.whiteboard.context.name=org.osgi.service.http)",
+ HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_PATTERN + "=/system/clam-events",
+ HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_ASYNC_SUPPORTED + "=true",
+ AuthConstants.AUTH_REQUIREMENTS + "=/system/clam-events"
+ }
+)
+public class ClamEventsServlet extends HttpServlet implements JcrPropertyScanResultHandler {
+
+ private List<Client> clients = Collections.synchronizedList(new ArrayList<>());
+
+ private final AtomicLong counter = new AtomicLong(0);
+
+ private static final String JCR_RESULT_EVENT_TYPE = "sling/clam/jcr/result";
+
+ private final Logger logger = LoggerFactory.getLogger(ClamEventsServlet.class);
+
+ @Override
+ protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
+ response.setCharacterEncoding(StandardCharsets.UTF_8.name());
+ response.setContentType("text/event-stream");
+ response.addHeader("Connection", "close");
+ final AsyncContext context = request.startAsync();
+ context.setTimeout(30000);
+ final Client client = new Client(context);
+ context.getResponse().getOutputStream().setWriteListener(client);
+ clients.add(client);
+ }
+
+ @Override
+ public void handleJcrPropertyScanResult(final @NotNull ScanResult scanResult, final @NotNull String path, final int propertyType, final @Nullable String userId) {
+ final String data = json(scanResult, path, null, propertyType, userId);
+ addEvent(JCR_RESULT_EVENT_TYPE, data);
+ }
+
+ @Override
+ public void handleJcrPropertyScanResult(final @NotNull ScanResult scanResult, final @NotNull String path, final int index, final int propertyType, final @Nullable String userId) {
+ final String data = json(scanResult, path, index, propertyType, userId);
+ addEvent(JCR_RESULT_EVENT_TYPE, data);
+ }
+
+ private void addEvent(final String type, final String data) {
+ final Event event = new Event(type, data);
+ clients.iterator().forEachRemaining(client -> client.addEvent(event));
+ }
+
+ private class Event {
+
+ final String type;
+
+ final String data;
+
+ public Event(final String type, final String data) {
+ this.type = type;
+ this.data = data;
+ }
+
+ }
+
+ private class Client implements AsyncListener, WriteListener {
+
+ private final AsyncContext context;
+
+ private final Queue<Event> events = new ConcurrentLinkedQueue<>();
+
+ private Client(final AsyncContext context) {
+ this.context = context;
+ context.addListener(this);
+ }
+
+ @Override
+ public void onComplete(final AsyncEvent event) throws IOException {
+ logger.debug("on complete: {}", event.getAsyncContext());
+ clients.remove(this);
+ }
+
+ @Override
+ public void onTimeout(final AsyncEvent event) throws IOException {
+ logger.debug("on timeout: {}", event.getAsyncContext());
+ clients.remove(this);
+ }
+
+ @Override
+ public void onError(final AsyncEvent event) throws IOException {
+ logger.debug("on error: {}", event.getAsyncContext());
+ clients.remove(this);
+ }
+
+ @Override
+ public void onStartAsync(final AsyncEvent event) throws IOException {
+ logger.debug("on start async: {}", event.getAsyncContext());
+ }
+
+ @Override
+ public void onWritePossible() throws IOException {
+ final ServletOutputStream outputStream = context.getResponse().getOutputStream();
+ while (outputStream.isReady() && events.peek() != null) {
+ final Event event = events.poll();
+ final String data = String.format("event: %s\ndata: %s\n\n", event.type, event.data);
+ outputStream.write(data.getBytes(StandardCharsets.UTF_8));
+ flushIfReady(outputStream);
+ }
+ flushIfReady(outputStream);
+ }
+
+ @Override
+ public void onError(final Throwable t) {
+ logger.error("on error: {}", t.getMessage(), t);
+ clients.remove(this);
+ context.complete();
+ }
+
+ private void flushIfReady(final ServletOutputStream outputStream) throws IOException {
+ if (outputStream.isReady()) {
+ outputStream.flush();
+ }
+ }
+
+ private void addEvent(final Event event) {
+ final long count = counter.incrementAndGet();
+ logger.info("adding event: {}", count);
+ events.add(event);
+ try {
+ onWritePossible();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/clam/http/internal/ResponseUtil.java b/src/main/java/org/apache/sling/clam/http/internal/ResponseUtil.java
index c0d8723..de8a09c 100644
--- a/src/main/java/org/apache/sling/clam/http/internal/ResponseUtil.java
+++ b/src/main/java/org/apache/sling/clam/http/internal/ResponseUtil.java
@@ -21,12 +21,14 @@ package org.apache.sling.clam.http.internal;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import javax.jcr.PropertyType;
import javax.json.Json;
import javax.json.JsonException;
import javax.json.JsonObjectBuilder;
import javax.servlet.ServletException;
import org.apache.sling.api.SlingHttpServletResponse;
+import org.apache.sling.commons.clam.ScanResult;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -50,4 +52,22 @@ public class ResponseUtil {
}
}
+    /**
+     * Serializes a scan result and its JCR context into a JSON object string.
+     *
+     * @param scanResult   the scan result supplying timestamp, status, message, started and size
+     * @param path         the path written to the {@code path} member
+     * @param index        the value index, omitted from the output when {@code null}
+     * @param propertyType the JCR property type, written by name via {@link PropertyType#nameFromValue(int)}
+     * @param userId       the user id, omitted from the output when {@code null}
+     * @return the string form of the built JSON object
+     */
+    static String json(@NotNull ScanResult scanResult, @NotNull String path, @Nullable Integer index, int propertyType, @Nullable String userId) {
+        final JsonObjectBuilder event = Json.createObjectBuilder();
+        event.add("timestamp", scanResult.getTimestamp());
+        event.add("status", scanResult.getStatus().name());
+        // JsonObjectBuilder.add(String, String) rejects a null value with a
+        // NullPointerException, so the message is treated like the other optional
+        // members and left out when the scan result carries none.
+        final String message = scanResult.getMessage();
+        if (message != null) {
+            event.add("message", message);
+        }
+        event.add("started", scanResult.getStarted());
+        event.add("size", scanResult.getSize());
+        event.add("path", path);
+        if (index != null) {
+            event.add("index", index);
+        }
+        event.add("propertyType", PropertyType.nameFromValue(propertyType));
+        if (userId != null) {
+            event.add("userId", userId);
+        }
+        return event.build().toString();
+    }
+
}