You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/10/23 12:22:47 UTC
svn commit: r707336 - in
/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src:
main/java/org/apache/servicemix/camel/
test/java/org/apache/servicemix/camel/ test/resources/
Author: gertv
Date: Thu Oct 23 03:22:45 2008
New Revision: 707336
URL: http://svn.apache.org/viewvc?rev=707336&view=rev
Log:
SM-1486: servicemix-camel should use asynchronous messaging
Added:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties
Removed:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/ToJbiProcessor.java
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties
Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java?rev=707336&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelConsumerEndpoint.java Thu Oct 23 03:22:45 2008
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+import java.net.URISyntaxException;
+import java.util.Set;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
+import org.apache.servicemix.id.IdGenerator;
+import org.apache.servicemix.jbi.resolver.URIResolver;
+
+/**
+ * A consumer endpoint that will be used to send JBI exchanges
+ * originating from camel.
+ * <p>
+ * A Camel exchange can be sent synchronously with {@link #process(Exchange)} or
+ * asynchronously with {@link #process(Exchange, AsyncCallback)}. In the asynchronous
+ * case the Camel exchange and its callback are stored as properties on the JBI exchange
+ * and picked up again by {@link #process(MessageExchange)}.
+ */
+public class CamelConsumerEndpoint extends ConsumerEndpoint implements AsyncProcessor {
+
+    public static final QName SERVICE_NAME = new QName("http://activemq.apache.org/camel/schema/jbi", "consumer");
+
+    private JbiBinding binding;
+
+    private JbiEndpoint jbiEndpoint;
+
+    /**
+     * Creates a consumer endpoint using {@link #SERVICE_NAME} as service name and a
+     * freshly generated id as endpoint name.
+     *
+     * @param binding     the binding used to convert Camel exchanges into JBI exchanges
+     * @param jbiEndpoint the Camel endpoint providing the MEP, operation and destination URI
+     */
+    public CamelConsumerEndpoint(JbiBinding binding, JbiEndpoint jbiEndpoint) {
+        setService(SERVICE_NAME);
+        setEndpoint(new IdGenerator().generateId());
+        this.binding = binding;
+        this.jbiEndpoint = jbiEndpoint;
+    }
+
+    /**
+     * Handles a JBI exchange that was previously sent by
+     * {@link #process(Exchange, AsyncCallback)}: copies the outcome back onto the Camel
+     * exchange stored in the exchange properties and signals the stored callback.
+     *
+     * @param messageExchange the JBI exchange carrying the Camel exchange and callback as properties
+     * @throws Exception if the response can not be copied or the exchange can not be completed
+     */
+    public void process(MessageExchange messageExchange) throws Exception {
+        Exchange exchange = (Exchange) messageExchange.getProperty(Exchange.class.getName());
+        AsyncCallback asyncCallback = (AsyncCallback) messageExchange.getProperty(AsyncCallback.class.getName());
+        try {
+            copyResponse(messageExchange, exchange);
+        } catch (Exception e) {
+            // make the failure visible to the Camel side before propagating it
+            exchange.setException(e);
+            throw e;
+        } finally {
+            // always signal completion, otherwise the asynchronous caller would wait forever
+            asyncCallback.done(false);
+        }
+    }
+
+    /**
+     * Sends the Camel exchange as a JBI exchange without waiting for the response.
+     *
+     * @param exchange      the Camel exchange to send
+     * @param asyncCallback the callback to signal once the JBI exchange has been handled
+     * @return always <code>false</code>, the callback is signalled from {@link #process(MessageExchange)}
+     * @throws JbiException if the JBI exchange can not be created or sent
+     */
+    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
+        try {
+            MessageExchange messageExchange = createMessageExchange(exchange);
+            // remember the Camel side so process(MessageExchange) can complete it later
+            messageExchange.setProperty(Exchange.class.getName(), exchange);
+            messageExchange.setProperty(AsyncCallback.class.getName(), asyncCallback);
+
+            send(messageExchange);
+            return false;
+        } catch (MessagingException e) {
+            throw new JbiException(e);
+        } catch (URISyntaxException e) {
+            throw new JbiException(e);
+        }
+    }
+
+    /**
+     * Sends the Camel exchange as a JBI exchange, waits for the response and copies
+     * the outcome back onto the Camel exchange.
+     *
+     * @param exchange the Camel exchange to send
+     * @throws JbiException if the JBI exchange can not be created, sent or completed
+     */
+    public void process(Exchange exchange) throws Exception {
+        try {
+            MessageExchange messageExchange = createMessageExchange(exchange);
+
+            sendSync(messageExchange);
+
+            copyResponse(messageExchange, exchange);
+        } catch (MessagingException e) {
+            throw new JbiException(e);
+        } catch (URISyntaxException e) {
+            throw new JbiException(e);
+        }
+    }
+
+    @Override
+    public String getLocationURI() {
+        return null;
+    }
+
+    @Override
+    public void validate() throws DeploymentException {
+        // No validation required
+    }
+
+    /**
+     * Builds the JBI exchange for a Camel exchange and configures operation and destination.
+     */
+    private MessageExchange createMessageExchange(Exchange exchange) throws MessagingException, URISyntaxException {
+        MessageExchange messageExchange = binding.makeJbiMessageExchange(exchange, getExchangeFactory(), jbiEndpoint.getMep());
+
+        if (jbiEndpoint.getOperation() != null) {
+            messageExchange.setOperation(QName.valueOf(jbiEndpoint.getOperation()));
+        }
+
+        URIResolver.configureExchange(messageExchange, getContext(), jbiEndpoint.getDestinationUri());
+        return messageExchange;
+    }
+
+    /**
+     * Copies the outcome of a JBI exchange onto the Camel exchange: the error for an ERROR
+     * exchange, the fault or out message for an ACTIVE exchange (which is then set to DONE).
+     */
+    private void copyResponse(MessageExchange messageExchange, Exchange exchange) throws MessagingException {
+        if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
+            exchange.setException(messageExchange.getError());
+        } else if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+            addHeaders(messageExchange, exchange);
+            NormalizedMessage fault = messageExchange.getFault();
+            if (fault != null) {
+                copyMessage(fault, exchange.getFault());
+            } else {
+                copyMessage(messageExchange.getMessage("out"), exchange.getOut());
+            }
+            done(messageExchange);
+        }
+    }
+
+    /**
+     * Copies content, properties and attachments of a normalized message onto a Camel message.
+     */
+    private void copyMessage(NormalizedMessage normalizedMessage, Message camelMessage) {
+        camelMessage.setBody(normalizedMessage.getContent());
+        addHeaders(normalizedMessage, camelMessage);
+        addAttachments(normalizedMessage, camelMessage);
+    }
+
+    /**
+     * Copies every JBI exchange property onto the Camel exchange.
+     */
+    private void addHeaders(MessageExchange messageExchange, Exchange camelExchange) {
+        for (Object name : messageExchange.getPropertyNames()) {
+            String key = name.toString();
+            camelExchange.setProperty(key, messageExchange.getProperty(key));
+        }
+    }
+
+    /**
+     * Copies every normalized message property to a header on the Camel message.
+     */
+    private void addHeaders(NormalizedMessage normalizedMessage, Message camelMessage) {
+        for (Object name : normalizedMessage.getPropertyNames()) {
+            String key = name.toString();
+            camelMessage.setHeader(key, normalizedMessage.getProperty(key));
+        }
+    }
+
+    /**
+     * Copies every attachment of the normalized message onto the Camel message.
+     */
+    private void addAttachments(NormalizedMessage normalizedMessage, Message camelMessage) {
+        for (Object name : normalizedMessage.getAttachmentNames()) {
+            String id = name.toString();
+            camelMessage.addAttachment(id, normalizedMessage.getAttachment(id));
+        }
+    }
+
+}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelJbiComponent.java Thu Oct 23 03:22:45 2008
@@ -74,8 +74,8 @@
* @see org.apache.servicemix.common.DefaultComponent#getConfiguredEndpoints()
*/
@Override
- protected List<CamelJbiEndpoint> getConfiguredEndpoints() {
- return new ArrayList<CamelJbiEndpoint>();
+ protected List<CamelProviderEndpoint> getConfiguredEndpoints() {
+ return new ArrayList<CamelProviderEndpoint>();
}
/**
@@ -84,7 +84,7 @@
*/
@Override
protected Class[] getEndpointClasses() {
- return new Class[] {CamelJbiEndpoint.class};
+ return new Class[] {CamelProviderEndpoint.class, CamelConsumerEndpoint.class};
}
/**
@@ -112,18 +112,18 @@
@Override
protected org.apache.servicemix.common.Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
- CamelJbiEndpoint endpoint = createEndpoint(ep);
+ CamelProviderEndpoint endpoint = createEndpoint(ep);
endpoint.activate();
return endpoint;
}
- public CamelJbiEndpoint createEndpoint(ServiceEndpoint ep) throws URISyntaxException {
+ public CamelProviderEndpoint createEndpoint(ServiceEndpoint ep) throws URISyntaxException {
URI uri = new URI(ep.getEndpointName());
Map map = URISupport.parseQuery(uri.getQuery());
String camelUri = uri.getSchemeSpecificPart();
Endpoint camelEndpoint = getCamelContext().getEndpoint(camelUri);
Processor processor = createCamelProcessor(camelEndpoint);
- CamelJbiEndpoint endpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
+ CamelProviderEndpoint endpoint = new CamelProviderEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
IntrospectionSupport.setProperties(endpoint, map);
@@ -163,8 +163,8 @@
*
* @returns a JBI endpoint created for the given Camel endpoint
*/
- public CamelJbiEndpoint activateJbiEndpoint(Endpoint camelEndpoint, Processor processor) throws Exception {
- CamelJbiEndpoint jbiEndpoint = createJbiEndpointFromCamel(camelEndpoint, processor);
+ public CamelProviderEndpoint activateJbiEndpoint(Endpoint camelEndpoint, Processor processor) throws Exception {
+ CamelProviderEndpoint jbiEndpoint = createJbiEndpointFromCamel(camelEndpoint, processor);
// the following method will activate the new dynamic JBI endpoint
if (deployer != null) {
@@ -176,20 +176,20 @@
return jbiEndpoint;
}
- public void deactivateJbiEndpoint(CamelJbiEndpoint jbiEndpoint) throws Exception {
+ public void deactivateJbiEndpoint(CamelProviderEndpoint jbiEndpoint) throws Exception {
// this will be done by the ServiceUnit
// jbiEndpoint.deactivate();
}
- protected CamelJbiEndpoint createJbiEndpointFromCamel(Endpoint camelEndpoint, Processor processor) {
- CamelJbiEndpoint jbiEndpoint;
+ protected CamelProviderEndpoint createJbiEndpointFromCamel(Endpoint camelEndpoint, Processor processor) {
+ CamelProviderEndpoint jbiEndpoint;
String endpointUri = camelEndpoint.getEndpointUri();
if (camelEndpoint instanceof JbiEndpoint) {
QName service = null;
String endpoint = null;
if (endpointUri.startsWith("name:")) {
endpoint = endpointUri.substring("name:".length());
- service = CamelJbiEndpoint.SERVICE_NAME;
+ service = CamelProviderEndpoint.SERVICE_NAME;
} else if (endpointUri.startsWith("endpoint:")) {
String uri = endpointUri.substring("endpoint:".length());
// lets decode "serviceNamespace sep serviceName sep
@@ -225,9 +225,9 @@
+ "or jbi:service:[serviceNamespace][sep][serviceName or jbi:name:[endpointName] but was given: "
+ endpointUri);
}
- jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), service, endpoint, camelEndpoint, getBinding(), processor);
+ jbiEndpoint = new CamelProviderEndpoint(getServiceUnit(), service, endpoint, camelEndpoint, getBinding(), processor);
} else {
- jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
+ jbiEndpoint = new CamelProviderEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
}
return jbiEndpoint;
}
@@ -242,7 +242,7 @@
/**
* Returns a JBI endpoint created for the given Camel endpoint
*/
- public CamelJbiEndpoint createJbiEndpointFromCamel(Endpoint camelEndpoint) {
+ public CamelProviderEndpoint createJbiEndpointFromCamel(Endpoint camelEndpoint) {
Processor processor = createCamelProcessor(camelEndpoint);
return createJbiEndpointFromCamel(camelEndpoint, processor);
}
Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java?rev=707336&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java Thu Oct 23 03:22:45 2008
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.camel;
+
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.RobustInOnly;
+import javax.xml.namespace.QName;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.common.ServiceUnit;
+import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+
+/**
+ * A JBI endpoint which when invoked will delegate to a Camel endpoint
+ *
+ * @version $Revision: 426415 $
+ */
+public class CamelProviderEndpoint extends ProviderEndpoint {
+
+ public static final QName SERVICE_NAME = new QName("http://activemq.apache.org/camel/schema/jbi", "provider");
+
+ private Endpoint camelEndpoint;
+
+ private JbiBinding binding;
+
+ private Processor camelProcessor;
+
+ public CamelProviderEndpoint(ServiceUnit serviceUnit, QName service, String endpoint, Endpoint camelEndpoint, JbiBinding binding,
+ Processor camelProcessor) {
+ super(serviceUnit, service, endpoint);
+ this.camelProcessor = camelProcessor;
+ this.camelEndpoint = camelEndpoint;
+ this.binding = binding;
+ }
+
+ public CamelProviderEndpoint(ServiceUnit serviceUnit, Endpoint camelEndpoint, JbiBinding binding, Processor camelProcessor) {
+ this(serviceUnit, SERVICE_NAME, camelEndpoint.getEndpointUri(), camelEndpoint, binding, camelProcessor);
+ }
+
+ @Override
+ public void process(MessageExchange exchange) throws Exception {
+ // The component acts as a provider, this means that another component has requested our service
+ // As this exchange is active, this is either an in or a fault (out are sent by this component)
+
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ // Exchange is finished
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ // Exchange has been aborted with an exception
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ // Exchange is active
+ } else {
+ handleActiveProviderExchange(exchange);
+
+ }
+ // Unsupported role: this should never happen has we never create exchanges
+ } else {
+ throw new IllegalStateException("Unsupported role: " + exchange.getRole());
+ }
+ }
+
+
+ protected void handleActiveProviderExchange(MessageExchange exchange) throws Exception {
+ // Fault message
+ if (exchange.getFault() != null) {
+ done(exchange);
+ // In message
+ } else if (exchange.getMessage("in") != null) {
+ if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received exchange: " + exchange);
+ }
+ JbiExchange camelExchange = new JbiExchange(camelEndpoint.getCamelContext(), binding, exchange);
+ camelProcessor.process(camelExchange);
+ if (camelExchange.isFailed()) {
+ Throwable t = camelExchange.getException();
+ Exception e;
+ if (t == null) {
+ e = new Exception("Unknown error");
+ } else if (t instanceof Exception) {
+ e = (Exception) t;
+ } else {
+ e = new Exception(t);
+ }
+ fail(exchange, e);
+ } else {
+ done(exchange);
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received exchange: " + exchange);
+ }
+ JbiExchange camelExchange = new JbiExchange(camelEndpoint.getCamelContext(), binding, exchange);
+ camelProcessor.process(camelExchange);
+ if (camelExchange.isFailed()) {
+ Throwable t = camelExchange.getException();
+ Exception e;
+ if (t == null) {
+ e = new Exception("Unknown error");
+ } else if (t instanceof Exception) {
+ e = (Exception) t;
+ } else {
+ e = new Exception(t);
+ }
+ fail(exchange, e);
+ } else {
+ boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
+ if (txSync) {
+ sendSync(exchange);
+ } else {
+ send(exchange);
+ }
+ }
+ }
+ // This is not compliant with the default MEPs
+ } else {
+ throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
+ }
+ }
+
+}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/CamelSpringDeployer.java Thu Oct 23 03:22:45 2008
@@ -51,7 +51,7 @@
}
};
- private List<CamelJbiEndpoint> activatedEndpoints = new ArrayList<CamelJbiEndpoint>();
+ private List<CamelProviderEndpoint> activatedEndpoints = new ArrayList<CamelProviderEndpoint>();
private String serviceUnitName;
@@ -85,14 +85,14 @@
return serviceUnit;
}
- public void addService(CamelJbiEndpoint endpoint) {
+ public void addService(CamelProviderEndpoint endpoint) {
activatedEndpoints.add(endpoint);
}
@Override
protected List getServices(Kernel kernel) {
try {
- List<CamelJbiEndpoint> services = new ArrayList<CamelJbiEndpoint>(activatedEndpoints);
+ List<org.apache.servicemix.common.Endpoint> services = new ArrayList<org.apache.servicemix.common.Endpoint>(activatedEndpoints);
activatedEndpoints.clear();
ApplicationContext applicationContext = springLoader.getApplicationContext();
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/main/java/org/apache/servicemix/camel/JbiEndpoint.java Thu Oct 23 03:22:45 2008
@@ -19,7 +19,10 @@
import java.net.URISyntaxException;
import java.util.Map;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -34,7 +37,6 @@
* @version $Revision: 563665 $
*/
public class JbiEndpoint extends DefaultEndpoint<Exchange> {
- private Processor toJbiProcessor;
private String destinationUri;
@@ -42,21 +44,52 @@
private String operation;
+ private JbiProducer producer;
+
private final CamelJbiComponent jbiComponent;
public JbiEndpoint(CamelJbiComponent jbiComponent, String uri) {
super(uri, jbiComponent);
this.jbiComponent = jbiComponent;
parseUri(uri);
- toJbiProcessor = new ToJbiProcessor(jbiComponent.getBinding(), jbiComponent.getComponentContext(), this);
}
- public Producer<Exchange> createProducer() throws Exception {
- return new DefaultProducer<Exchange>(this) {
- public void process(Exchange exchange) throws Exception {
- toJbiProcessor.process(exchange);
- }
- };
+    /**
+     * Returns the producer for this endpoint, creating it on the first call.
+     * Later calls hand back the same {@link JbiProducer} instance; the method is
+     * synchronized, so concurrent callers can not create more than one producer.
+     */
+    public synchronized Producer<Exchange> createProducer() throws Exception {
+        if (producer != null) {
+            return producer;
+        }
+        producer = new JbiProducer(this);
+        return producer;
+    }
+
+ /**
+ * Producer that hands Camel exchanges over to a {@link CamelConsumerEndpoint},
+ * both for the synchronous and the asynchronous processing method.
+ */
+ protected class JbiProducer extends DefaultProducer<Exchange> implements AsyncProcessor {
+
+ // created in start(), every process call is delegated to it
+ private CamelConsumerEndpoint consumer;
+
+ public JbiProducer(Endpoint<Exchange> exchangeEndpoint) {
+ super(exchangeEndpoint);
+ }
+
+ /**
+ * Creates the consumer endpoint for the enclosing {@link JbiEndpoint} and adds it
+ * to the JBI component before starting the producer itself.
+ */
+ @Override
+ public void start() throws Exception {
+ consumer = new CamelConsumerEndpoint(jbiComponent.getBinding(), JbiEndpoint.this);
+ //consumer.start();
+ jbiComponent.addEndpoint(consumer);
+ super.start();
+ }
+ /**
+ * Removes the consumer endpoint from the JBI component before stopping the producer itself.
+ */
+ @Override
+ public void stop() throws Exception {
+ //consumer.stop();
+ jbiComponent.removeEndpoint(consumer);
+ super.stop();
+ }
+
+ /**
+ * Delegates synchronous processing of the exchange to the consumer endpoint.
+ */
+ public void process(Exchange exchange) throws Exception {
+ consumer.process(exchange);
+ }
+
+ /**
+ * Delegates asynchronous processing of the exchange to the consumer endpoint
+ * and returns whatever the consumer endpoint returns.
+ */
+ public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
+ return consumer.process(exchange, asyncCallback);
+ }
+ }
private void parseUri(String uri) {
@@ -103,7 +136,7 @@
public Consumer<Exchange> createConsumer(final Processor processor) throws Exception {
return new DefaultConsumer<Exchange>(this, processor) {
- CamelJbiEndpoint jbiEndpoint;
+ CamelProviderEndpoint jbiEndpoint;
@Override
protected void doStart() throws Exception {
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiEndpointUsingNameUriIntegrationTest.java Thu Oct 23 03:22:45 2008
@@ -39,7 +39,7 @@
protected void configureExchange(ServiceMixClient client,
MessageExchange exchange) {
ServiceEndpoint endpoint = client.getContext().getEndpoint(
- CamelJbiEndpoint.SERVICE_NAME, "cheese");
+ CamelProviderEndpoint.SERVICE_NAME, "cheese");
assertNotNull("Should have a Camel endpoint exposed in JBI!", endpoint);
exchange.setEndpoint(endpoint);
}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiInOutTest.java Thu Oct 23 03:22:45 2008
@@ -45,7 +45,7 @@
protected void configureExchange(ServiceMixClient client,
MessageExchange exchange) {
ServiceEndpoint endpoint = client.getContext().getEndpoint(
- CamelJbiEndpoint.SERVICE_NAME, "cheese");
+ CamelProviderEndpoint.SERVICE_NAME, "cheese");
assertNotNull("Should have a Camel endpoint exposed in JBI!", endpoint);
exchange.setEndpoint(endpoint);
}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/JbiTestSupport.java Thu Oct 23 03:22:45 2008
@@ -20,9 +20,11 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.xml.namespace.QName;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -35,6 +37,7 @@
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.servicemix.jbi.container.ActivationSpec;
import org.apache.servicemix.jbi.container.SpringJBIContainer;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
/**
* @version $Revision: 563665 $
@@ -46,6 +49,8 @@
protected SpringJBIContainer jbiContainer = new SpringJBIContainer();
+ protected ExchangeCompletedListener exchangeCompletedListener;
+
protected CountDownLatch latch = new CountDownLatch(1);
protected Endpoint<Exchange> endpoint;
@@ -67,6 +72,26 @@
});
}
+    /**
+     * Sends an exchange to the endpoint, passing a callback instead of waiting for the result.
+     *
+     * @param expectedBody the body to set on the in message (a "cheese" header of 123 is added as well)
+     * @return a flag that is switched to <code>true</code> when the callback is invoked; callers
+     *         that want to block can wait on the returned object while holding its monitor
+     */
+    protected AtomicBoolean sendExchangeAsync(final Object expectedBody) {
+        final AtomicBoolean bool = new AtomicBoolean();
+        client.send(endpoint, new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(expectedBody);
+                in.setHeader("cheese", 123);
+            }
+        }, new AsyncCallback() {
+            public void done(boolean b) {
+                // notify/notifyAll may only be called by the owner of the monitor,
+                // otherwise an IllegalMonitorStateException is thrown
+                synchronized (bool) {
+                    bool.set(true);
+                    bool.notifyAll();
+                }
+            }
+        });
+        return bool;
+    }
+
protected Object assertReceivedValidExchange(Class type) throws Exception {
// lets wait on the message being received
boolean received = latch.await(5, TimeUnit.SECONDS);
@@ -84,6 +109,7 @@
@Override
protected void setUp() throws Exception {
jbiContainer.setEmbedded(true);
+ exchangeCompletedListener = new ExchangeCompletedListener();
CamelJbiComponent component = new CamelJbiComponent();
@@ -103,6 +129,7 @@
.toArray(new ActivationSpec[activationSpecList.size()]);
jbiContainer.setActivationSpecs(activationSpecs);
jbiContainer.afterPropertiesSet();
+ jbiContainer.addListener(exchangeCompletedListener);
// lets configure some componnets
camelContext.addComponent("jbi", component);
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/NonJbiCamelEndpointsIntegrationTest.java Thu Oct 23 03:22:45 2008
@@ -151,7 +151,7 @@
protected void configureExchange(ServiceMixClient client,
MessageExchange exchange) {
ServiceEndpoint endpoint = client.getContext().getEndpoint(
- CamelJbiEndpoint.SERVICE_NAME, "camel:su1-controlBus");
+ CamelProviderEndpoint.SERVICE_NAME, "camel:su1-controlBus");
assertNotNull("Should have a Camel endpoint exposed in JBI!", endpoint);
exchange.setEndpoint(endpoint);
}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/java/org/apache/servicemix/camel/SendFromCamelToJbiTest.java Thu Oct 23 03:22:45 2008
@@ -30,10 +30,11 @@
* @version $Revision: 563665 $
*/
public class SendFromCamelToJbiTest extends JbiTestSupport {
+
private ReceiverComponent receiverComponent = new ReceiverComponent();
public void testCamelInvokingJbi() throws Exception {
- sendExchange("<foo bar='123'/>");
+ sendExchangeAsync("<foo bar='123'/>");
MessageList list = receiverComponent.getMessageList();
list.assertMessagesReceived(1);
@@ -43,6 +44,8 @@
log.info("Received: " + message);
assertEquals("cheese header", 123, message.getProperty("cheese"));
+
+ exchangeCompletedListener.assertExchangeCompleted();
}
@Override
@@ -56,14 +59,12 @@
}
@Override
- protected void appendJbiActivationSpecs(
- List<ActivationSpec> activationSpecList) {
+ protected void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList) {
ActivationSpec activationSpec = new ActivationSpec();
activationSpec.setId("jbiReceiver");
activationSpec.setService(new QName("serviceNamespace", "serviceA"));
activationSpec.setEndpoint("endpointA");
activationSpec.setComponent(receiverComponent);
-
activationSpecList.add(activationSpec);
}
Added: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties?rev=707336&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties (added)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j-tests.properties Thu Oct 23 03:22:45 2008
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# The logging properties used during tests.
+#
+log4j.rootLogger=DEBUG, out
+
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.file=target/servicemix-test.log
+log4j.appender.out.append=true
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties?rev=707336&r1=707335&r2=707336&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-camel/src/test/resources/log4j.properties Thu Oct 23 03:22:45 2008
@@ -1,36 +1,31 @@
#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-#
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
#
#
# The logging properties used during tests.
#
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
-log4j.logger.org.springframework=INFO
log4j.logger.org.apache.activemq=INFO
log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
-#log4j.logger.org.apache.camel=DEBUG
-#log4j.logger.org.apache.servicemix=DEBUG
-
-# CONSOLE appender
+# CONSOLE appender (attached to the root logger above)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
@@ -39,5 +34,5 @@
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/camel-test.log
+log4j.appender.out.file=target/servicemix-test.log
log4j.appender.out.append=true