You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/30 18:26:57 UTC

svn commit: r1237810 - in /camel/branches/camel-2.9.x: ./ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/management/ camel-core/src/main/java/org/apache/camel/management/mbean/ camel-core/src/main/ja...

Author: davsclaus
Date: Mon Jan 30 17:26:56 2012
New Revision: 1237810

URL: http://svn.apache.org/viewvc?rev=1237810&view=rev
Log:
CAMEL-4938: Fixed seda endpoint with JMX not being able to invoke certain methods.

Removed:
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSedaEndpoint.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
    camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java
    camel/branches/camel-2.9.x/components/camel-jaxb/   (props changed)
    camel/branches/camel-2.9.x/components/camel-jaxb/src/test/   (props changed)
    camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
    camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 30 17:26:56 2012
@@ -1 +1 @@
-/camel/trunk:1235643,1236639,1236667,1237666
+/camel/trunk:1235643,1236639,1236667,1237666,1237807

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Mon Jan 30 17:26:56 2012
@@ -28,13 +28,19 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ServiceHelper;
 
 /**
@@ -42,6 +48,7 @@ import org.apache.camel.util.ServiceHelp
  * href="http://camel.apache.org/queue.html">Queue components</a> for
  * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
  */
+@ManagedResource(description = "Managed SedaEndpoint")
 public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
     private volatile BlockingQueue<Exchange> queue;
     private int size;
@@ -133,6 +140,7 @@ public class SedaEndpoint extends Defaul
         this.size = queue.remainingCapacity();
     }
 
+    @ManagedAttribute(description = "Queue max capacity")
     public int getSize() {
         return size;
     }
@@ -141,6 +149,7 @@ public class SedaEndpoint extends Defaul
         this.size = size;
     }
 
+    @ManagedAttribute(description = "Current queue size")
     public int getCurrentQueueSize() {
         return queue.size();
     }
@@ -149,6 +158,7 @@ public class SedaEndpoint extends Defaul
         this.blockWhenFull = blockWhenFull;
     }
 
+    @ManagedAttribute(description = "Whether the caller will block sending to a full queue")
     public boolean isBlockWhenFull() {
         return blockWhenFull;
     }
@@ -157,6 +167,7 @@ public class SedaEndpoint extends Defaul
         this.concurrentConsumers = concurrentConsumers;
     }
 
+    @ManagedAttribute(description = "Number of concurrent consumers")
     public int getConcurrentConsumers() {
         return concurrentConsumers;
     }
@@ -169,6 +180,7 @@ public class SedaEndpoint extends Defaul
         this.waitForTaskToComplete = waitForTaskToComplete;
     }
 
+    @ManagedAttribute
     public long getTimeout() {
         return timeout;
     }
@@ -177,6 +189,7 @@ public class SedaEndpoint extends Defaul
         this.timeout = timeout;
     }
 
+    @ManagedAttribute
     public boolean isMultipleConsumers() {
         return multipleConsumers;
     }
@@ -196,6 +209,7 @@ public class SedaEndpoint extends Defaul
         return new ArrayList<Exchange>(getQueue());
     }
 
+    @ManagedAttribute
     public boolean isMultipleConsumersSupported() {
         return isMultipleConsumers();
     }
@@ -203,6 +217,7 @@ public class SedaEndpoint extends Defaul
     /**
      * Purges the queue
      */
+    @ManagedOperation(description = "Purges the seda queue")
     public void purgeQueue() {
         queue.clear();
     }
@@ -221,6 +236,74 @@ public class SedaEndpoint extends Defaul
         return new HashSet<SedaProducer>(producers);
     }
 
