You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by np...@apache.org on 2018/01/15 15:38:22 UTC

[sling-org-apache-sling-pipes] branch master updated: SLING-7173 add JMX for pipes

This is an automated email from the ASF dual-hosted git repository.

npeltier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-pipes.git


The following commit(s) were added to refs/heads/master by this push:
     new d70bd98  SLING-7173 add JMX for pipes
     new 01b37fc  Merge branch 'master' of github.com:apache/sling-org-apache-sling-pipes
d70bd98 is described below

commit d70bd984b6b70a036f6b074b1035318a2e83ca79
Author: npeltier <pe...@gmail.com>
AuthorDate: Mon Jan 15 16:37:11 2018 +0100

    SLING-7173 add JMX for pipes
    
    - at plumber (for refreshing which pipes are monitored),
    - pipe monitor for pipes (if @monitored property is added, and pipe service user is present),
    - show # of execution, mean time, and add basic possibility of running the pipe (in asynchronous mode)q
---
 .../org/apache/sling/pipes/ExecutionResult.java    |  35 ++++++
 src/main/java/org/apache/sling/pipes/Plumber.java  |   8 ++
 .../java/org/apache/sling/pipes/PlumberMXBean.java |  23 ++++
 .../apache/sling/pipes/internal/JsonWriter.java    |   2 +
 .../sling/pipes/internal/PipeBuilderImpl.java      |   3 +-
 .../apache/sling/pipes/internal/PipeMonitor.java   | 135 +++++++++++++++++++++
 .../sling/pipes/internal/PipeMonitorMBean.java     |  44 +++++++
 .../apache/sling/pipes/internal/PlumberImpl.java   | 127 +++++++++++++++++--
 8 files changed, 362 insertions(+), 15 deletions(-)

