You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/29 09:01:56 UTC
svn commit: r789236 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/JmsBinding.java
main/java/org/apache/camel/component/jms/JmsProducer.java
test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java
Author: davsclaus
Date: Mon Jun 29 07:01:56 2009
New Revision: 789236
URL: http://svn.apache.org/viewvc?rev=789236&view=rev
Log:
CAMEL-1689: JMSProducer is a bit easier to read in its process as divided into inout and inonly.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java (with props)
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=789236&r1=789235&r2=789236&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Mon Jun 29 07:01:56 2009
@@ -265,9 +265,9 @@
jmsMessage.setJMSReplyTo(ExchangeHelper.convertToType(exchange, Destination.class, headerValue));
} else if (headerName.equals("JMSType")) {
jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
- } else if (LOG.isDebugEnabled()) {
+ } else if (LOG.isTraceEnabled()) {
// The following properties are set by the MessageProducer:
- // JMSDeliveryMode, JMSDestination, JMSExpiration, JMSPriorit
+ // JMSDeliveryMode, JMSDestination, JMSExpiration, JMSPriority
// The following are set on the underlying JMS provider:
// JMSMessageID, JMSTimestamp, JMSRedelivered
// log at trace level to not spam log
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=789236&r1=789235&r2=789236&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Jun 29 07:01:56 2009
@@ -142,6 +142,16 @@
}
public void process(final Exchange exchange) {
+ if (exchange.getPattern().isOutCapable()) {
+ // in out requires a bit more work than in only
+ processInOut(exchange);
+ } else {
+ // in only
+ processInOnly(exchange);
+ }
+ }
+
+ protected void processInOut(final Exchange exchange) {
final org.apache.camel.Message in = exchange.getIn();
String destinationName = endpoint.getDestinationName();
@@ -149,163 +159,174 @@
if (destination == null) {
destination = endpoint.getDestination();
}
- if (exchange.getPattern().isOutCapable()) {
- testAndSetRequestor();
+ testAndSetRequestor();
- // note due to JMS transaction semantics we cannot use a single transaction
- // for sending the request and receiving the response
- final Destination replyTo = requestor.getReplyTo();
+ // note due to JMS transaction semantics we cannot use a single transaction
+ // for sending the request and receiving the response
+ final Destination replyTo = requestor.getReplyTo();
- if (replyTo == null) {
- throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
- }
+ if (replyTo == null) {
+ throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
+ }
- final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
- String correlationId = in.getHeader("JMSCorrelationID", String.class);
+ final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
+ String correlationId = in.getHeader("JMSCorrelationID", String.class);
- if (correlationId == null && !msgIdAsCorrId) {
- in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
- }
+ if (correlationId == null && !msgIdAsCorrId) {
+ in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
+ }
- final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>();
- final DeferredMessageSentCallback callback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
+ final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>();
+ final DeferredMessageSentCallback callback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
- MessageCreator messageCreator = new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
- message.setJMSReplyTo(replyTo);
- requestor.setReplyToSelectorHeader(in, message);
+ MessageCreator messageCreator = new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
+ message.setJMSReplyTo(replyTo);
+ requestor.setReplyToSelectorHeader(in, message);
- FutureTask future;
- future = (!msgIdAsCorrId)
- ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
- : requestor.getReceiveFuture(callback);
+ FutureTask future;
+ future = (!msgIdAsCorrId)
+ ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
+ : requestor.getReceiveFuture(callback);
- futureHolder.set(future);
+ futureHolder.set(future);
- if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " sending JMS message: " + message);
- }
- return message;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(endpoint + " sending JMS message: " + message);
}
- };
+ return message;
+ }
+ };
- CamelJmsTemplate template = null;
- CamelJmsTeemplate102 template102 = null;
- if (endpoint.isUseVersion102()) {
- template102 = (CamelJmsTeemplate102)getInOutTemplate();
+ CamelJmsTemplate template = null;
+ CamelJmsTeemplate102 template102 = null;
+ if (endpoint.isUseVersion102()) {
+ template102 = (CamelJmsTeemplate102)getInOutTemplate();
+ } else {
+ template = (CamelJmsTemplate)getInOutTemplate();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Using JMS API " + (endpoint.isUseVersion102() ? "v1.0.2" : "v1.1"));
+ }
+
+ if (destinationName != null) {
+ if (template != null) {
+ template.send(destinationName, messageCreator, callback);
} else {
- template = (CamelJmsTemplate)getInOutTemplate();
+ template102.send(destinationName, messageCreator, callback);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using JMS API " + (endpoint.isUseVersion102() ? "v1.0.2" : "v1.1"));
+ } else if (destination != null) {
+ if (template != null) {
+ template.send(destination, messageCreator, callback);
+ } else {
+ template102.send(destination, messageCreator, callback);
}
+ } else {
+ throw new IllegalArgumentException("Neither destination nor destinationName is specified on this endpoint: " + endpoint);
+ }
+
+ setMessageId(exchange);
- if (destinationName != null) {
- if (template != null) {
- template.send(destinationName, messageCreator, callback);
+ // lets wait and return the response
+ long requestTimeout = endpoint.getConfiguration().getRequestTimeout();
+ try {
+ Message message = null;
+ try {
+ if (requestTimeout < 0) {
+ message = (Message)futureHolder.get().get();
} else {
- template102.send(destinationName, messageCreator, callback);
+ message = (Message)futureHolder.get().get(requestTimeout, TimeUnit.MILLISECONDS);
}
- } else if (destination != null) {
- if (template != null) {
- template.send(destination, messageCreator, callback);
- } else {
- template102.send(destination, messageCreator, callback);
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Future interrupted: " + e, e);
+ }
+ } catch (TimeoutException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Future timed out: " + e, e);
}
- } else {
- throw new IllegalArgumentException("Neither destination nor destinationName is specified on this endpoint: " + endpoint);
}
+ if (message != null) {
+ // the response can be an exception
+ JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+ Object body = response.getBody();
- setMessageId(exchange);
-
- // lets wait and return the response
- long requestTimeout = endpoint.getConfiguration().getRequestTimeout();
- try {
- Message message = null;
- try {
- if (requestTimeout < 0) {
- message = (Message)futureHolder.get().get();
- } else {
- message = (Message)futureHolder.get().get(requestTimeout, TimeUnit.MILLISECONDS);
- }
- } catch (InterruptedException e) {
+ if (endpoint.isTransferException() && body instanceof Exception) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Future interupted: " + e, e);
+ LOG.debug("Reply recieved. Setting reply as Exception: " + body);
}
- } catch (TimeoutException e) {
+ // we got an exception back and endpoint was configued to transfer exception
+ // therefore set response as exception
+ exchange.setException((Exception) body);
+ } else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Future timed out: " + e, e);
+ LOG.debug("Reply recieved. Setting reply as OUT message: " + body);
}
+ // regular response
+ exchange.setOut(response);
}
- if (message != null) {
- // the response can be an exception
- JmsMessage response = new JmsMessage(message, endpoint.getBinding());
- Object body = response.getBody();
-
- if (endpoint.isTransferException() && body instanceof Exception) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reply recieved. Setting reply as Exception: " + body);
- }
- // we got an exception back and endpoint was configued to transfer exception
- // therefore set response as exception
- exchange.setException((Exception) body);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reply recieved. Setting reply as OUT message: " + body);
- }
- // regular response
- exchange.setOut(response);
- }
- // correlation
- if (correlationId != null) {
- message.setJMSCorrelationID(correlationId);
- exchange.getOut().setHeader("JMSCorrelationID", correlationId);
- }
- } else {
- // no response, so lets set a timed out exception
- exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
- }
- } catch (Exception e) {
- exchange.setException(e);
- }
- } else {
- // we must honor these special flags to preverse QoS
- if (!endpoint.isPreserveMessageQos() && !endpoint.isExplicitQosEnabled()) {
- Object replyTo = exchange.getIn().getHeader("JMSReplyTo");
- if (replyTo != null) {
- // we are routing an existing JmsMessage, origin from another JMS endpoint
- // then we need to remove the existing JMSReplyTo
- // as we are not out capable and thus do not expect a reply, and therefore
- // the consumer of this message we send should not return a reply
- String to = destinationName != null ? destinationName : "" + destination;
- LOG.warn("Disabling JMSReplyTo as this Exchange is not OUT capable with JMSReplyTo: " + replyTo + " to destination: " + to + " for Exchange: " + exchange);
- exchange.getIn().setHeader("JMSReplyTo", null);
+ // correlation
+ if (correlationId != null) {
+ message.setJMSCorrelationID(correlationId);
+ exchange.getOut().setHeader("JMSCorrelationID", correlationId);
}
+ } else {
+ // no response, so lets set a timed out exception
+ exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
}
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
- MessageCreator messageCreator = new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
- if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " sending JMS message: " + message);
- }
- return message;
- }
- };
- if (destination != null) {
- getInOnlyTemplate().send(destination, messageCreator);
- } else if (destinationName != null) {
- getInOnlyTemplate().send(destinationName, messageCreator);
- } else {
- throw new IllegalArgumentException("Neither destination nor "
- + "destinationName are specified on this endpoint: " + endpoint);
+ }
+
+ protected void processInOnly(final Exchange exchange) {
+ final org.apache.camel.Message in = exchange.getIn();
+
+ String destinationName = endpoint.getDestinationName();
+ Destination destination = exchange.getProperty(JmsConstants.JMS_DESTINATION, Destination.class);
+ if (destination == null) {
+ destination = endpoint.getDestination();
+ }
+
+ // we must honor these special flags to preverse QoS
+ if (!endpoint.isPreserveMessageQos() && !endpoint.isExplicitQosEnabled()) {
+ Object replyTo = exchange.getIn().getHeader("JMSReplyTo");
+ if (replyTo != null) {
+ // we are routing an existing JmsMessage, origin from another JMS endpoint
+ // then we need to remove the existing JMSReplyTo
+ // as we are not out capable and thus do not expect a reply, and therefore
+ // the consumer of this message we send should not return a reply
+ String to = destinationName != null ? destinationName : "" + destination;
+ LOG.warn("Disabling JMSReplyTo as this Exchange is not OUT capable with JMSReplyTo: " + replyTo + " to destination: " + to + " for Exchange: " + exchange);
+ exchange.getIn().setHeader("JMSReplyTo", null);
}
+ }
- setMessageId(exchange);
+ MessageCreator messageCreator = new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(endpoint + " sending JMS message: " + message);
+ }
+
+ return message;
+ }
+ };
+
+ if (destination != null) {
+ getInOnlyTemplate().send(destination, messageCreator);
+ } else if (destinationName != null) {
+ getInOnlyTemplate().send(destinationName, messageCreator);
+ } else {
+ throw new IllegalArgumentException("Neither destination nor "
+ + "destinationName are specified on this endpoint: " + endpoint);
}
+
+ setMessageId(exchange);
}
protected void setMessageId(Exchange exchange) {
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java?rev=789236&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java Mon Jun 29 07:01:56 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsProducerWithJMSHeaderTest extends ContextTestSupport {
+
+ @Test
+ public void testInOnlyJMSPrioritory() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ // TODO: CAMEL-1689
+ // mock.message(0).header("JMSPriority").isEqualTo(2);
+
+ template.sendBodyAndHeader("activemq:queue:foo", "Hello World", "JMSPriority", "2");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("activemq:queue:foo").to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date