You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2016/11/11 14:25:37 UTC

svn commit: r1769290 - in /sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution: agent/impl/ monitor/impl/

Author: tommaso
Date: Fri Nov 11 14:25:37 2016
New Revision: 1769290

URL: http://svn.apache.org/viewvc?rev=1769290&view=rev
Log:
SLING-6238 - adding jmx support for queue providers

Added:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java   (with props)
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java   (with props)
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java   (with props)
Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java?rev=1769290&r1=1769289&r2=1769290&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java Fri Nov 11 14:25:37 2016
@@ -46,6 +46,7 @@ import org.apache.sling.distribution.eve
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
 import org.apache.sling.distribution.monitor.impl.ForwardDistributionAgentMBean;
 import org.apache.sling.distribution.monitor.impl.ForwardDistributionAgentMBeanImpl;
+import org.apache.sling.distribution.monitor.impl.MonitoringDistributionQueueProvider;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.packaging.DistributionPackageImporter;
@@ -248,6 +249,7 @@ public class ForwardDistributionAgentFac
         } else {
             queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, true);
         }
+        queueProvider = new MonitoringDistributionQueueProvider(queueProvider, context);
 
         DistributionQueueDispatchingStrategy exportQueueStrategy;
         DistributionQueueDispatchingStrategy errorQueueStrategy = null;

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java?rev=1769290&r1=1769289&r2=1769290&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java Fri Nov 11 14:25:37 2016
@@ -38,6 +38,7 @@ import org.apache.sling.distribution.com
 import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.event.impl.DistributionEventFactory;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.monitor.impl.MonitoringDistributionQueueProvider;
 import org.apache.sling.distribution.monitor.impl.QueueDistributionAgentMBean;
 import org.apache.sling.distribution.monitor.impl.QueueDistributionAgentMBeanImpl;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -176,7 +177,8 @@ public class QueueDistributionAgentFacto
         priorityQueues = SettingsUtils.removeEmptyEntries(priorityQueues);
 
 
-        DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+        DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
+
         DistributionQueueDispatchingStrategy exportQueueStrategy = null;
 
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java?rev=1769290&r1=1769289&r2=1769290&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/ReverseDistributionAgentFactory.java Fri Nov 11 14:25:37 2016
@@ -40,6 +40,7 @@ import org.apache.sling.distribution.com
 import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.event.impl.DistributionEventFactory;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.monitor.impl.MonitoringDistributionQueueProvider;
 import org.apache.sling.distribution.monitor.impl.ReverseDistributionAgentMBean;
 import org.apache.sling.distribution.monitor.impl.ReverseDistributionAgentMBeanImpl;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -192,7 +193,7 @@ public class ReverseDistributionAgentFac
 
         DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, exporterEndpoints, pullItems);
         DistributionPackageImporter packageImporter = new LocalDistributionPackageImporter(agentName, distributionEventFactory, packageBuilder);
-        DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+        DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
 
         DistributionQueueDispatchingStrategy exportQueueStrategy = new SingleQueueDispatchingStrategy();
         DistributionQueueDispatchingStrategy importQueueStrategy = null;

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java?rev=1769290&r1=1769289&r2=1769290&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SyncDistributionAgentFactory.java Fri Nov 11 14:25:37 2016
@@ -42,6 +42,7 @@ import org.apache.sling.distribution.com
 import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.event.impl.DistributionEventFactory;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.monitor.impl.MonitoringDistributionQueueProvider;
 import org.apache.sling.distribution.monitor.impl.SyncDistributionAgentMBean;
 import org.apache.sling.distribution.monitor.impl.SyncDistributionAgentMBeanImpl;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -241,7 +242,7 @@ public class SyncDistributionAgentFactor
         packageImporter = new RemoteDistributionPackageImporter(distributionLog, transportSecretProvider, importerEndpointsMap);
 
         DistributionPackageExporter packageExporter = new RemoteDistributionPackageExporter(distributionLog, packageBuilder, transportSecretProvider, exporterEndpoints, pullItems);
