You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2016/11/11 14:25:37 UTC
svn commit: r1769290 - in
/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution:
agent/impl/ monitor/impl/
Author: tommaso
Date: Fri Nov 11 14:25:37 2016
New Revision: 1769290
URL: http://svn.apache.org/viewvc?rev=1769290&view=rev
Log:
SLING-6238 - adding jmx support for queue providers
Added:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java (with props)
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java (with props)
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java (with props)
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java?rev=1769290&r1=1769289&r2=1769290&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java Fri Nov 11 14:25:37 2016
@@ -46,6 +46,7 @@ import org.apache.sling.distribution.eve
import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
import org.apache.sling.distribution.monitor.impl.ForwardDistributionAgentMBean;
import org.apache.sling.distribution.monitor.impl.ForwardDistributionAgentMBeanImpl;
+import org.apache.sling.distribution.monitor.impl.MonitoringDistributionQueueProvider;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageExporter;
import org.apache.sling.distribution.packaging.DistributionPackageImporter;
@@ -248,6 +249,7 @@ public class ForwardDistributionAgentFac
} else {
queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, true);
}
+ queueProvider = new MonitoringDistributionQueueProvider(queueProvider, context);
DistributionQueueDispatchingStrategy exportQueueStrategy;
DistributionQueueDispatchingStrategy errorQueueStrategy = null;
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java?rev=1769290&r1=1769289&r2=1769290&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java Fri Nov 11 14:25:37 2016
@@ -38,6 +38,7 @@ import org.apache.sling.distribution.com
import org.apache.sling.distribution.component.impl.SettingsUtils;
import org.apache.sling.distribution.event.impl.DistributionEventFactory;
import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.monitor.impl.MonitoringDistributionQueueProvider;
import org.apache.sling.distribution.monitor.impl.QueueDistributionAgentMBean;
import org.apache.sling.distribution.monitor.impl.QueueDistributionAgentMBeanImpl;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -176,7 +177,8 @@ public class QueueDistributionAgentFacto
priorityQueues = SettingsUtils.removeEmptyEntries(priorityQueues);
- DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+ DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
+
DistributionQueueDispatchingStrategy exportQueueStrategy = null;
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java?rev=1769290&r1=1769289&r2=1769290&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java Fri Nov 11 14:25:37 2016
@@ -40,6 +40,7 @@ import org.apache.sling.distribution.com
import org.apache.sling.distribution.component.impl.SettingsUtils;
import org.apache.sling.distribution.event.impl.DistributionEventFactory;
import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.monitor.impl.MonitoringDistributionQueueProvider;
import org.apache.sling.distribution.monitor.impl.ReverseDistributionAgentMBean;
import org.apache.sling.distribution.monitor.impl.ReverseDistributionAgentMBeanImpl;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -192,7 +193,7 @@ public class ReverseDistributionAgentFac
DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, exporterEndpoints, pullItems);
DistributionPackageImporter packageImporter = new LocalDistributionPackageImporter(agentName, distributionEventFactory, packageBuilder);
- DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+ DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
DistributionQueueDispatchingStrategy importQueueStrategy = null;
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java?rev=1769290&r1=1769289&r2=1769290&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java Fri Nov 11 14:25:37 2016
@@ -42,6 +42,7 @@ import org.apache.sling.distribution.com
import org.apache.sling.distribution.component.impl.SettingsUtils;
import org.apache.sling.distribution.event.impl.DistributionEventFactory;
import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.monitor.impl.MonitoringDistributionQueueProvider;
import org.apache.sling.distribution.monitor.impl.SyncDistributionAgentMBean;
import org.apache.sling.distribution.monitor.impl.SyncDistributionAgentMBeanImpl;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -241,7 +242,7 @@ public class SyncDistributionAgentFactor
packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap);
DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, exporterEndpoints, pullItems);
- DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+ DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
DistributionRequestType[] allowedRequests = new DistributionRequestType[]{DistributionRequestType.PULL};
String retryStrategy = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java?rev=1769290&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java Fri Nov 11 14:25:37 2016
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.monitor.impl;
+
+import java.util.Calendar;
+
+/**
+ * The MBean representation of a {@code DistributionQueue}.
+ * <p>
+ * All attributes are read-only: the interface declares getters only.
+ */
+public interface DistributionQueueMBean {
+
+ /**
+ * Get the name of this queue.
+ *
+ * @return the queue name
+ */
+ String getName();
+
+ /**
+ * Get the type of this queue.
+ *
+ * @return the queue type, as a string
+ */
+ String getType();
+
+ /**
+ * Get the current size of the queue.
+ *
+ * @return the number of items currently in the queue
+ */
+ int getSize();
+
+ /**
+ * Check whether the queue is empty.
+ *
+ * @return {@code true} if the queue is empty, {@code false} otherwise
+ */
+ boolean isEmpty();
+
+ /**
+ * Get the state associated with the status of this queue.
+ *
+ * @return the queue state, as a string
+ */
+ String getState();
+
+ /**
+ * Get the ID of the first item in the queue (in a FIFO strategy, the next one to be processed).
+ *
+ * @return the ID of the first item in the queue, or {@code null} if the queue is empty
+ */
+ String getHeadId();
+
+ /**
+ * Get the number of dequeuing attempts made so far for the first item in the queue
+ * (in a FIFO strategy, the next one to be processed).
+ *
+ * @return the dequeuing attempts of the first item, or {@code -1} if the queue is empty
+ */
+ int getHeadDequeuingAttempts();
+
+ /**
+ * Get the state of the first item in the queue (in a FIFO strategy, the next one to be processed).
+ *
+ * @return the state of the first item, or {@code null} if the queue is empty
+ */
+ String getHeadStatus();
+
+ /**
+ * Get the date and time at which the first item (in a FIFO strategy, the next one to be processed)
+ * joined the queue.
+ *
+ * @return the date and time the first item joined the queue, or {@code null} if the queue is empty
+ */
+ Calendar getHeadEnqueuingDate();
+
+}
Propchange: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java?rev=1769290&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java Fri Nov 11 14:25:37 2016
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.monitor.impl;
+
+import java.util.Calendar;
+import java.util.Locale;
+
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+
+/**
+ * Default implementation of {@link DistributionQueueMBean}
+ */
+public final class DistributionQueueMBeanImpl implements DistributionQueueMBean {
+
+ private final DistributionQueue distributionQueue;
+
+ public DistributionQueueMBeanImpl(DistributionQueue distributionQueue) {
+ this.distributionQueue = distributionQueue;
+ }
+
+ @Override
+ public String getName() {
+ return distributionQueue.getName();
+ }
+
+ @Override
+ public String getType() {
+ return distributionQueue.getType().name().toLowerCase();
+ }
+
+ @Override
+ public int getSize() {
+ return distributionQueue.getStatus().getItemsCount();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return distributionQueue.getStatus().isEmpty();
+ }
+
+ @Override
+ public String getState() {
+ return distributionQueue.getStatus().getState().name().toLowerCase();
+ }
+
+ @Override
+ public String getHeadId() {
+ if (distributionQueue.getHead() != null) {
+ return distributionQueue.getHead().getId();
+ }
+ return null;
+ }
+
+ @Override
+ public int getHeadDequeuingAttempts() {
+ if (distributionQueue.getHead() != null) {
+ return distributionQueue.getHead().getStatus().getAttempts();
+ }
+ return -1;
+ }
+
+ @Override
+ public String getHeadStatus() {
+ if (distributionQueue.getHead() != null) {
+ return distributionQueue.getHead().getStatus().getItemState().name().toLowerCase();
+ }
+ return null;
+ }
+
+ @Override
+ public Calendar getHeadEnqueuingDate() {
+ if (distributionQueue.getHead() != null) {
+ return distributionQueue.getHead().getStatus().getEntered();
+ }
+ return null;
+ }
+
+}
Propchange: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java?rev=1769290&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java Fri Nov 11 14:25:37 2016
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.monitor.impl;
+
+import javax.management.ObjectName;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueProcessor;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * A {@link DistributionQueueProvider} that supports monitoring via JMX.
+ */
+public class MonitoringDistributionQueueProvider implements DistributionQueueProvider {
+
+ private final Set<String> monitoredQueues = new HashSet<String>();
+
+ private final List<ServiceRegistration> mBeans = new LinkedList<ServiceRegistration>();
+
+ private final DistributionQueueProvider wrapped;
+
+ private final BundleContext context;
+
+ public MonitoringDistributionQueueProvider(DistributionQueueProvider wrapped, BundleContext context) {
+ this.wrapped = wrapped;
+ this.context = context;
+ }
+
+ @Override
+ public DistributionQueue getQueue(String queueName) throws DistributionException {
+ DistributionQueue distributionQueue = wrapped.getQueue(queueName);
+ monitorQueue(distributionQueue);
+ return distributionQueue;
+ }
+
+ @Override
+ public DistributionQueue getQueue(String queueName, DistributionQueueType type) {
+ DistributionQueue distributionQueue = wrapped.getQueue(queueName, type);
+ monitorQueue(distributionQueue);
+ return distributionQueue;
+ }
+
+ @Override
+ public void enableQueueProcessing(DistributionQueueProcessor queueProcessor, String... queueNames) throws DistributionException {
+ wrapped.enableQueueProcessing(queueProcessor, queueNames);
+ }
+
+ @Override
+ public void disableQueueProcessing() throws DistributionException {
+ wrapped.disableQueueProcessing();
+
+ for (ServiceRegistration mBean : mBeans) {
+ if (mBean != null) {
+ mBean.unregister();
+ }
+ }
+
+ mBeans.clear();
+ monitoredQueues.clear();
+ }
+
+ private void monitorQueue(DistributionQueue distributionQueue) {
+ if (monitoredQueues.add(distributionQueue.getName())) {
+ DistributionQueueMBean mBean = new DistributionQueueMBeanImpl(distributionQueue);
+
+ Dictionary<String, String> mBeanProps = new Hashtable<String, String>();
+ mBeanProps.put("jmx.objectname", "org.apache.sling.distribution:type=queue,id="
+ + ObjectName.quote(distributionQueue.getName()));
+
+ ServiceRegistration mBeanRegistration = context.registerService(DistributionQueueMBean.class.getName(), mBean, mBeanProps);
+ mBeans.add(mBeanRegistration);
+ }
+ }
+
+}
Propchange: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java
------------------------------------------------------------------------------
svn:eol-style = native