You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/30 18:26:57 UTC
svn commit: r1237810 - in /camel/branches/camel-2.9.x: ./
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/management/
camel-core/src/main/java/org/apache/camel/management/mbean/
camel-core/src/main/ja...
Author: davsclaus
Date: Mon Jan 30 17:26:56 2012
New Revision: 1237810
URL: http://svn.apache.org/viewvc?rev=1237810&view=rev
Log:
CAMEL-4938: Fixed seda endpoint with JMX not being able to invoke certain methods.
Removed:
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSedaEndpoint.java
Modified:
camel/branches/camel-2.9.x/ (props changed)
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java
camel/branches/camel-2.9.x/components/camel-jaxb/ (props changed)
camel/branches/camel-2.9.x/components/camel-jaxb/src/test/ (props changed)
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 30 17:26:56 2012
@@ -1 +1 @@
-/camel/trunk:1235643,1236639,1236667,1237666
+/camel/trunk:1235643,1236639,1236667,1237666,1237807
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Mon Jan 30 17:26:56 2012
@@ -28,13 +28,19 @@ import java.util.concurrent.LinkedBlocki
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ServiceHelper;
/**
@@ -42,6 +48,7 @@ import org.apache.camel.util.ServiceHelp
* href="http://camel.apache.org/queue.html">Queue components</a> for
* asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
*/
+@ManagedResource(description = "Managed SedaEndpoint")
public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
private volatile BlockingQueue<Exchange> queue;
private int size;
@@ -133,6 +140,7 @@ public class SedaEndpoint extends Defaul
this.size = queue.remainingCapacity();
}
+ @ManagedAttribute(description = "Queue max capacity")
public int getSize() {
return size;
}
@@ -141,6 +149,7 @@ public class SedaEndpoint extends Defaul
this.size = size;
}
+ @ManagedAttribute(description = "Current queue size")
public int getCurrentQueueSize() {
return queue.size();
}
@@ -149,6 +158,7 @@ public class SedaEndpoint extends Defaul
this.blockWhenFull = blockWhenFull;
}
+ @ManagedAttribute(description = "Whether the caller will block sending to a full queue")
public boolean isBlockWhenFull() {
return blockWhenFull;
}
@@ -157,6 +167,7 @@ public class SedaEndpoint extends Defaul
this.concurrentConsumers = concurrentConsumers;
}
+ @ManagedAttribute(description = "Number of concurrent consumers")
public int getConcurrentConsumers() {
return concurrentConsumers;
}
@@ -169,6 +180,7 @@ public class SedaEndpoint extends Defaul
this.waitForTaskToComplete = waitForTaskToComplete;
}
+ @ManagedAttribute
public long getTimeout() {
return timeout;
}
@@ -177,6 +189,7 @@ public class SedaEndpoint extends Defaul
this.timeout = timeout;
}
+ @ManagedAttribute
public boolean isMultipleConsumers() {
return multipleConsumers;
}
@@ -196,6 +209,7 @@ public class SedaEndpoint extends Defaul
return new ArrayList<Exchange>(getQueue());
}
+ @ManagedAttribute
public boolean isMultipleConsumersSupported() {
return isMultipleConsumers();
}
@@ -203,6 +217,7 @@ public class SedaEndpoint extends Defaul
/**
* Purges the queue
*/
+ @ManagedOperation(description = "Purges the seda queue")
public void purgeQueue() {
queue.clear();
}
@@ -221,6 +236,74 @@ public class SedaEndpoint extends Defaul
return new HashSet<SedaProducer>(producers);
}
+    /**
+     * Reports how many exchanges {@link #getExchanges()} currently returns.
+     *
+     * @return the size of the list returned by {@link #getExchanges()}
+     */
+    @ManagedOperation(description = "Current number of Exchanges in Queue")
+    public long queueSize() {
+        List<Exchange> snapshot = getExchanges();
+        return snapshot.size();
+    }
+
+    /**
+     * Returns the string form of the exchange at the given position of {@link #getExchanges()}.
+     *
+     * @param index zero-based position in the list returned by {@link #getExchanges()}
+     * @return {@code exchange.toString()}, or {@code null} if the index is {@code null}, negative,
+     *         not smaller than the list size, or if the element at that position is {@code null}
+     */
+    @ManagedOperation(description = "Get Exchange from queue by index")
+    public String browseExchange(Integer index) {
+        // a missing index would fail on unboxing and a negative one inside List.get;
+        // answer both the same way as any other out-of-range index
+        if (index == null || index < 0) {
+            return null;
+        }
+        List<Exchange> exchanges = getExchanges();
+        if (index >= exchanges.size()) {
+            return null;
+        }
+        Exchange exchange = exchanges.get(index);
+        if (exchange == null) {
+            return null;
+        }
+        // must use java type with JMX such as java.lang.String
+        return exchange.toString();
+    }
+
+    /**
+     * Returns the body, converted to a String, of the message at the given position of
+     * {@link #getExchanges()}. The OUT message is used when the exchange has one, otherwise the IN message.
+     *
+     * @param index zero-based position in the list returned by {@link #getExchanges()}
+     * @return the body as String, or {@code null} if the index is {@code null}, negative,
+     *         not smaller than the list size, or if the element at that position is {@code null}
+     */
+    @ManagedOperation(description = "Get message body from queue by index")
+    public String browseMessageBody(Integer index) {
+        // a missing index would fail on unboxing and a negative one inside List.get;
+        // answer both the same way as any other out-of-range index
+        if (index == null || index < 0) {
+            return null;
+        }
+        List<Exchange> exchanges = getExchanges();
+        if (index >= exchanges.size()) {
+            return null;
+        }
+        Exchange exchange = exchanges.get(index);
+        if (exchange == null) {
+            return null;
+        }
+
+        // must use java type with JMX such as java.lang.String
+        String body;
+        if (exchange.hasOut()) {
+            body = exchange.getOut().getBody(String.class);
+        } else {
+            body = exchange.getIn().getBody(String.class);
+        }
+
+        return body;
+    }
+
+    /**
+     * Dumps the message at the given position of {@link #getExchanges()} as XML using
+     * {@code MessageHelper.dumpAsXml}. The OUT message is used when the exchange has one,
+     * otherwise the IN message.
+     *
+     * @param index       zero-based position in the list returned by {@link #getExchanges()}
+     * @param includeBody passed through to {@code MessageHelper.dumpAsXml}
+     * @return the XML dump, or {@code null} if the index is {@code null}, negative,
+     *         not smaller than the list size, or if the element at that position is {@code null}
+     */
+    @ManagedOperation(description = "Get message as XML from queue by index")
+    public String browseMessageAsXml(Integer index, Boolean includeBody) {
+        // a missing index would fail on unboxing and a negative one inside List.get;
+        // answer both the same way as any other out-of-range index
+        if (index == null || index < 0) {
+            return null;
+        }
+        List<Exchange> exchanges = getExchanges();
+        if (index >= exchanges.size()) {
+            return null;
+        }
+        Exchange exchange = exchanges.get(index);
+        if (exchange == null) {
+            return null;
+        }
+
+        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        return MessageHelper.dumpAsXml(msg, includeBody);
+    }
+
+    /**
+     * Dumps all messages as XML by delegating to
+     * {@link #browseRangeMessagesAsXml(Integer, Integer, Boolean)} with the range 0 to {@link Integer#MAX_VALUE}.
+     *
+     * @param includeBody passed through unchanged to the range variant
+     * @return whatever the range variant returns for the full range
+     */
+    @ManagedOperation(description = "Gets all the messages as XML from the queue")
+    public String browseAllMessagesAsXml(Boolean includeBody) {
+        final int first = 0;
+        final int last = Integer.MAX_VALUE;
+        return browseRangeMessagesAsXml(first, last, includeBody);
+    }
+
+    /**
+     * Dumps a range of messages as XML by handing this endpoint and the arguments to
+     * {@code EndpointHelper.browseRangeMessagesAsXml}.
+     *
+     * @param fromIndex   start of the range, passed through unchanged
+     * @param toIndex     end of the range, passed through unchanged
+     * @param includeBody passed through unchanged
+     * @return the value returned by {@code EndpointHelper.browseRangeMessagesAsXml}
+     */
+    @ManagedOperation(description = "Gets the range of messages as XML from the queue")
+    public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
+        String xml = EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody);
+        return xml;
+    }
+
void onStarted(SedaProducer producer) {
producers.add(producer);
}
Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java Mon Jan 30 17:26:56 2012
@@ -44,7 +44,6 @@ import org.apache.camel.management.mbean
import org.apache.camel.management.mbean.ManagedProducer;
import org.apache.camel.management.mbean.ManagedRoute;
import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
-import org.apache.camel.management.mbean.ManagedSedaEndpoint;
import org.apache.camel.management.mbean.ManagedSendProcessor;
import org.apache.camel.management.mbean.ManagedService;
import org.apache.camel.management.mbean.ManagedSuspendableRoute;
@@ -92,10 +91,6 @@ public class DefaultManagementObjectStra
if (endpoint instanceof org.apache.camel.spi.ManagementAware) {
return ((org.apache.camel.spi.ManagementAware<Endpoint>) endpoint).getManagedObject(endpoint);
- } else if (endpoint instanceof SedaEndpoint) {
- ManagedSedaEndpoint me = new ManagedSedaEndpoint((SedaEndpoint) endpoint);
- me.init(context.getManagementStrategy());
- return me;
} else if (endpoint instanceof BrowsableEndpoint) {
ManagedBrowsableEndpoint me = new ManagedBrowsableEndpoint((BrowsableEndpoint) endpoint);
me.init(context.getManagementStrategy());
Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java (original)
+++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java Mon Jan 30 17:26:56 2012
@@ -28,10 +28,12 @@ import java.util.regex.PatternSyntaxExce
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.Route;
+import org.apache.camel.spi.BrowsableEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -394,4 +396,44 @@ public final class EndpointHelper {
// not found
return null;
}
+
+    /**
+     * Browses the {@link BrowsableEndpoint} within the given range, and returns the messages as an XML payload.
+     * <p/>
+     * For each exchange the OUT message is dumped when the exchange has one, otherwise the IN message.
+     *
+     * @param endpoint    the browsable endpoint
+     * @param fromIndex   from range (inclusive, zero-based), <tt>null</tt> means 0
+     * @param toIndex     to range (inclusive), <tt>null</tt> means {@link Integer#MAX_VALUE}
+     * @param includeBody whether to include the message body in the XML payload
+     * @return XML payload with the messages, or <tt>null</tt> if the endpoint has no exchanges
+     * @throws IllegalArgumentException if the from and to range is invalid
+     *                                  (from index negative, or larger than to index)
+     * @see MessageHelper#dumpAsXml(org.apache.camel.Message)
+     */
+    public static String browseRangeMessagesAsXml(BrowsableEndpoint endpoint, Integer fromIndex, Integer toIndex, Boolean includeBody) {
+        if (fromIndex == null) {
+            fromIndex = 0;
+        }
+        if (toIndex == null) {
+            toIndex = Integer.MAX_VALUE;
+        }
+        // a negative start would otherwise surface as an IndexOutOfBoundsException from List.get below
+        if (fromIndex < 0) {
+            throw new IllegalArgumentException("From index cannot be negative, was: " + fromIndex);
+        }
+        if (fromIndex > toIndex) {
+            throw new IllegalArgumentException("From index cannot be larger than to index, was: " + fromIndex + " > " + toIndex);
+        }
+
+        List<Exchange> exchanges = endpoint.getExchanges();
+        if (exchanges.isEmpty()) {
+            return null;
+        }
+
+        // clamp the inclusive upper bound to the last element so an oversized toIndex is harmless
+        int last = Math.min(toIndex, exchanges.size() - 1);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("<messages>");
+        for (int i = fromIndex; i <= last; i++) {
+            Exchange exchange = exchanges.get(i);
+            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+            String xml = MessageHelper.dumpAsXml(msg, includeBody);
+            sb.append("\n").append(xml);
+        }
+        sb.append("\n</messages>");
+        return sb.toString();
+    }
+
}
Propchange: camel/branches/camel-2.9.x/components/camel-jaxb/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 30 17:26:56 2012
@@ -1 +1 @@
-/camel/trunk/components/camel-jaxb:1235643,1236403-1236663,1236667,1237148,1237666
+/camel/trunk/components/camel-jaxb:1235643,1236403-1236663,1236667,1237148,1237666,1237807
Propchange: camel/branches/camel-2.9.x/components/camel-jaxb/src/test/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 30 17:26:56 2012
@@ -1 +1 @@
-/camel/trunk/components/camel-jaxb/src/test:1235643,1236403-1236663,1236667,1237148,1237230,1237666
+/camel/trunk/components/camel-jaxb/src/test:1235643,1236403-1236663,1236667,1237148,1237230,1237666,1237807
Modified: camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original)
+++ camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Mon Jan 30 17:26:56 2012
@@ -27,6 +27,7 @@ import org.apache.camel.api.management.M
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.MessageHelper;
import org.springframework.jms.core.JmsOperations;
@@ -162,31 +163,7 @@ public class JmsQueueEndpoint extends Jm
@ManagedOperation(description = "Gets the range of messages as XML from the queue")
public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
- if (fromIndex == null) {
- fromIndex = 0;
- }
- if (toIndex == null) {
- toIndex = Integer.MAX_VALUE;
- }
- if (fromIndex > toIndex) {
- throw new IllegalArgumentException("From index cannot be larger than to index, was: " + fromIndex + " > " + toIndex);
- }
-
- List<Exchange> exchanges = getExchanges();
- if (exchanges.size() == 0) {
- return null;
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append("<messages>");
- for (int i = fromIndex; i < exchanges.size() && i <= toIndex; i++) {
- Exchange exchange = exchanges.get(i);
- Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
- String xml = MessageHelper.dumpAsXml(msg, includeBody);
- sb.append("\n").append(xml);
- }
- sb.append("\n</messages>");
- return sb.toString();
+ return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody);
}
protected QueueBrowseStrategy createQueueBrowseStrategy() {
Modified: camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java?rev=1237810&r1=1237809&r2=1237810&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java (original)
+++ camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java Mon Jan 30 17:26:56 2012
@@ -62,6 +62,8 @@ public class BrowsableQueueTest extends
Object expected = expectedBodies[++index];
assertEquals("Body: " + index, expected, actual);
}
+
+        // no Thread.sleep here: a 99999999 ms pause (about 27.8 hours) is a debugging aid and would stall the test run
}
@Test
@@ -115,6 +117,8 @@ public class BrowsableQueueTest extends
}
protected CamelContext createCamelContext() throws Exception {
+ enableJMX();
+
CamelContext camelContext = super.createCamelContext();
ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();