-        DistributionQueueProvider queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+        DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
         DistributionRequestType[] allowedRequests = new DistributionRequestType[]{DistributionRequestType.PULL};
 
         String retryStrategy = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java?rev=1769290&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java Fri Nov 11 14:25:37 2016
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.monitor.impl;
+
+import java.util.Calendar;
+
+/**
+ * JMX management view of a single {@code DistributionQueue}, exposed through read-only attributes.
+ */
+public interface DistributionQueueMBean {
+
+    /**
+     * Get the name of this queue.
+     *
+     * @return the queue name
+     */
+    String getName();
+
+    /**
+     * Get the type of this queue.
+     *
+     * @return the queue type, as a string
+     */
+    String getType();
+
+    /**
+     * Get the number of items currently in the queue.
+     *
+     * @return the current size of the queue
+     */
+    int getSize();
+
+    /**
+     * Check whether the queue is empty.
+     *
+     * @return {@code true} if the queue is empty, {@code false} otherwise
+     */
+    boolean isEmpty();
+
+    /**
+     * Get the state associated with the current status of this queue.
+     *
+     * @return the queue state, as a string
+     */
+    String getState();
+
+    /**
+     * Get the ID of the head item, i.e. the next item to be processed under a FIFO strategy.
+     *
+     * @return the ID of the first item in the queue, or {@code null} if the queue is empty
+     */
+    String getHeadId();
+
+    /**
+     * Get the number of dequeuing attempts recorded for the head item (the next to be processed under FIFO).
+     *
+     * @return the head item's dequeuing attempts, or {@code -1} if the queue is empty
+     */
+    int getHeadDequeuingAttempts();
+
+    /**
+     * Get the state of the head item (the next to be processed under a FIFO strategy).
+     *
+     * @return the head item's state, or {@code null} if the queue is empty
+     */
+    String getHeadStatus();
+
+    /**
+     * Get the date and time at which the head item (the next to be processed under FIFO) joined the queue.
+     *
+     * @return the head item's enqueuing date and time, or {@code null} if the queue is empty
+     */
+    Calendar getHeadEnqueuingDate();
+
+}

Propchange: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java?rev=1769290&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java Fri Nov 11 14:25:37 2016
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.monitor.impl;
+
+import java.util.Calendar;
+import java.util.Locale;
+
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+
+/**
+ * Default implementation of {@link DistributionQueueMBean}, backed by a single {@link DistributionQueue}.
+ * <p>
+ * Every attribute is read from the wrapped queue on each call; nothing is cached. Head-related attributes
+ * fetch the head entry exactly once per call, so the entry that was null-checked is the one that is read.
+ */
+public final class DistributionQueueMBeanImpl implements DistributionQueueMBean {
+
+    private final DistributionQueue distributionQueue;
+
+    /**
+     * @param distributionQueue the queue whose state is exposed through this MBean
+     */
+    public DistributionQueueMBeanImpl(DistributionQueue distributionQueue) {
+        this.distributionQueue = distributionQueue;
+    }
+
+    @Override
+    public String getName() {
+        return distributionQueue.getName();
+    }
+
+    @Override
+    public String getType() {
+        // Locale.ROOT keeps the value independent of the JVM default locale (e.g. Turkish dotless i)
+        return distributionQueue.getType().name().toLowerCase(Locale.ROOT);
+    }
+
+    @Override
+    public int getSize() {
+        return distributionQueue.getStatus().getItemsCount();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return distributionQueue.getStatus().isEmpty();
+    }
+
+    @Override
+    public String getState() {
+        return distributionQueue.getStatus().getState().name().toLowerCase(Locale.ROOT);
+    }
+
+    @Override
+    public String getHeadId() {
+        // single read: a second getHead() call may return null if the head was consumed in between
+        DistributionQueueEntry head = distributionQueue.getHead();
+        return head != null ? head.getId() : null;
+    }
+
+    @Override
+    public int getHeadDequeuingAttempts() {
+        DistributionQueueEntry head = distributionQueue.getHead();
+        return head != null ? head.getStatus().getAttempts() : -1;
+    }
+
+    @Override
+    public String getHeadStatus() {
+        DistributionQueueEntry head = distributionQueue.getHead();
+        return head != null ? head.getStatus().getItemState().name().toLowerCase(Locale.ROOT) : null;
+    }
+
+    @Override
+    public Calendar getHeadEnqueuingDate() {
+        DistributionQueueEntry head = distributionQueue.getHead();
+        return head != null ? head.getStatus().getEntered() : null;
+    }
+
+}

