You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/27 11:30:57 UTC
svn commit: r522838 [2/2] - in /activemq/camel/trunk: camel-core/
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/component/pojo/
camel-core/src/main/java/org/apach...
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Mar 27 02:30:52 2007
@@ -20,8 +20,10 @@
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
import static org.apache.camel.util.ObjectHelper.iterator;
import static org.apache.camel.util.ObjectHelper.notNull;
+import org.apache.camel.util.ServiceHelper;
import java.util.Iterator;
@@ -31,12 +33,12 @@
*
* @version $Revision$
*/
-public class Splitter<E extends Exchange> implements Processor<E> {
- private final Processor<E> destination;
+public class Splitter<E extends Exchange> extends ServiceSupport implements Processor<E> {
+ private final Processor<E> processor;
private final Expression<E> expression;
public Splitter(Processor<E> destination, Expression<E> expression) {
- this.destination = destination;
+ this.processor = destination;
this.expression = expression;
notNull(destination, "destination");
notNull(expression, "expression");
@@ -44,7 +46,7 @@
@Override
public String toString() {
- return "Splitter[on: " + expression + " to: " + destination + "]";
+ return "Splitter[on: " + expression + " to: " + processor + "]";
}
public void onExchange(E exchange) {
@@ -54,7 +56,15 @@
Object part = iter.next();
E newExchange = (E) exchange.copy();
newExchange.getIn().setBody(part);
- destination.onExchange(newExchange);
+ processor.onExchange(newExchange);
}
+ }
+
+ protected void doStart() throws Exception {
+ ServiceHelper.startServices(processor);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(processor);
}
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.HashMap;
+
+/**
+ * @version $Revision$
+ */
+public class ProducerCache<E extends Exchange> extends ServiceSupport {
+
+ private Map<String, Producer<E>> producers = new HashMap<String, Producer<E>>();
+
+ public synchronized Producer<E> getProducer(Endpoint<E> endpoint) {
+ String key = endpoint.getEndpointUri();
+ Producer<E> answer = producers.get(key);
+ if (answer == null) {
+ try {
+ answer = endpoint.createProducer();
+ }
+ catch (Exception e) {
+ throw new FailedToCreateProducerException(endpoint, e);
+ }
+ producers.put(key, answer);
+ // TODO auto-start?
+ }
+ return answer;
+ }
+
+ /**
+ * Sends the exchange to the given endpoint
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param exchange the exchange to send
+ */
+ public void send(Endpoint<E> endpoint, E exchange) {
+ Producer<E> producer = getProducer(endpoint);
+ producer.onExchange(exchange);
+ }
+
+ /**
+ * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param processor the transformer used to populate the new exchange
+ */
+ public void send(Endpoint<E> endpoint, Processor<E> processor) {
+ Producer<E> producer = getProducer(endpoint);
+ E exchange = producer.createExchange();
+
+ // lets populate using the processor callback
+ processor.onExchange(exchange);
+
+ // now lets dispatch
+ producer.onExchange(exchange);
+ }
+
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(producers.values());
+ }
+
+ protected void doStart() throws Exception {
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Collection;
+
+/**
+ * A collection of helper methods for working with {@link Service} objects
+ *
+ * @version $Revision$
+ */
+public class ServiceHelper {
+ private static final transient Log log = LogFactory.getLog(ServiceHelper.class);
+
+
+ /**
+ * Starts all of the given services
+ */
+ public static void startServices(Object... services) throws Exception {
+ for (Object value : services) {
+ if (value instanceof Service) {
+ Service service = (Service) value;
+ service.start();
+ }
+ }
+ }
+
+ /**
+ * Starts all of the given services
+ */
+ public static void startServices(Collection services) throws Exception {
+ for (Object value : services) {
+ if (value instanceof Service) {
+ Service service = (Service) value;
+ service.start();
+ }
+ }
+ }
+
+
+ /**
+ * Stops all of the given services, throwing the first exception caught
+ */
+ public static void stopServices(Object... services) throws Exception {
+ Exception firstException = null;
+ for (Object value : services) {
+ if (value instanceof Service) {
+ Service service = (Service) value;
+ try {
+ service.stop();
+ }
+ catch (Exception e) {
+ log.debug("Caught exception shutting down: " + e, e);
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
+ }
+ }
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+ /**
+ * Stops all of the given services, throwing the first exception caught
+ */
+ public static void stopServices(Collection services) throws Exception {
+ Exception firstException = null;
+ for (Object value : services) {
+ if (value instanceof Service) {
+ Service service = (Service) value;
+ try {
+ service.stop();
+ }
+ catch (Exception e) {
+ log.debug("Caught exception shutting down: " + e, e);
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
+ }
+ }
+ if (firstException != null) {
+ throw firstException;
+ }
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java Tue Mar 27 02:30:52 2007
@@ -49,7 +49,7 @@
}
};
// lets add some routes
- container.setRoutes(new RouteBuilder() {
+ container.addRoutes(new RouteBuilder() {
public void configure() {
from("pojo:default:hello").intercept(tracingInterceptor).target().to("pojo:default:bye");
@@ -59,6 +59,8 @@
container.activateEndpoints();
+
+ /* TODO
// now lets fire in a message
PojoEndpoint endpoint = (PojoEndpoint) container.resolveEndpoint("pojo:default:hello");
@@ -76,6 +78,7 @@
} catch (IllegalStateException expected) {
// since bye is not active.
}
+ */
container.deactivateEndpoints();
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java Tue Mar 27 02:30:52 2007
@@ -22,6 +22,7 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
@@ -34,13 +35,13 @@
public class QueueRouteTest extends TestCase {
- public void testJmsRoute() throws Exception {
+ public void testSedaQueue() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
CamelContext container = new DefaultCamelContext();
// lets add some routes
- container.setRoutes(new RouteBuilder() {
+ container.addRoutes(new RouteBuilder() {
public void configure() {
from("queue:test.a").to("queue:test.b");
from("queue:test.b").process(new Processor<Exchange>() {
@@ -59,7 +60,9 @@
Endpoint<Exchange> endpoint = container.resolveEndpoint("queue:test.a");
Exchange exchange = endpoint.createExchange();
exchange.getIn().setHeader("cheese", 123);
- endpoint.onExchange(exchange);
+
+ Producer<Exchange> producer = endpoint.createProducer();
+ producer.onExchange(exchange);
// now lets sleep for a while
boolean received = latch.await(5, TimeUnit.SECONDS);
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfComponent.java Tue Mar 27 02:30:52 2007
@@ -19,9 +19,17 @@
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.CamelContext;
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.endpoint.ServerRegistry;
+import org.apache.cxf.bus.CXFBusFactory;
import java.util.Map;
import java.util.HashMap;
+import java.util.List;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URI;
@@ -44,7 +52,7 @@
public synchronized CxfEndpoint createEndpoint(String uri, String[] urlParts) throws IOException, URISyntaxException {
CxfEndpoint endpoint = map.get(uri);
if (endpoint == null) {
- String remainingUrl = uri.substring("mina:".length());
+ String remainingUrl = uri.substring("cxf:".length());
URI u = new URI(remainingUrl);
String protocol = u.getScheme();
@@ -54,4 +62,21 @@
return endpoint;
}
+ /*
+ protected void foo() {
+ Bus bus = CXFBusFactory.getDefaultBus();
+ ServerRegistry serverRegistry = bus.getExtension(ServerRegistry.class);
+ List<Server> servers = serverRegistry.getServers();
+
+ Server targetServer = null;
+ for (Server server : servers) {
+ targetServer = server;
+ EndpointInfo info = server.getEndpoint().getEndpointInfo();
+ String address = info.getAddress();
+
+ Message message = new MessageImpl();
+ server.getMessageObserver().onMessage(message);
+ }
+ }
+ */
}
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Tue Mar 27 02:30:52 2007
@@ -19,7 +19,11 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultConsumer;
/**
* The endpoint in the service engine
@@ -33,8 +37,16 @@
super(uri, camelContext);
}
- public void onExchange(CxfExchange cxfExchange) {
- // TODO send into CXF
+ public Producer<CxfExchange> createProducer() throws Exception {
+ return startService(new DefaultProducer<CxfExchange>(this) {
+ public void onExchange(CxfExchange exchange) {
+ // TODO send into CXF
+ }
+ });
+ }
+
+ public Consumer<CxfExchange> createConsumer(Processor<CxfExchange> processor) throws Exception {
+ return startService(new DefaultConsumer<CxfExchange>(this, processor) {});
}
public CxfExchange createExchange() {
Modified: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java (original)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java Tue Mar 27 02:30:52 2007
@@ -17,6 +17,9 @@
*/
package org.apache.camel.component.http;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ProducerCache;
+
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -28,6 +31,7 @@
*/
public class CamelServlet extends HttpServlet {
private HttpEndpoint endpoint;
+ private ProducerCache<HttpExchange> producerCache = new ProducerCache<HttpExchange>();
public CamelServlet() {
}
@@ -44,7 +48,7 @@
}
HttpExchange exchange = endpoint.createExchange(request, response);
- endpoint.onExchange(exchange);
+ producerCache.send(endpoint, exchange);
// HC: The getBinding() interesting because it illustrates the impedance miss-match between
// HTTP's stream oriented protocol, and Camels more message oriented protocol exchanges.
Modified: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java (original)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java Tue Mar 27 02:30:52 2007
@@ -18,8 +18,12 @@
package org.apache.camel.component.http;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.Processor;
import org.apache.camel.CamelContext;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -37,16 +41,17 @@
super(uri, camelContext);
}
- public void onExchange(HttpExchange exchange) {
- Processor<HttpExchange> processor = getInboundProcessor();
- if (processor != null) {
- // lets route straight to our processor
- processor.onExchange(exchange);
- }
- else {
- // we need an external HTTP client such as commons-httpclient
- // TODO
- }
+ public Producer<HttpExchange> createProducer() throws Exception {
+ return startService(new DefaultProducer<HttpExchange>(this) {
+ public void onExchange(HttpExchange exchange) {
+ /** TODO */
+ }
+ });
+ }
+
+ public Consumer<HttpExchange> createConsumer(Processor<HttpExchange> processor) throws Exception {
+ // TODO
+ return startService(new DefaultConsumer<HttpExchange>(this, processor) {});
}
public HttpExchange createExchange() {
Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java Tue Mar 27 02:30:52 2007
@@ -16,6 +16,9 @@
import org.apache.camel.Component;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointResolver;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.FailedToCreateProducerException;
import org.apache.servicemix.common.DefaultComponent;
import org.apache.servicemix.common.ServiceUnit;
import org.apache.servicemix.jbi.util.IntrospectionSupport;
@@ -91,16 +94,19 @@
return endpoint;
}
- /**
- * A factory method for creating endpoints from a service endpoint
- * which is public so that it can be easily unit tested
- */
public CamelJbiEndpoint createEndpoint(ServiceEndpoint ep) throws URISyntaxException {
URI uri = new URI(ep.getEndpointName());
Map map = URISupport.parseQuery(uri.getQuery());
String camelUri = uri.getSchemeSpecificPart();
Endpoint camelEndpoint = getCamelContext().resolveEndpoint(camelUri);
- CamelJbiEndpoint endpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding());
+ Processor<Exchange> processor = null;
+ try {
+ processor = camelEndpoint.createProducer();
+ }
+ catch (Exception e) {
+ throw new FailedToCreateProducerException(camelEndpoint, e);
+ }
+ CamelJbiEndpoint endpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
IntrospectionSupport.setProperties(endpoint, map);
@@ -136,8 +142,8 @@
/**
* Returns a JBI endpoint created for the given Camel endpoint
*/
- public CamelJbiEndpoint activateJbiEndpoint(JbiEndpoint camelEndpoint) throws Exception {
- CamelJbiEndpoint jbiEndpoint = null;
+ public CamelJbiEndpoint activateJbiEndpoint(JbiEndpoint camelEndpoint, Processor<Exchange> processor) throws Exception {
+ CamelJbiEndpoint jbiEndpoint;
String endpointUri = camelEndpoint.getEndpointUri();
if (endpointUri.startsWith("endpoint:")) {
// lets decode "service:serviceNamespace:serviceName:endpointName
@@ -151,10 +157,10 @@
}
QName service = new QName(parts[0], parts[1]);
String endpoint = parts[2];
- jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), service, endpoint, camelEndpoint, getBinding());
+ jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), service, endpoint, camelEndpoint, getBinding(), processor);
}
else {
- jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding());
+ jbiEndpoint = new CamelJbiEndpoint(getServiceUnit(), camelEndpoint, getBinding(), processor);
}
// the following method will activate the new dynamic JBI endpoint
Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiEndpoint.java Tue Mar 27 02:30:52 2007
@@ -13,6 +13,9 @@
package org.apache.camel.component.jbi;
import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.ProducerCache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.common.ServiceUnit;
@@ -32,24 +35,25 @@
private static final QName SERVICE_NAME = new QName("http://camel.apache.org/service", "CamelEndpointComponent");
private Endpoint camelEndpoint;
private JbiBinding binding;
+ private Processor<Exchange> processor;
- public CamelJbiEndpoint(ServiceUnit serviceUnit, QName service, String endpoint, Endpoint camelEndpoint, JbiBinding binding) {
+ public CamelJbiEndpoint(ServiceUnit serviceUnit, QName service, String endpoint, Endpoint camelEndpoint, JbiBinding binding, Processor<Exchange> processor) {
super(serviceUnit, service, endpoint);
+ this.processor = processor;
this.camelEndpoint = camelEndpoint;
this.binding = binding;
}
- public CamelJbiEndpoint(ServiceUnit serviceUnit, Endpoint camelEndpoint, JbiBinding binding) {
- this(serviceUnit, SERVICE_NAME, camelEndpoint.getEndpointUri(), camelEndpoint, binding);
+ public CamelJbiEndpoint(ServiceUnit serviceUnit, Endpoint camelEndpoint, JbiBinding binding, Processor<Exchange> processor) {
+ this(serviceUnit, SERVICE_NAME, camelEndpoint.getEndpointUri(), camelEndpoint, binding, processor);
}
protected void processInOnly(MessageExchange exchange, NormalizedMessage in) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Received exchange: " + exchange);
}
- // lets use the inbound processor to handle the exchange
JbiExchange camelExchange = new JbiExchange(camelEndpoint.getContext(), binding, exchange);
- camelEndpoint.onExchange(camelExchange);
+ processor.onExchange(camelExchange);
}
protected void processInOut(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws Exception {
Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java Tue Mar 27 02:30:52 2007
@@ -20,7 +20,11 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultConsumer;
/**
* Represents an {@link Endpoint} for interacting with JBI
@@ -37,22 +41,50 @@
toJbiProcessor = new ToJbiProcessor(jbiComponent.getBinding(), jbiComponent.getComponentContext(), uri);
}
- /**
- * Sends a message into JBI
- */
+ public Producer<Exchange> createProducer() throws Exception {
+ return new DefaultProducer<Exchange>(this) {
+ public void onExchange(Exchange exchange) {
+ toJbiProcessor.onExchange(exchange);
+ }
+ };
+ }
+
+ public Consumer<Exchange> createConsumer(final Processor<Exchange> processor) throws Exception {
+ return new DefaultConsumer<Exchange>(this, processor) {
+ CamelJbiEndpoint jbiEndpoint;
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ jbiEndpoint = jbiComponent.activateJbiEndpoint(JbiEndpoint.this, processor);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+/*
+ if (jbiEndpoint != null) {
+ jbiEndpoint.deactivate();
+ }
+*/
+ super.doStop();
+ }
+ };
+ }
+
+ /*
public void onExchange(Exchange exchange) {
if (getInboundProcessor() != null) {
getInboundProcessor().onExchange(exchange);
} else {
toJbiProcessor.onExchange(exchange); }
}
+ */
@Override
protected void doActivate() throws Exception {
super.doActivate();
// lets create and activate the endpoint in JBI
- jbiComponent.activateJbiEndpoint(this);
}
public JbiExchange createExchange() {
Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/ToJbiProcessor.java Tue Mar 27 02:30:52 2007
@@ -48,7 +48,11 @@
DeliveryChannel deliveryChannel = componentContext.getDeliveryChannel();
MessageExchangeFactory exchangeFactory = deliveryChannel.createExchangeFactory();
MessageExchange messageExchange = binding.makeJbiMessageExchange(exchange, exchangeFactory);
+ System.out.println("#### Configuring exchange with: " + destinationUri);
URIResolver.configureExchange(messageExchange, componentContext, destinationUri);
+
+ System.out.println("#### service: " + messageExchange.getService() + " endpoint: " + messageExchange.getEndpoint());
+
deliveryChannel.sendSync(messageExchange);
}
catch (MessagingException e) {
Modified: activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java (original)
+++ activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java Tue Mar 27 02:30:52 2007
@@ -22,6 +22,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.TestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ProducerCache;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.servicemix.jbi.container.ActivationSpec;
@@ -43,17 +45,19 @@
protected CountDownLatch latch = new CountDownLatch(1);
protected Endpoint<Exchange> endpoint;
protected String startEndpointUri = "jbi:endpoint:serviceNamespace:serviceA:endpointA";
+ protected ProducerCache<Exchange> client = new ProducerCache<Exchange>();
/**
* Sends an exchange to the endpoint
*/
- protected void sendExchange(Object expectedBody) {
- // now lets fire in a message
- Exchange exchange = endpoint.createExchange();
- Message in = exchange.getIn();
- in.setBody(expectedBody);
- in.setHeader("cheese", 123);
- endpoint.onExchange(exchange);
+ protected void sendExchange(final Object expectedBody) {
+ client.send(endpoint, new Processor<Exchange>() {
+ public void onExchange(Exchange exchange) {
+ Message in = exchange.getIn();
+ in.setBody(expectedBody);
+ in.setHeader("cheese", 123);
+ }
+ });
}
protected Object assertReceivedValidExchange(Class type) throws Exception {
@@ -96,11 +100,18 @@
camelContext.addComponent("jbi", component);
// lets add some routes
- camelContext.setRoutes(createRoutes());
+ camelContext.addRoutes(createRoutes());
endpoint = camelContext.resolveEndpoint(startEndpointUri);
assertNotNull("No endpoint found!", endpoint);
camelContext.activateEndpoints();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ client.stop();
+ camelContext.deactivateEndpoints();
+ super.tearDown();
}
protected abstract void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList);
Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * A JMS {@link MessageListener} which can be used to delegate processing to a Camel endpoint.
+ *
+ * @version $Revision$
+ */
+public class EndpointMessageListener<E extends Exchange> implements MessageListener {
+ private static final transient Log log = LogFactory.getLog(EndpointMessageListener.class);
+ private Endpoint<E> endpoint;
+ private Processor<E> processor;
+ private JmsBinding binding;
+
+ public EndpointMessageListener(Endpoint<E> endpoint, Processor<E> processor) {
+ this.endpoint = endpoint;
+ this.processor = processor;
+ }
+
+ public void onMessage(Message message) {
+ if (log.isDebugEnabled()) {
+ log.debug(endpoint + " receiving JMS message: " + message);
+ }
+ JmsExchange exchange = createExchange(message);
+ processor.onExchange((E) exchange);
+ }
+
+ public JmsExchange createExchange(Message message) {
+ return new JmsExchange(endpoint.getContext(), getBinding(), message);
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+ public JmsBinding getBinding() {
+ if (binding == null) {
+ binding = new JmsBinding();
+ }
+ return binding;
+ }
+
+ /**
+ * Sets the binding used to convert from a Camel message to and from a JMS message
+ *
+ * @param binding the binding to use
+ */
+ public void setBinding(JmsBinding binding) {
+ this.binding = binding;
+ }
+}
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Tue Mar 27 02:30:52 2007
@@ -111,17 +111,7 @@
});
*/
- AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(template);
- listenerContainer.setDestinationName(subject);
- listenerContainer.setPubSubDomain(template.isPubSubDomain());
- listenerContainer.setConnectionFactory(template.getConnectionFactory());
-
- // TODO support optional parameters
- // selector
- // messageConverter
- // durableSubscriberName
-
- return new JmsEndpoint(uri, getContext(), subject, template, listenerContainer);
+ return new JmsEndpoint(uri, getContext(), subject, template);
}
public JmsTemplate getTemplate() {
@@ -130,13 +120,6 @@
public void setTemplate(JmsTemplate template) {
this.template = template;
- }
-
- protected AbstractMessageListenerContainer createMessageListenerContainer(JmsTemplate template) {
- // TODO use an enum to auto-switch container types?
-
- //return new SimpleMessageListenerContainer();
- return new DefaultMessageListenerContainer();
}
/**
Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.Processor;
+import org.apache.camel.Consumer;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * A {@link Consumer} which uses Spring's {@link AbstractMessageListenerContainer} implementations to consume JMS messages
+ *
+ * @version $Revision$
+ */
+public class JmsConsumer extends DefaultConsumer<JmsExchange> {
+ private final AbstractMessageListenerContainer listenerContainer;
+
+ public JmsConsumer(JmsEndpoint endpoint, Processor<JmsExchange> processor, AbstractMessageListenerContainer listenerContainer) {
+ super(endpoint, processor);
+ this.listenerContainer = listenerContainer;
+
+ MessageListener messageListener = createMessageListener(endpoint, processor);
+ this.listenerContainer.setMessageListener(messageListener);
+ }
+
+ protected MessageListener createMessageListener(JmsEndpoint endpoint, Processor<JmsExchange> processor) {
+ EndpointMessageListener<JmsExchange> messageListener = new EndpointMessageListener<JmsExchange>(endpoint, processor);
+ messageListener.setBinding(endpoint.getBinding());
+ return messageListener;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ listenerContainer.afterPropertiesSet();
+ listenerContainer.initialize();
+ listenerContainer.start();
+ }
+
+
+ @Override
+ protected void doStop() throws Exception {
+ listenerContainer.stop();
+ listenerContainer.destroy();
+ super.doStop();
+ }
+
+}
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Tue Mar 27 02:30:52 2007
@@ -18,12 +18,18 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -33,45 +39,35 @@
/**
* @version $Revision:520964 $
*/
-public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener {
+public class JmsEndpoint extends DefaultEndpoint<JmsExchange> {
private static final Log log = LogFactory.getLog(JmsEndpoint.class);
private JmsBinding binding;
- private JmsOperations template;
- private AbstractMessageListenerContainer listenerContainer;
+ private JmsTemplate template;
private String destination;
- public JmsEndpoint(String endpointUri, CamelContext container, String destination, JmsOperations template, AbstractMessageListenerContainer listenerContainer) {
+ public JmsEndpoint(String endpointUri, CamelContext container, String destination, JmsTemplate template) {
super(endpointUri, container);
this.destination = destination;
this.template = template;
- this.listenerContainer = listenerContainer;
- this.listenerContainer.setMessageListener(this);
}
- public void onMessage(Message message) {
- if (log.isDebugEnabled()) {
- log.debug(JmsEndpoint.this + " receiving JMS message: " + message);
- }
- JmsExchange exchange = createExchange(message);
- getInboundProcessor().onExchange(exchange);
+ public Producer<JmsExchange> createProducer() throws Exception {
+ return startService(new JmsProducer(this, template));
}
- public void onExchange(Exchange exchange) {
- // lets convert to the type of an exchange
- JmsExchange jmsExchange = convertTo(JmsExchange.class, exchange);
- onExchange(jmsExchange);
- }
-
- public void onExchange(final JmsExchange exchange) {
- template.send(destination, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- Message message = getBinding().makeJmsMessage(exchange.getIn(), session);
- if (log.isDebugEnabled()) {
- log.debug(JmsEndpoint.this + " sending JMS message: " + message);
- }
- return message;
- }
- });
+ public Consumer<JmsExchange> createConsumer(Processor<JmsExchange> processor) throws Exception {
+ AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(template);
+ listenerContainer.setDestinationName(destination);
+ listenerContainer.setPubSubDomain(template.isPubSubDomain());
+ listenerContainer.setConnectionFactory(template.getConnectionFactory());
+
+ // TODO support optional parameters
+ // selector
+ // messageConverter
+ // durableSubscriberName
+
+
+ return startService(new JmsConsumer(this, processor, listenerContainer));
}
public JmsExchange createExchange() {
@@ -104,18 +100,18 @@
return template;
}
+ public String getDestination() {
+ return destination;
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
- protected void doActivate() throws Exception {
- super.doActivate();
- listenerContainer.afterPropertiesSet();
- listenerContainer.initialize();
- listenerContainer.start();
- }
- protected void doDeactivate() {
- listenerContainer.stop();
- listenerContainer.destroy();
- super.doDeactivate();
+ protected AbstractMessageListenerContainer createMessageListenerContainer(JmsTemplate template) {
+ // TODO use an enum to auto-switch container types?
+
+ //return new SimpleMessageListenerContainer();
+ return new DefaultMessageListenerContainer();
}
+
}
Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.Exchange;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.JmsOperations;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.JMSException;
+
+/**
+ * @version $Revision$
+ */
+public class JmsProducer extends DefaultProducer<JmsExchange> {
+ private static final transient Log log = LogFactory.getLog(JmsProducer.class);
+
+ private final JmsEndpoint endpoint;
+ private final JmsOperations template;
+
+ public JmsProducer(JmsEndpoint endpoint, JmsOperations template) {
+ super(endpoint);
+ this.endpoint = endpoint;
+ this.template = template;
+ }
+
+ public void onExchange(Exchange exchange) {
+ // lets convert to the type of an exchange
+ JmsExchange jmsExchange = endpoint.convertTo(JmsExchange.class, exchange);
+ onExchange(jmsExchange);
+ }
+
+ public void onExchange(final JmsExchange exchange) {
+ template.send(endpoint.getDestination(), new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ Message message = endpoint.getBinding().makeJmsMessage(exchange.getIn(), session);
+ if (log.isDebugEnabled()) {
+ log.debug(endpoint + " sending JMS message: " + message);
+ }
+ return message;
+ }
+ });
+ }
+}
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java Tue Mar 27 02:30:52 2007
@@ -22,6 +22,8 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ProducerCache;
import org.apache.camel.builder.RouteBuilder;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
import org.apache.camel.impl.DefaultCamelContext;
@@ -44,6 +46,7 @@
protected CamelContext container = new DefaultCamelContext();
protected CountDownLatch latch = new CountDownLatch(1);
protected Endpoint<JmsExchange> endpoint;
+ protected ProducerCache<JmsExchange> client = new ProducerCache<JmsExchange>();
public void testJmsRouteWithTextMessage() throws Exception {
String expectedBody = "Hello there!";
@@ -62,13 +65,15 @@
assertEquals("body", expectedBody, body);
}
- protected void sendExchange(Object expectedBody) {
- // now lets fire in a message
- JmsExchange exchange = endpoint.createExchange();
- JmsMessage in = exchange.getIn();
- in.setBody(expectedBody);
- in.setHeader("cheese", 123);
- endpoint.onExchange(exchange);
+ protected void sendExchange(final Object expectedBody) {
+ client.send(endpoint, new Processor<JmsExchange>() {
+ public void onExchange(JmsExchange exchange) {
+ // now lets fire in a message
+ JmsMessage in = exchange.getIn();
+ in.setBody(expectedBody);
+ in.setHeader("cheese", 123);
+ }
+ });
}
protected Object assertReceivedValidExchange(Class type) throws Exception {
@@ -96,7 +101,7 @@
container.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
// lets add some routes
- container.setRoutes(new RouteBuilder() {
+ container.addRoutes(new RouteBuilder() {
public void configure() {
from("jms:activemq:test.a").to("jms:activemq:test.b");
from("jms:activemq:test.b").process(new Processor<JmsExchange>() {
@@ -116,6 +121,7 @@
@Override
protected void tearDown() throws Exception {
+ client.stop();
container.deactivateEndpoints();
}
}
Added: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (added)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+
+import java.net.SocketAddress;
+
+/**
+ * A @{link Consumer} for MINA
+ *
+ * @version $Revision$
+ */
+public class MinaConsumer extends DefaultConsumer<MinaExchange> {
+ private static final transient Log log = LogFactory.getLog(MinaConsumer.class);
+
+ private final MinaEndpoint endpoint;
+ private final SocketAddress address;
+ private final IoAcceptor acceptor;
+
+ public MinaConsumer(final MinaEndpoint endpoint, Processor<MinaExchange> processor) {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ address = endpoint.getAddress();
+ acceptor = endpoint.getAcceptor();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Binding to server address: " + address + " using acceptor: " + acceptor);
+ }
+
+ IoHandler handler = new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object object) throws Exception {
+ getProcessor().onExchange(endpoint.createExchange(session, object));
+ }
+ };
+
+ acceptor.bind(address, handler);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ acceptor.unbind(address);
+ super.doStop();
+ }
+}
Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Tue Mar 27 02:30:52 2007
@@ -18,20 +18,18 @@
package org.apache.camel.component.mina;
import org.apache.camel.CamelContext;
+import org.apache.camel.Producer;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Service;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.SocketAddress;
-import java.io.IOException;
/**
* @version $Revision$
@@ -39,9 +37,6 @@
public class MinaEndpoint extends DefaultEndpoint<MinaExchange> {
private static final transient Log log = LogFactory.getLog(MinaEndpoint.class);
- private IoSession session;
- private IoHandler serverHandler;
- private IoHandler clientHandler;
private final IoAcceptor acceptor;
private final SocketAddress address;
private final IoConnector connector;
@@ -54,12 +49,12 @@
this.connector = connector;
}
- public void onExchange(MinaExchange exchange) {
- Object body = exchange.getIn().getBody();
- if (body == null) {
- System.out.println("#### No payload for exchange: " + exchange);
- }
- getSession().write(body);
+ public Producer<MinaExchange> createProducer() throws Exception {
+ return startService(new MinaProducer(this));
+ }
+
+ public Consumer<MinaExchange> createConsumer(Processor<MinaExchange> processor) throws Exception {
+ return startService(new MinaConsumer(this, processor));
}
public MinaExchange createExchange() {
@@ -73,27 +68,18 @@
return exchange;
}
- public IoHandler getServerHandler() {
- if (serverHandler == null) {
- serverHandler = createServerHandler();
- }
- return serverHandler;
- }
-
- public IoHandler getClientHandler() {
- if (clientHandler == null) {
- clientHandler = createClientHandler();
- }
- return clientHandler;
+ // Properties
+ //-------------------------------------------------------------------------
+ public IoAcceptor getAcceptor() {
+ return acceptor;
}
- public IoSession getSession() {
- // TODO lazy create if no inbound processor attached?
- return session;
+ public SocketAddress getAddress() {
+ return address;
}
- public void setSession(IoSession session) {
- this.session = session;
+ public IoConnector getConnector() {
+ return connector;
}
// Implementation methods
@@ -102,56 +88,10 @@
@Override
protected void doActivate() throws Exception {
super.doActivate();
-
- if (getInboundProcessor() != null) {
- // lets initiate the server
-
- if (log.isDebugEnabled()) {
- log.debug("Binding to server address: " + address + " using acceptor: " + acceptor);
- }
-
- acceptor.bind(address, getServerHandler());
- }
- setSession(createSession());
- }
-
- /**
- * Initiates the client connection for outbound communication
- */
- protected IoSession createSession() {
- if (log.isDebugEnabled()) {
- log.debug("Creating connector to address: " + address + " using connector: " + connector);
- }
- ConnectFuture future = connector.connect(address, getClientHandler());
- future.join();
- return future.getSession();
}
-
@Override
protected void doDeactivate() {
acceptor.unbindAll();
- }
-
- protected IoHandler createClientHandler() {
- return new IoHandlerAdapter() {
- @Override
- public void messageReceived(IoSession ioSession, Object object) throws Exception {
- super.messageReceived(ioSession, object); /** TODO */
- }
- };
- }
-
- protected IoHandler createServerHandler() {
- return new IoHandlerAdapter() {
- @Override
- public void messageReceived(IoSession session, Object object) throws Exception {
- processInboundMessage(session, object);
- }
- };
- }
-
- private void processInboundMessage(IoSession session, Object object) {
- getInboundProcessor().onExchange(createExchange(session, object));
}
}
Added: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?view=auto&rev=522838
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (added)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Tue Mar 27 02:30:52 2007
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+
+import java.net.SocketAddress;
+
+/**
+ * A {@link Producer} implementation for MINA
+ *
+ * @version $Revision$
+ */
+public class MinaProducer extends DefaultProducer<MinaExchange> {
+ private static final transient Log log = LogFactory.getLog(MinaProducer.class);
+ private IoSession session;
+ private MinaEndpoint endpoint;
+
+ public MinaProducer(MinaEndpoint endpoint) {
+ super(endpoint);
+ this.endpoint = endpoint;
+ }
+
+ public void onExchange(MinaExchange exchange) {
+ if (session == null) {
+ throw new IllegalStateException("Not started yet!");
+ }
+ Object body = exchange.getIn().getBody();
+ if (body == null) {
+ log.warn("No payload for exchange: " + exchange);
+ }
+ else {
+ session.write(body);
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ SocketAddress address = endpoint.getAddress();
+ IoConnector connector = endpoint.getConnector();
+ if (log.isDebugEnabled()) {
+ log.debug("Creating connector to address: " + address + " using connector: " + connector);
+ }
+ IoHandler ioHandler = new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession ioSession, Object object) throws Exception {
+ super.messageReceived(ioSession, object); /** TODO */
+ }
+ };
+ ConnectFuture future = connector.connect(address, ioHandler);
+ future.join();
+ session = future.getSession();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (session != null) {
+ session.close().join(2000);
+ }
+ }
+}
Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java (original)
+++ activemq/camel/trunk/camel-mina/src/test/java/org/apache/camel/component/mina/MinaVmTest.java Tue Mar 27 02:30:52 2007
@@ -22,6 +22,7 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
@@ -36,6 +37,7 @@
protected CountDownLatch latch = new CountDownLatch(1);
protected MinaExchange receivedExchange;
protected String uri = "mina:vm://localhost:8080";
+ protected Producer<MinaExchange> producer;
public void testMinaRoute() throws Exception {
@@ -46,7 +48,8 @@
message.setBody("Hello there!");
message.setHeader("cheese", 123);
- endpoint.onExchange(exchange);
+ producer = endpoint.createProducer();
+ producer.onExchange(exchange);
// now lets sleep for a while
boolean received = latch.await(5, TimeUnit.SECONDS);
@@ -55,13 +58,16 @@
@Override
protected void setUp() throws Exception {
- container.setRoutes(createRouteBuilder());
+ container.addRoutes(createRouteBuilder());
container.activateEndpoints();
}
@Override
protected void tearDown() throws Exception {
+ if (producer != null) {
+ producer.stop();
+ }
container.deactivateEndpoints();
}
Modified: activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?view=diff&rev=522838&r1=522837&r2=522838
==============================================================================
--- activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java (original)
+++ activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java Tue Mar 27 02:30:52 2007
@@ -115,7 +115,7 @@
/**
* Strategy to install all available routes into the context
*/
- protected void installRoutes() {
+ protected void installRoutes() throws Exception {
for (RouteBuilder routeBuilder : additionalBuilders) {
getContext().addRoutes(routeBuilder);
}