You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by np...@apache.org on 2018/01/15 15:38:22 UTC
[sling-org-apache-sling-pipes] branch master updated: SLING-7173
add JMX for pipes
This is an automated email from the ASF dual-hosted git repository.
npeltier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-pipes.git
The following commit(s) were added to refs/heads/master by this push:
new d70bd98 SLING-7173 add JMX for pipes
new 01b37fc Merge branch 'master' of github.com:apache/sling-org-apache-sling-pipes
d70bd98 is described below
commit d70bd984b6b70a036f6b074b1035318a2e83ca79
Author: npeltier <pe...@gmail.com>
AuthorDate: Mon Jan 15 16:37:11 2018 +0100
SLING-7173 add JMX for pipes
- at plumber (for refreshing which pipes are monitored),
- pipe monitor for pipes (if @monitored property is added, and pipe service user is present),
- show # of execution, mean time, and add basic possibility of running the pipe (in asynchronous mode)q
---
.../org/apache/sling/pipes/ExecutionResult.java | 35 ++++++
src/main/java/org/apache/sling/pipes/Plumber.java | 8 ++
.../java/org/apache/sling/pipes/PlumberMXBean.java | 23 ++++
.../apache/sling/pipes/internal/JsonWriter.java | 2 +
.../sling/pipes/internal/PipeBuilderImpl.java | 3 +-
.../apache/sling/pipes/internal/PipeMonitor.java | 135 +++++++++++++++++++++
.../sling/pipes/internal/PipeMonitorMBean.java | 44 +++++++
.../apache/sling/pipes/internal/PlumberImpl.java | 127 +++++++++++++++++--
8 files changed, 362 insertions(+), 15 deletions(-)
diff --git a/src/main/java/org/apache/sling/pipes/ExecutionResult.java b/src/main/java/org/apache/sling/pipes/ExecutionResult.java
index c0cd43c..2f8a4a3 100644
--- a/src/main/java/org/apache/sling/pipes/ExecutionResult.java
+++ b/src/main/java/org/apache/sling/pipes/ExecutionResult.java
@@ -18,6 +18,12 @@ package org.apache.sling.pipes;
import org.apache.sling.api.resource.Resource;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -27,6 +33,22 @@ import java.util.Set;
*/
public class ExecutionResult {
+ private static final String[] JMX_NAMES = new String[] {"size", "output"};
+
+ private static final CompositeType COMPOSITE_TYPE = getType();
+
+ static CompositeType getType() {
+ try {
+ return new CompositeType(ExecutionResult.class.getName(),
+ "Execution of pipe, with size, and output as pipe configuration defined it",
+ JMX_NAMES,
+ new String[] {"total size", "output as string"},
+ new OpenType[]{SimpleType.LONG, SimpleType.STRING});
+ } catch (OpenDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
/**
* *not* meant to hold the all the paths, just a set that is emptied each time
* it's persisted.
@@ -35,6 +57,8 @@ public class ExecutionResult {
OutputWriter writer;
+ CompositeData data;
+
/**
* Constructor
* @param writer output writer around which to create the result
@@ -80,4 +104,15 @@ public class ExecutionResult {
public String toString() {
return writer.toString();
}
+
+ /**
+ * @return Composite data view of that result. With size of the execution, and string output (can be json, csv, ...)
+ * @throws OpenDataException in case something went wrong building up the composite data
+ */
+ public CompositeData asCompositeData() throws OpenDataException {
+ if (data == null) {
+ data = new CompositeDataSupport(COMPOSITE_TYPE, JMX_NAMES, new Object[]{size(), toString()});
+ }
+ return data;
+ }
}
diff --git a/src/main/java/org/apache/sling/pipes/Plumber.java b/src/main/java/org/apache/sling/pipes/Plumber.java
index ec5fa09..c0a1a4d 100644
--- a/src/main/java/org/apache/sling/pipes/Plumber.java
+++ b/src/main/java/org/apache/sling/pipes/Plumber.java
@@ -48,6 +48,14 @@ public interface Plumber {
Job executeAsync(ResourceResolver resolver, String path, Map bindings);
/**
+ * executes in a background thread
+ * @param path path of the pipe to execute
+ * @param bindings additional bindings to use when executing
+ * @return Job if registered, null otherwise
+ */
+ Job executeAsync(String path, Map bindings);
+
+ /**
* Executes a pipe at a certain path
* @param resolver resource resolver with which pipe will be executed
* @param path path of a valid pipe configuration
diff --git a/src/main/java/org/apache/sling/pipes/PlumberMXBean.java b/src/main/java/org/apache/sling/pipes/PlumberMXBean.java
new file mode 100644
index 0000000..f3b31eb
--- /dev/null
+++ b/src/main/java/org/apache/sling/pipes/PlumberMXBean.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.pipes;
+
+public interface PlumberMXBean {
+
+ void refreshMonitoredPipes();
+
+}
diff --git a/src/main/java/org/apache/sling/pipes/internal/JsonWriter.java b/src/main/java/org/apache/sling/pipes/internal/JsonWriter.java
index 8a69294..762c8a5 100644
--- a/src/main/java/org/apache/sling/pipes/internal/JsonWriter.java
+++ b/src/main/java/org/apache/sling/pipes/internal/JsonWriter.java
@@ -24,6 +24,7 @@ import org.apache.sling.pipes.CustomOutputWriter;
import javax.json.Json;
import javax.json.JsonValue;
import javax.json.stream.JsonGenerator;
+import java.io.StringWriter;
import java.io.Writer;
import java.util.Map;
@@ -37,6 +38,7 @@ public class JsonWriter extends CustomOutputWriter {
public static final String JSON_EXTENSION = "json";
JsonWriter(){
+ setWriter(new StringWriter());
}
JsonWriter(Writer writer){
diff --git a/src/main/java/org/apache/sling/pipes/internal/PipeBuilderImpl.java b/src/main/java/org/apache/sling/pipes/internal/PipeBuilderImpl.java
index 68e929f..ce28d44 100644
--- a/src/main/java/org/apache/sling/pipes/internal/PipeBuilderImpl.java
+++ b/src/main/java/org/apache/sling/pipes/internal/PipeBuilderImpl.java
@@ -364,8 +364,7 @@ public class PipeBuilderImpl implements PipeBuilder {
@Override
public ExecutionResult run(Map bindings) throws Exception {
- StringWriter stringWriter = new StringWriter();
- JsonWriter writer = new JsonWriter(stringWriter);
+ JsonWriter writer = new JsonWriter();
writer.starts();
Pipe pipe = this.build();
return plumber.execute(resolver, pipe, bindings, writer , true);
diff --git a/src/main/java/org/apache/sling/pipes/internal/PipeMonitor.java b/src/main/java/org/apache/sling/pipes/internal/PipeMonitor.java
new file mode 100644
index 0000000..4981420
--- /dev/null
+++ b/src/main/java/org/apache/sling/pipes/internal/PipeMonitor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.pipes.internal;
+
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.pipes.BasePipe;
+import org.apache.sling.pipes.ExecutionResult;
+import org.apache.sling.pipes.Pipe;
+import org.apache.sling.pipes.Plumber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeMonitor implements PipeMonitorMBean {
+ protected static final Logger LOGGER = LoggerFactory.getLogger(PipeMonitor.class);
+
+ String name;
+
+ String path;
+
+ boolean running = false;
+
+ String status;
+
+ long lastStarted;
+
+ long duration;
+
+ int executions = 0;
+
+ int failed = 0;
+
+ long mean;
+
+ Plumber plumber;
+
+ ExecutionResult lastResult;
+
+ public void starts(){
+ lastStarted = System.currentTimeMillis();
+ running = true;
+ status = BasePipe.STATUS_STARTED;
+ }
+
+ public void ends() {
+ duration = System.currentTimeMillis() - lastStarted;
+ mean = ((mean * executions) + duration) / (executions + 1);
+ executions ++;
+ running = false;
+ status = BasePipe.STATUS_FINISHED;
+ }
+
+ public long getFailed(){
+ return failed;
+ }
+
+ @Override
+ public String getStatus() {
+ return status;
+ }
+
+ public void failed() {
+ failed ++;
+ running = false;
+ status = BasePipe.STATUS_FINISHED;
+ }
+
+ public PipeMonitor(Plumber currentPlumber, Pipe pipe){
+ plumber = currentPlumber;
+ name = pipe.getName();
+ path = pipe.getResource().getPath();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public long getExecutionCount() {
+ return executions;
+ }
+
+ @Override
+ public long getMeanDurationMilliseconds() {
+ return mean;
+ }
+
+ public void setLastResult(ExecutionResult result) {
+ lastResult = result;
+ }
+
+ @Override
+ public CompositeData getLastResult() {
+ try {
+ if (lastResult != null){
+ return lastResult.asCompositeData();
+ }
+ } catch (OpenDataException e) {
+ LOGGER.error("unable to dump last result as composite data", e);
+ }
+ return null;
+ }
+
+ @Override
+ public String run() {
+ Map bindings = new HashMap<>();
+ bindings.put(BasePipe.READ_ONLY, false);
+ Job job = plumber.executeAsync(path, bindings);
+ return String.format("Job %s has been created", job.getId());
+ }
+}
diff --git a/src/main/java/org/apache/sling/pipes/internal/PipeMonitorMBean.java b/src/main/java/org/apache/sling/pipes/internal/PipeMonitorMBean.java
new file mode 100644
index 0000000..2187541
--- /dev/null
+++ b/src/main/java/org/apache/sling/pipes/internal/PipeMonitorMBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sling.pipes.internal;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Mbean for providing pipe related statistics and actions
+ */
+@ProviderType
+public interface PipeMonitorMBean {
+
+ String getName();
+
+ String getPath();
+
+ long getExecutionCount();
+
+ long getMeanDurationMilliseconds();
+
+ long getFailed();
+
+ String getStatus();
+
+ CompositeData getLastResult();
+
+ String run();
+}
diff --git a/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java b/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
index 995b7bf..05dcb69 100644
--- a/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
+++ b/src/main/java/org/apache/sling/pipes/internal/PlumberImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.sling.pipes.internal;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.SlingConstants;
import org.apache.sling.api.resource.LoginException;
@@ -40,6 +41,7 @@ import org.apache.sling.pipes.Pipe;
import org.apache.sling.pipes.PipeBindings;
import org.apache.sling.pipes.PipeBuilder;
import org.apache.sling.pipes.Plumber;
+import org.apache.sling.pipes.PlumberMXBean;
import org.apache.sling.pipes.ReferencePipe;
import org.apache.sling.pipes.internal.slingQuery.ChildrenPipe;
import org.apache.sling.pipes.internal.slingQuery.ClosestPipe;
@@ -49,6 +51,7 @@ import org.apache.sling.pipes.internal.slingQuery.ParentsPipe;
import org.apache.sling.pipes.internal.slingQuery.SiblingsPipe;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
@@ -59,8 +62,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jcr.RepositoryException;
+import javax.jcr.query.Query;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
@@ -82,10 +91,15 @@ import static org.apache.sling.pipes.BasePipe.STATUS_STARTED;
JobConsumer.PROPERTY_TOPICS +"="+PlumberImpl.SLING_EVENT_TOPIC
})
@Designate(ocd = PlumberImpl.Configuration.class)
-public class PlumberImpl implements Plumber, JobConsumer {
+public class PlumberImpl implements Plumber, JobConsumer, PlumberMXBean {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public static final int DEFAULT_BUFFER_SIZE = 1000;
+ protected final static String PN_MONITORED = "monitored";
+ protected final static String MONITORED_PIPES_QUERY = String.format("//element(*,nt:base)[@sling:resourceType='%s' and @%s]", ContainerPipe.RESOURCE_TYPE, PN_MONITORED);
+
+ protected final static String MBEAN_NAME_FORMAT = "org.apache.sling.pipes:name=%s";
+
@ObjectClassDefinition(name="Apache Sling Pipes : Plumber configuration")
public @interface Configuration {
@AttributeDefinition(description="Number of iterations after which plumber should saves a pipe execution")
@@ -111,6 +125,8 @@ public class PlumberImpl implements Plumber, JobConsumer {
private List<String> allowedUsers;
+ private Map<String, PipeMonitor> monitoredPipes;
+
@Activate
public void activate(Configuration configuration){
this.configuration = configuration;
@@ -138,6 +154,38 @@ public class PlumberImpl implements Plumber, JobConsumer {
registerPipe(SiblingsPipe.RESOURCE_TYPE, SiblingsPipe.class);
registerPipe(ClosestPipe.RESOURCE_TYPE, ClosestPipe.class);
registerPipe(FindPipe.RESOURCE_TYPE, FindPipe.class);
+ toggleJmxRegistration(this, PlumberMXBean.class.getName(), true);
+ refreshMonitoredPipes();
+ }
+
+ @Deactivate
+ public void deactivate(){
+ toggleJmxRegistration(null, PlumberMXBean.class.getName(), false);
+ if (monitoredPipes != null){
+ for (String path : monitoredPipes.keySet()){
+ toggleJmxRegistration(null, path, false);
+ }
+ }
+ }
+
+ /**
+ * Toggle some mbean registration
+ * @param name partial name that will be used for registration
+ * @param register true to register, false to unregister
+ */
+ private void toggleJmxRegistration(Object instance, String name, boolean register){
+ try {
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ ObjectName oName = ObjectName.getInstance(String.format(MBEAN_NAME_FORMAT, name));
+ if (register && !server.isRegistered(oName)) {
+ server.registerMBean(instance, oName);
+ }
+ if (!register && server.isRegistered(oName)){
+ server.unregisterMBean(oName);
+ }
+ } catch (Exception e) {
+ log.error("unable to toggle mbean {} registration", name, e);
+ }
}
@Reference(policy= ReferencePolicy.DYNAMIC, cardinality= ReferenceCardinality.OPTIONAL)
@@ -167,18 +215,23 @@ public class PlumberImpl implements Plumber, JobConsumer {
@Override
public Job executeAsync(ResourceResolver resolver, String path, Map bindings) {
if (allowedUsers.contains(resolver.getUserID())) {
- if (StringUtils.isBlank((String)serviceUser.get(SUBSERVICE))) {
- log.error("please configure plumber service user");
- }
- final Map props = new HashMap();
- props.put(SlingConstants.PROPERTY_PATH, path);
- props.put(PipeBindings.NN_ADDITIONALBINDINGS, bindings);
- return jobManager.addJob(SLING_EVENT_TOPIC, props);
+ return executeAsync(path, bindings);
}
return null;
}
@Override
+ public Job executeAsync(String path, Map bindings) {
+ if (StringUtils.isBlank((String)serviceUser.get(SUBSERVICE))) {
+ log.error("please configure plumber service user");
+ }
+ final Map props = new HashMap();
+ props.put(SlingConstants.PROPERTY_PATH, path);
+ props.put(PipeBindings.NN_ADDITIONALBINDINGS, bindings);
+ return jobManager.addJob(SLING_EVENT_TOPIC, props);
+ }
+
+ @Override
public ExecutionResult execute(ResourceResolver resolver, String path, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
Resource pipeResource = resolver.getResource(path);
Pipe pipe = getPipe(pipeResource);
@@ -193,18 +246,24 @@ public class PlumberImpl implements Plumber, JobConsumer {
@Override
public ExecutionResult execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
+ boolean success = false;
+ PipeMonitor monitor = null;
try {
+ log.info("[{}] execution starts, save ({})", pipe, save);
if (additionalBindings != null && pipe instanceof ContainerPipe){
pipe.getBindings().addBindings(additionalBindings);
}
- log.info("[{}] execution starts, save ({})", pipe, save);
+ Resource confResource = pipe.getResource();
writer.setPipe(pipe);
- if (isRunning(pipe.getResource())){
+ if (isRunning(confResource)){
throw new RuntimeException("Pipe is already running");
}
+ monitor = monitoredPipes.get(confResource.getPath());
writeStatus(pipe, STATUS_STARTED);
resolver.commit();
-
+ if (monitor != null){
+ monitor.starts();
+ }
ExecutionResult result = new ExecutionResult(writer);
for (Iterator<Resource> it = pipe.getOutput(); it.hasNext();){
Resource resource = it.next();
@@ -217,12 +276,20 @@ public class PlumberImpl implements Plumber, JobConsumer {
if (save && pipe.modifiesContent()) {
persist(resolver, pipe, result, null);
}
- log.info("[{}] done executing.", pipe.getName());
writer.ends();
+ if (monitor != null){
+ monitor.ends();
+ monitor.setLastResult(result);
+ }
+ success = true;
return result;
} finally {
writeStatus(pipe, STATUS_FINISHED);
resolver.commit();
+ log.info("[{}] done executing.", pipe.getName());
+ if (!success && monitor != null){
+ monitor.failed();
+ }
}
}
@@ -312,7 +379,9 @@ public class PlumberImpl implements Plumber, JobConsumer {
try(ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)){
String path = (String)job.getProperty(SlingConstants.PROPERTY_PATH);
Map bindings = (Map)job.getProperty(PipeBindings.NN_ADDITIONALBINDINGS);
- execute(resolver, path, bindings, new NopWriter(), true);
+ OutputWriter writer = new JsonWriter();
+ writer.starts();
+ execute(resolver, path, bindings, writer, true);
return JobResult.OK;
} catch (LoginException e) {
log.error("unable to retrieve resolver for executing scheduled pipe", e);
@@ -321,4 +390,36 @@ public class PlumberImpl implements Plumber, JobConsumer {
}
return JobResult.FAILED;
}
+
+ @Override
+ public void refreshMonitoredPipes() {
+ Map<String, PipeMonitor> map = new HashMap<>();
+ getMonitoredPipes().stream().forEach(bean -> map.put(bean.getPath(), bean));
+ if (monitoredPipes != null) {
+ Collection<String> shouldBeRemoved = CollectionUtils.subtract(monitoredPipes.keySet(), map.keySet());
+ for (String path : shouldBeRemoved){
+ toggleJmxRegistration(null, path, false);
+ }
+ }
+ monitoredPipes = map;
+ for (String path : monitoredPipes.keySet()){
+ toggleJmxRegistration(monitoredPipes.get(path), path, true);
+ }
+ }
+
+ protected Collection<PipeMonitor> getMonitoredPipes() {
+ Collection<PipeMonitor> beans = new ArrayList<>();
+ if (serviceUser != null) {
+ try (ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)) {
+ for (Iterator<Resource> resourceIterator = resolver.findResources(MONITORED_PIPES_QUERY, Query.XPATH); resourceIterator.hasNext(); ) {
+ beans.add(new org.apache.sling.pipes.internal.PipeMonitor(this, getPipe(resourceIterator.next())));
+ }
+ } catch (LoginException e) {
+ log.error("unable to retrieve resolver for collecting exposed pipes", e);
+ } catch (Exception e) {
+ log.error("failed to execute the pipe", e);
+ }
+ }
+ return beans;
+ }
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
['"commits@sling.apache.org" <co...@sling.apache.org>'].