Propchange: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/DistributionQueueMBeanImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java?rev=1769290&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java Fri Nov 11 14:25:37 2016
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.monitor.impl;
+
+import javax.management.ObjectName;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueProcessor;
+import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * A {@link DistributionQueueProvider} that supports monitoring via JMX.
+ * <p>
+ * Every operation is delegated to the wrapped provider. The first time a queue with a given name is handed
+ * out, a {@link DistributionQueueMBean} service is registered for it with the {@code jmx.objectname} property
+ * {@code org.apache.sling.distribution:type=queue,id=<quoted queue name>}. All registrations are released by
+ * {@link #disableQueueProcessing()}.
+ * <p>
+ * NOTE(review): the object name is derived from the queue name only; confirm that queue names cannot clash
+ * between providers. The bookkeeping collections are not synchronized; confirm callers do not use one
+ * instance concurrently.
+ */
+public class MonitoringDistributionQueueProvider implements DistributionQueueProvider {
+
+    private final Set<String> monitoredQueues = new HashSet<String>();
+
+    private final List<ServiceRegistration> mBeans = new LinkedList<ServiceRegistration>();
+
+    private final DistributionQueueProvider wrapped;
+
+    private final BundleContext context;
+
+    /**
+     * @param wrapped the provider every queue operation is delegated to
+     * @param context the bundle context used to register the queue MBean services
+     */
+    public MonitoringDistributionQueueProvider(DistributionQueueProvider wrapped, BundleContext context) {
+        this.wrapped = wrapped;
+        this.context = context;
+    }
+
+    @Override
+    public DistributionQueue getQueue(String queueName) throws DistributionException {
+        DistributionQueue distributionQueue = wrapped.getQueue(queueName);
+        monitorQueue(distributionQueue);
+        return distributionQueue;
+    }
+
+    @Override
+    public DistributionQueue getQueue(String queueName, DistributionQueueType type) {
+        DistributionQueue distributionQueue = wrapped.getQueue(queueName, type);
+        monitorQueue(distributionQueue);
+        return distributionQueue;
+    }
+
+    @Override
+    public void enableQueueProcessing(DistributionQueueProcessor queueProcessor, String... queueNames) throws DistributionException {
+        wrapped.enableQueueProcessing(queueProcessor, queueNames);
+    }
+
+    /**
+     * Disables processing on the wrapped provider and releases every queue MBean registered so far.
+     * <p>
+     * The MBeans are released even when the wrapped provider throws, so a failing shutdown does not leave
+     * stale service registrations behind.
+     */
+    @Override
+    public void disableQueueProcessing() throws DistributionException {
+        try {
+            wrapped.disableQueueProcessing();
+        } finally {
+            unregisterMBeans();
+        }
+    }
+
+    /**
+     * Unregisters all MBean services and forgets the monitored queue names, so queues can be monitored again.
+     */
+    private void unregisterMBeans() {
+        for (ServiceRegistration mBean : mBeans) {
+            if (mBean != null) {
+                try {
+                    mBean.unregister();
+                } catch (IllegalStateException ignored) {
+                    // already unregistered (unregister() is documented to throw in that case); keep going
+                }
+            }
+        }
+
+        mBeans.clear();
+        monitoredQueues.clear();
+    }
+
+    /**
+     * Registers an MBean for the given queue unless one is already registered for its name.
+     *
+     * @param distributionQueue the queue returned by the wrapped provider, possibly {@code null}
+     */
+    private void monitorQueue(DistributionQueue distributionQueue) {
+        if (distributionQueue == null) {
+            // nothing to monitor: hand the null back to the caller exactly as the wrapped provider did
+            return;
+        }
+
+        String queueName = distributionQueue.getName();
+        if (monitoredQueues.contains(queueName)) {
+            return;
+        }
+
+        DistributionQueueMBean mBean = new DistributionQueueMBeanImpl(distributionQueue);
+
+        Dictionary<String, String> mBeanProps = new Hashtable<String, String>();
+        mBeanProps.put("jmx.objectname", "org.apache.sling.distribution:type=queue,id="
+                + ObjectName.quote(queueName));
+
+        ServiceRegistration mBeanRegistration = context.registerService(DistributionQueueMBean.class.getName(), mBean, mBeanProps);
+        mBeans.add(mBeanRegistration);
+        // remember the name only once registration succeeded, so a failed attempt is retried on the next call
+        monitoredQueues.add(queueName);
+    }
+
+}

Propchange: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/monitor/impl/MonitoringDistributionQueueProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native