diff --git a/src/main/java/org/apache/sling/pipes/ExecutionResult.java b/src/main/java/org/apache/sling/pipes/ExecutionResult.java
index c0cd43c..2f8a4a3 100644
--- a/src/main/java/org/apache/sling/pipes/ExecutionResult.java
+++ b/src/main/java/org/apache/sling/pipes/ExecutionResult.java
@@ -18,6 +18,12 @@ package org.apache.sling.pipes;
 
 import org.apache.sling.api.resource.Resource;
 
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
@@ -27,6 +33,22 @@ import java.util.Set;
  */
 public class ExecutionResult {
 
+    private static final String[] JMX_NAMES = new String[] {"size", "output"};
+
+    private static final CompositeType COMPOSITE_TYPE = getType();
+
+    static CompositeType getType() {
+        try {
+            return new CompositeType(ExecutionResult.class.getName(),
+                    "Execution of pipe, with size, and output as pipe configuration defined it",
+                    JMX_NAMES,
+                    new String[] {"total size", "output as string"},
+                    new OpenType[]{SimpleType.LONG, SimpleType.STRING});
+        } catch (OpenDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
     /**
      * *not* meant to hold the all the paths, just a set that is emptied each time
      * it's persisted.
@@ -35,6 +57,8 @@ public class ExecutionResult {
 
     OutputWriter writer;
 
+    CompositeData data;
+
     /**
      * Constructor
      * @param writer output writer around which to create the result
@@ -80,4 +104,15 @@ public class ExecutionResult {
     public String toString() {
         return writer.toString();
     }
+
+    /**
+     * @return Composite data view of that result. With size of the execution, and string output (can be json, csv, ...)
+     * @throws OpenDataException in case something went wrong building up the composite data
+     */
+    public CompositeData asCompositeData() throws OpenDataException {
+        if (data == null) {
+            data = new CompositeDataSupport(COMPOSITE_TYPE, JMX_NAMES, new Object[]{size(), toString()});
+        }
+        return data;
+    }
 }
diff --git a/src/main/java/org/apache/sling/pipes/Plumber.java b/src/main/java/org/apache/sling/pipes/Plumber.java
index ec5fa09..c0a1a4d 100644
--- a/src/main/java/org/apache/sling/pipes/Plumber.java
+++ b/src/main/java/org/apache/sling/pipes/Plumber.java
@@ -48,6 +48,14 @@ public interface Plumber {
     Job executeAsync(ResourceResolver resolver, String path, Map bindings);
 
     /**
+     * executes in a background thread
+     * @param path path of the pipe to execute
+     * @param bindings additional bindings to use when executing
+     * @return Job if registered, null otherwise
+     */
+    Job executeAsync(String path, Map bindings);
+
+    /**
      * Executes a pipe at a certain path
      * @param resolver resource resolver with which pipe will be executed
      * @param path path of a valid pipe configuration
diff --git a/src/main/java/org/apache/sling/pipes/PlumberMXBean.java b/src/main/java/org/apache/sling/pipes/PlumberMXBean.java
new file mode 100644
index 0000000..f3b31eb
--- /dev/null
+++ b/src/main/java/org/apache/sling/pipes/PlumberMXBean.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.pipes;
+
+public interface PlumberMXBean {
+
+    void refreshMonitoredPipes();
+
+}
diff --git a/src/main/java/org/apache/sling/pipes/internal/JsonWriter.java b/src/main/java/org/apache/sling/pipes/internal/JsonWriter.java
index 8a69294..762c8a5 100644
--- a/src/main/java/org/apache/sling/pipes/internal/JsonWriter.java
+++ b/src/main/java/org/apache/sling/pipes/internal/JsonWriter.java
@@ -24,6 +24,7 @@ import org.apache.sling.pipes.CustomOutputWriter;
 import javax.json.Json;
 import javax.json.JsonValue;
 import javax.json.stream.JsonGenerator;
+import java.io.StringWriter;
 import java.io.Writer;
 import java.util.Map;
 
@@ -37,6 +38,7 @@ public class JsonWriter extends CustomOutputWriter {
     public static final String JSON_EXTENSION = "json";
 
     JsonWriter(){
+        setWriter(new StringWriter());
     }
 
     JsonWriter(Writer writer){
diff --git a/src/main/java/org/apache/sling/pipes/internal/PipeBuilderImpl.java b/src/main/java/org/apache/sling/pipes/internal/PipeBuilderImpl.java
index 68e929f..ce28d44 100644
--- a/src/main/java/org/apache/sling/pipes/internal/PipeBuilderImpl.java
+++ b/src/main/java/org/apache/sling/pipes/internal/PipeBuilderImpl.java
@@ -364,8 +364,7 @@ public class PipeBuilderImpl implements PipeBuilder {
 
     @Override
     public ExecutionResult run(Map bindings) throws Exception {
-        StringWriter stringWriter = new StringWriter();
-        JsonWriter writer = new JsonWriter(stringWriter);
+        JsonWriter writer = new JsonWriter();
         writer.starts();
         Pipe pipe = this.build();
         return plumber.execute(resolver, pipe, bindings,  writer , true);
diff --git a/src/main/java/org/apache/sling/pipes/internal/PipeMonitor.java b/src/main/java/org/apache/sling/pipes/internal/PipeMonitor.java
new file mode 100644
index 0000000..4981420
--- /dev/null
+++ b/src/main/java/org/apache/sling/pipes/internal/PipeMonitor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.pipes.internal;
+
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.pipes.BasePipe;
+import org.apache.sling.pipes.ExecutionResult;
+import org.apache.sling.pipes.Pipe;
+import org.apache.sling.pipes.Plumber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeMonitor implements PipeMonitorMBean {
+    protected static final Logger LOGGER = LoggerFactory.getLogger(PipeMonitor.class);
+
+    String name;
+
+    String path;
+
+    boolean running = false;
+
+    String status;
+
+    long lastStarted;
+
+    long duration;
+
+    int executions = 0;
+
+    int failed = 0;
+
+    long mean;
+
+    Plumber plumber;
+
+    ExecutionResult lastResult;
+
+    public void starts(){
+        lastStarted = System.currentTimeMillis();
+        running = true;
+        status = BasePipe.STATUS_STARTED;
+    }
+
+    public void ends() {
+        duration = System.currentTimeMillis() - lastStarted;
+        mean = ((mean * executions) + duration) / (executions + 1);
+        executions ++;
+        running = false;
+        status = BasePipe.STATUS_FINISHED;
+    }
+
+    public long getFailed(){
+        return failed;
+    }
+
+    @Override
+    public String getStatus() {
+        return status;
+    }
+
+    public void failed() {
+        failed ++;
+        running = false;
+        status = BasePipe.STATUS_FINISHED;
+    }
+
+    public PipeMonitor(Plumber currentPlumber, Pipe pipe){
+        plumber = currentPlumber;
+        name = pipe.getName();
+        path = pipe.getResource().getPath();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public long getExecutionCount() {
+        return executions;
+    }
+
+    @Override
+    public long getMeanDurationMilliseconds() {
+        return mean;
+    }
+
+    public void setLastResult(ExecutionResult result) {
+        lastResult = result;
+    }
+
+    @Override
+    public CompositeData getLastResult() {
+        try {
+            if (lastResult != null){
+                return lastResult.asCompositeData();
+            }
+        } catch (OpenDataException e) {
+            LOGGER.error("unable to dump last result as composite data", e);
+        }
+        return null;
+    }
+
+    @Override
+    public String run() {
+        Map bindings = new HashMap<>();
+        bindings.put(BasePipe.READ_ONLY, false);
+        Job job = plumber.executeAsync(path, bindings);
+        return String.format("Job %s has been created", job.getId());
+    }
+}
diff --git a/src/main/java/org/apache/sling/pipes/internal/PipeMonitorMBean.java b/src/main/java/org/apache/sling/pipes/internal/PipeMonitorMBean.java
new file mode 100644
index 0000000..2187541
--- /dev/null
+++ b/src/main/java/org/apache/sling/pipes/internal/PipeMonitorMBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.pipes.internal;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Mbean for providing pipe related statistics and actions
+ */
+@ProviderType
+public interface PipeMonitorMBean {
+
+    String getName();
+
+    String getPath();
+
+    long getExecutionCount();
+
+    long getMeanDurationMilliseconds();
+
+    long getFailed();
+
+    String getStatus();
+
+    CompositeData getLastResult();
+
+    String run();
+}
diff --git a/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java b/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
index 995b7bf..05dcb69 100644
--- a/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
+++ b/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.sling.pipes.internal;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.api.SlingConstants;
 import org.apache.sling.api.resource.LoginException;
@@ -40,6 +41,7 @@ import org.apache.sling.pipes.Pipe;
 import org.apache.sling.pipes.PipeBindings;
 import org.apache.sling.pipes.PipeBuilder;
 import org.apache.sling.pipes.Plumber;
+import org.apache.sling.pipes.PlumberMXBean;
 import org.apache.sling.pipes.ReferencePipe;
 import org.apache.sling.pipes.internal.slingQuery.ChildrenPipe;
 import org.apache.sling.pipes.internal.slingQuery.ClosestPipe;
@@ -49,6 +51,7 @@ import org.apache.sling.pipes.internal.slingQuery.ParentsPipe;
 import org.apache.sling.pipes.internal.slingQuery.SiblingsPipe;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.component.annotations.ReferencePolicy;
@@ -59,8 +62,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jcr.RepositoryException;
+import javax.jcr.query.Query;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.GregorianCalendar;
@@ -82,10 +91,15 @@ import static org.apache.sling.pipes.BasePipe.STATUS_STARTED;
         JobConsumer.PROPERTY_TOPICS +"="+PlumberImpl.SLING_EVENT_TOPIC
 })
 @Designate(ocd = PlumberImpl.Configuration.class)
-public class PlumberImpl implements Plumber, JobConsumer {
+public class PlumberImpl implements Plumber, JobConsumer, PlumberMXBean {
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     public static final int DEFAULT_BUFFER_SIZE = 1000;
 
+    protected final static String PN_MONITORED = "monitored";
+    protected final static String MONITORED_PIPES_QUERY = String.format("//element(*,nt:base)[@sling:resourceType='%s' and @%s]", ContainerPipe.RESOURCE_TYPE, PN_MONITORED);
+
+    protected final static String MBEAN_NAME_FORMAT = "org.apache.sling.pipes:name=%s";
+
     @ObjectClassDefinition(name="Apache Sling Pipes : Plumber configuration")
     public @interface Configuration {
         @AttributeDefinition(description="Number of iterations after which plumber should saves a pipe execution")
@@ -111,6 +125,8 @@ public class PlumberImpl implements Plumber, JobConsumer {
 
     private List<String> allowedUsers;
 
+    private Map<String, PipeMonitor> monitoredPipes;
+
     @Activate
     public void activate(Configuration configuration){
         this.configuration = configuration;
@@ -138,6 +154,38 @@ public class PlumberImpl implements Plumber, JobConsumer {
         registerPipe(SiblingsPipe.RESOURCE_TYPE, SiblingsPipe.class);
         registerPipe(ClosestPipe.RESOURCE_TYPE, ClosestPipe.class);
         registerPipe(FindPipe.RESOURCE_TYPE, FindPipe.class);
+        toggleJmxRegistration(this, PlumberMXBean.class.getName(), true);
+        refreshMonitoredPipes();
+    }
+
+    @Deactivate
+    public void deactivate(){
+        toggleJmxRegistration(null, PlumberMXBean.class.getName(), false);
+        if (monitoredPipes != null){
+            for (String path : monitoredPipes.keySet()){
+                toggleJmxRegistration(null, path, false);
+            }
+        }
+    }
+
+    /**
+     * Toggle some mbean registration
+     * @param name partial name that will be used for registration
+     * @param register true to register, false to unregister
+     */
+    private void toggleJmxRegistration(Object instance, String name, boolean register){
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+            ObjectName oName = ObjectName.getInstance(String.format(MBEAN_NAME_FORMAT, name));
+            if (register && !server.isRegistered(oName)) {
+                server.registerMBean(instance, oName);
+            }
+            if (!register && server.isRegistered(oName)){
+                server.unregisterMBean(oName);
+            }
+        } catch (Exception e) {
+            log.error("unable to toggle mbean {} registration", name, e);
+        }
     }
 
     @Reference(policy= ReferencePolicy.DYNAMIC, cardinality= ReferenceCardinality.OPTIONAL)
@@ -167,18 +215,23 @@ public class PlumberImpl implements Plumber, JobConsumer {
     @Override
     public Job executeAsync(ResourceResolver resolver, String path, Map bindings) {
         if (allowedUsers.contains(resolver.getUserID())) {
-            if (StringUtils.isBlank((String)serviceUser.get(SUBSERVICE))) {
-                log.error("please configure plumber service user");
-            }
-            final Map props = new HashMap();
-            props.put(SlingConstants.PROPERTY_PATH, path);
-            props.put(PipeBindings.NN_ADDITIONALBINDINGS, bindings);
-            return jobManager.addJob(SLING_EVENT_TOPIC, props);
+            return executeAsync(path, bindings);
         }
         return null;
     }
 
     @Override
+    public Job executeAsync(String path, Map bindings) {
+        if (StringUtils.isBlank((String)serviceUser.get(SUBSERVICE))) {
+            log.error("please configure plumber service user");
+        }
+        final Map props = new HashMap();
+        props.put(SlingConstants.PROPERTY_PATH, path);
+        props.put(PipeBindings.NN_ADDITIONALBINDINGS, bindings);
+        return jobManager.addJob(SLING_EVENT_TOPIC, props);
+    }
+
+    @Override
     public ExecutionResult execute(ResourceResolver resolver, String path, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
         Resource pipeResource = resolver.getResource(path);
         Pipe pipe = getPipe(pipeResource);
@@ -193,18 +246,24 @@ public class PlumberImpl implements Plumber, JobConsumer {
 
     @Override
     public ExecutionResult execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
+        boolean success = false;
+        PipeMonitor monitor = null;
         try {
+            log.info("[{}] execution starts, save ({})", pipe, save);
             if (additionalBindings != null && pipe instanceof ContainerPipe){
                 pipe.getBindings().addBindings(additionalBindings);
             }
-            log.info("[{}] execution starts, save ({})", pipe, save);
+            Resource confResource = pipe.getResource();
             writer.setPipe(pipe);
-            if (isRunning(pipe.getResource())){
+            if (isRunning(confResource)){
                 throw new RuntimeException("Pipe is already running");
             }
+            monitor = monitoredPipes.get(confResource.getPath());
             writeStatus(pipe, STATUS_STARTED);
             resolver.commit();
-
+            if (monitor != null){
+                monitor.starts();
+            }
             ExecutionResult result = new ExecutionResult(writer);
             for (Iterator<Resource> it = pipe.getOutput(); it.hasNext();){
                 Resource resource = it.next();
@@ -217,12 +276,20 @@ public class PlumberImpl implements Plumber, JobConsumer {
             if (save && pipe.modifiesContent()) {
                 persist(resolver, pipe, result, null);
             }
-            log.info("[{}] done executing.", pipe.getName());
             writer.ends();
+            if (monitor != null){
+                monitor.ends();
+                monitor.setLastResult(result);
+            }
+            success = true;
             return result;
         } finally {
             writeStatus(pipe, STATUS_FINISHED);
             resolver.commit();
+            log.info("[{}] done executing.", pipe.getName());
+            if (!success && monitor != null){
+                monitor.failed();
+            }
         }
     }
 
@@ -312,7 +379,9 @@ public class PlumberImpl implements Plumber, JobConsumer {
         try(ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)){
             String path = (String)job.getProperty(SlingConstants.PROPERTY_PATH);
             Map bindings = (Map)job.getProperty(PipeBindings.NN_ADDITIONALBINDINGS);
-            execute(resolver, path, bindings, new NopWriter(), true);
+            OutputWriter writer = new JsonWriter();
+            writer.starts();
+            execute(resolver, path, bindings, writer, true);
             return JobResult.OK;
         } catch (LoginException e) {
             log.error("unable to retrieve resolver for executing scheduled pipe", e);
@@ -321,4 +390,36 @@ public class PlumberImpl implements Plumber, JobConsumer {
         }
         return JobResult.FAILED;
     }
+
+    @Override
+    public void refreshMonitoredPipes() {
+        Map<String, PipeMonitor> map = new HashMap<>();
+        getMonitoredPipes().stream().forEach(bean -> map.put(bean.getPath(), bean));
+        if (monitoredPipes != null) {
+            Collection<String> shouldBeRemoved = CollectionUtils.subtract(monitoredPipes.keySet(), map.keySet());
+            for (String path : shouldBeRemoved){
+                toggleJmxRegistration(null, path, false);
+            }
+        }
+        monitoredPipes = map;
+        for (String path : monitoredPipes.keySet()){
+            toggleJmxRegistration(monitoredPipes.get(path), path, true);
+        }
+    }
+
+    protected Collection<PipeMonitor> getMonitoredPipes() {
+        Collection<PipeMonitor> beans = new ArrayList<>();
+        if (serviceUser != null) {
+            try (ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)) {
+                for (Iterator<Resource> resourceIterator = resolver.findResources(MONITORED_PIPES_QUERY, Query.XPATH); resourceIterator.hasNext(); ) {
+                    beans.add(new org.apache.sling.pipes.internal.PipeMonitor(this, getPipe(resourceIterator.next())));
+                }
+            } catch (LoginException e) {
+                log.error("unable to retrieve resolver for collecting exposed pipes", e);
+            } catch (Exception e) {
+                log.error("failed to execute the pipe", e);
+            }
+        }
+        return beans;
+    }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
['"commits@sling.apache.org" <co...@sling.apache.org>'].