+    @ManagedOperation(description = "Current number of Exchanges in Queue")
+    public long queueSize() {
+        return getExchanges().size(); // size of the list returned by getExchanges(), widened to long
+    }
+
+    @ManagedOperation(description = "Get Exchange from queue by index")
+    public String browseExchange(Integer index) {
+        List<Exchange> exchanges = getExchanges();
+        if (index == null || index < 0 || index >= exchanges.size()) {
+            return null; // null, negative or past-the-end index: nothing to browse (avoids NPE on unboxing / IndexOutOfBoundsException)
+        }
+        Exchange exchange = exchanges.get(index);
+        if (exchange == null) {
+            return null;
+        }
+        // must use java type with JMX such as java.lang.String
+        return exchange.toString();
+    }
+
+    @ManagedOperation(description = "Get message body from queue by index")
+    public String browseMessageBody(Integer index) {
+        List<Exchange> exchanges = getExchanges();
+        if (index == null || index < 0 || index >= exchanges.size()) {
+            return null; // null, negative or past-the-end index: nothing to browse (avoids NPE on unboxing / IndexOutOfBoundsException)
+        }
+        Exchange exchange = exchanges.get(index);
+        if (exchange == null) {
+            return null;
+        }
+
+        // must use java type with JMX such as java.lang.String
+        String body;
+        if (exchange.hasOut()) { // prefer the OUT message when the exchange has one, otherwise fall back to IN
+            body = exchange.getOut().getBody(String.class);
+        } else {
+            body = exchange.getIn().getBody(String.class);
+        }
+
+        return body;
+    }
+
+    @ManagedOperation(description = "Get message as XML from queue by index")
+    public String browseMessageAsXml(Integer index, Boolean includeBody) {
+        List<Exchange> exchanges = getExchanges();
+        if (index == null || index < 0 || index >= exchanges.size()) {
+            return null; // null, negative or past-the-end index: nothing to browse (avoids NPE on unboxing / IndexOutOfBoundsException)
+        }
+        Exchange exchange = exchanges.get(index);
+        if (exchange == null) {
+            return null;
+        }
+
+        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); // OUT message if present, otherwise IN
+        String xml = MessageHelper.dumpAsXml(msg, includeBody); // NOTE(review): includeBody is passed through as-is; confirm how dumpAsXml treats null
+
+        return xml;
+    }
+
+    @ManagedOperation(description = "Gets all the messages as XML from the queue")
+    public String browseAllMessagesAsXml(Boolean includeBody) {
+        return browseRangeMessagesAsXml(0, Integer.MAX_VALUE, includeBody); // "all" = the range from index 0 up to Integer.MAX_VALUE
+    }
+
+    @ManagedOperation(description = "Gets the range of messages as XML from the queue")
+    public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
+        return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody); // delegates to the EndpointHelper utility, passing this endpoint as the browsable source
+    }
+
     void onStarted(SedaProducer producer) {
         producers.add(producer);
     }

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java Mon Jan 30 17:26:56 2012
@@ -44,7 +44,6 @@ import org.apache.camel.management.mbean
 import org.apache.camel.management.mbean.ManagedProducer;
 import org.apache.camel.management.mbean.ManagedRoute;
 import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
-import org.apache.camel.management.mbean.ManagedSedaEndpoint;
 import org.apache.camel.management.mbean.ManagedSendProcessor;
 import org.apache.camel.management.mbean.ManagedService;
 import org.apache.camel.management.mbean.ManagedSuspendableRoute;
@@ -92,10 +91,6 @@ public class DefaultManagementObjectStra
 
         if (endpoint instanceof org.apache.camel.spi.ManagementAware) {
             return ((org.apache.camel.spi.ManagementAware<Endpoint>) endpoint).getManagedObject(endpoint);
-        } else if (endpoint instanceof SedaEndpoint) {
-            ManagedSedaEndpoint me = new ManagedSedaEndpoint((SedaEndpoint) endpoint);
-            me.init(context.getManagementStrategy());
-            return me;
         } else if (endpoint instanceof BrowsableEndpoint) {
             ManagedBrowsableEndpoint me = new ManagedBrowsableEndpoint((BrowsableEndpoint) endpoint);
             me.init(context.getManagementStrategy());

Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java Mon Jan 30 17:26:56 2012
@@ -28,10 +28,12 @@ import java.util.regex.PatternSyntaxExce
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.Route;
+import org.apache.camel.spi.BrowsableEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -394,4 +396,44 @@ public final class EndpointHelper {
         // not found
         return null;
     }
+
+    /**
+     * Browses the {@link BrowsableEndpoint} within the given range, and returns the messages as an XML payload.
+     *
+     * @param endpoint the browsable endpoint
+     * @param fromIndex  first index to include, <tt>null</tt> or a negative value means from the start (index 0)
+     * @param toIndex    last index to include (inclusive), <tt>null</tt> means no upper limit
+     * @param includeBody whether to include the message body in the XML payload
+     * @return XML payload with the messages, or <tt>null</tt> if the endpoint has no exchanges
+     * @throws IllegalArgumentException if the from and to range is invalid
+     * @see MessageHelper#dumpAsXml(org.apache.camel.Message)
+     */
+    public static String browseRangeMessagesAsXml(BrowsableEndpoint endpoint, Integer fromIndex, Integer toIndex, Boolean includeBody) {
+        if (fromIndex == null || fromIndex < 0) { // a negative start would otherwise fail in exchanges.get(i) below
+            fromIndex = 0;
+        }
+        if (toIndex == null) {
+            toIndex = Integer.MAX_VALUE;
+        }
+        if (fromIndex > toIndex) {
+            throw new IllegalArgumentException("From index cannot be larger than to index, was: " + fromIndex + " > " + toIndex);
+        }
+
+        List<Exchange> exchanges = endpoint.getExchanges();
+        if (exchanges.isEmpty()) {
+            return null;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("<messages>");
+        for (int i = fromIndex; i < exchanges.size() && i <= toIndex; i++) { // toIndex is inclusive; the size check keeps i in bounds
+            Exchange exchange = exchanges.get(i);
+            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); // OUT message if present, otherwise IN
+            String xml = MessageHelper.dumpAsXml(msg, includeBody);
+            sb.append("\n").append(xml);
+        }
+        sb.append("\n</messages>");
+        return sb.toString();
+    }
+    
 }

Propchange: camel/branches/camel-2.9.x/components/camel-jaxb/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 30 17:26:56 2012
@@ -1 +1 @@
-/camel/trunk/components/camel-jaxb:1235643,1236403-1236663,1236667,1237148,1237666
+/camel/trunk/components/camel-jaxb:1235643,1236403-1236663,1236667,1237148,1237666,1237807

Propchange: camel/branches/camel-2.9.x/components/camel-jaxb/src/test/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 30 17:26:56 2012
@@ -1 +1 @@
-/camel/trunk/components/camel-jaxb/src/test:1235643,1236403-1236663,1236667,1237148,1237230,1237666
+/camel/trunk/components/camel-jaxb/src/test:1235643,1236403-1236663,1236667,1237148,1237230,1237666,1237807

Modified: camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original)
+++ camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Mon Jan 30 17:26:56 2012
@@ -27,6 +27,7 @@ import org.apache.camel.api.management.M
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.MessageHelper;
 import org.springframework.jms.core.JmsOperations;
 
@@ -162,31 +163,7 @@ public class JmsQueueEndpoint extends Jm
 
     @ManagedOperation(description = "Gets the range of messages as XML from the queue")
     public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
-        if (fromIndex == null) {
-            fromIndex = 0;
-        }
-        if (toIndex == null) {
-            toIndex = Integer.MAX_VALUE;
-        }
-        if (fromIndex > toIndex) {
-            throw new IllegalArgumentException("From index cannot be larger than to index, was: " + fromIndex + " > " + toIndex);
-        }
-
-        List<Exchange> exchanges = getExchanges();
-        if (exchanges.size() == 0) {
-            return null;
-        }
-
-        StringBuilder sb = new StringBuilder();
-        sb.append("<messages>");
-        for (int i = fromIndex; i < exchanges.size() && i <= toIndex; i++) {
-            Exchange exchange = exchanges.get(i);
-            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
-            String xml = MessageHelper.dumpAsXml(msg, includeBody);
-            sb.append("\n").append(xml);
-        }
-        sb.append("\n</messages>");
-        return sb.toString();
+        return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody);
     }
 
     protected QueueBrowseStrategy createQueueBrowseStrategy() {

Modified: camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java (original)
+++ camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java Mon Jan 30 17:26:56 2012
@@ -62,6 +62,8 @@ public class BrowsableQueueTest extends 
             Object expected = expectedBodies[++index];
             assertEquals("Body: " + index, expected, actual);
         }
+
+        Thread.sleep(99999999);
     }
 
     @Test
@@ -115,6 +117,8 @@ public class BrowsableQueueTest extends 
     }
 
     protected CamelContext createCamelContext() throws Exception {
+        enableJMX();
+
         CamelContext camelContext = super.createCamelContext();
 
         ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();