You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cs...@apache.org on 2011/06/15 10:33:58 UTC
svn commit: r1135955 - in
/camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport:
CamelConduit.java CamelOutputStream.java
Author: cschneider
Date: Wed Jun 15 08:33:57 2011
New Revision: 1135955
URL: http://svn.apache.org/viewvc?rev=1135955&view=rev
Log:
Decoupling output stream from conduit
Added:
camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java (with props)
Modified:
camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
Modified: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?rev=1135955&r1=1135954&r2=1135955&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java (original)
+++ camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java Wed Jun 15 08:33:57 2011
@@ -21,26 +21,18 @@ import java.io.OutputStream;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.cxf.common.header.CxfHeaderHelper;
-import org.apache.camel.component.cxf.common.message.CxfMessageHelper;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.ObjectHelper;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.Configurable;
import org.apache.cxf.configuration.Configurer;
-import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +98,12 @@ public class CamelConduit extends Abstra
// prepare the message for send out , not actually send out the message
public void prepare(Message message) throws IOException {
LOG.trace("CamelConduit send message");
- message.setContent(OutputStream.class, new CamelOutputStream(message));
+ CamelOutputStream os = new CamelOutputStream(this.targetCamelEndpointUri,
+ this.producer,
+ this.headerFilterStrategy,
+ this.getMessageObserver(),
+ message);
+ message.setContent(OutputStream.class, os);
}
public void close() {
@@ -153,108 +150,4 @@ public class CamelConduit extends Abstra
camelTemplate = template;
}
- private class CamelOutputStream extends CachedOutputStream {
- private Message outMessage;
- private boolean isOneWay;
-
- public CamelOutputStream(Message m) {
- outMessage = m;
- }
-
- protected void doFlush() throws IOException {
- // do nothing here
- }
-
- protected void doClose() throws IOException {
- isOneWay = outMessage.getExchange().isOneWay();
- commitOutputMessage();
- }
-
- protected void onWrite() throws IOException {
- // do nothing here
- }
-
-
- private void commitOutputMessage() throws IOException {
- ExchangePattern pattern;
- if (isOneWay) {
- pattern = ExchangePattern.InOnly;
- } else {
- pattern = ExchangePattern.InOut;
- }
- LOG.debug("send the message to endpoint {}", targetCamelEndpointUri);
- org.apache.camel.Exchange exchange = producer.createExchange(pattern);
-
- exchange.setProperty(Exchange.TO_ENDPOINT, targetCamelEndpointUri);
- CachedOutputStream outputStream = (CachedOutputStream) outMessage.getContent(OutputStream.class);
- // Send out the request message here, copy the protocolHeader back
- CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy, outMessage, exchange.getIn().getHeaders(), exchange);
-
- // TODO support different encoding
- exchange.getIn().setBody(outputStream.getInputStream());
- LOG.debug("template sending request: ", exchange.getIn());
- Exception exception;
- try {
- producer.process(exchange);
- } catch (Exception ex) {
- exception = ex;
- }
- // Throw the exception that the template get
- exception = exchange.getException();
- if (exception != null) {
- throw new IOException("Cannot send the request message.", exchange.getException());
- }
- exchange.setProperty(CamelTransportConstants.CXF_EXCHANGE, outMessage.getExchange());
- if (!isOneWay) {
- handleResponse(exchange);
- }
-
- }
-
- private void handleResponse(org.apache.camel.Exchange exchange) throws IOException {
- org.apache.cxf.message.Message inMessage = null;
- try {
- inMessage = CxfMessageHelper.getCxfInMessage(headerFilterStrategy, exchange, true);
- } catch (Exception ex) {
- throw new IOException("Cannot get the response message. ", ex);
- }
- incomingObserver.onMessage(inMessage);
- }
- }
-
- /**
- * Represented decoupled response endpoint.
- */
- // TODO: This class is not used
- @Deprecated
- protected class DecoupledDestination implements Destination {
- protected MessageObserver decoupledMessageObserver;
- private EndpointReferenceType address;
-
- DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) {
- address = ref;
- decoupledMessageObserver = incomingObserver;
- }
-
- public EndpointReferenceType getAddress() {
- return address;
- }
-
- public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr) throws IOException {
- // shouldn't be called on decoupled endpoint
- return null;
- }
-
- public void shutdown() {
- }
-
- public synchronized void setMessageObserver(MessageObserver observer) {
- decoupledMessageObserver = observer;
- }
-
- public synchronized MessageObserver getMessageObserver() {
- return decoupledMessageObserver;
- }
- }
-
}
Added: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java?rev=1135955&view=auto
==============================================================================
--- camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java (added)
+++ camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java Wed Jun 15 08:33:57 2011
@@ -0,0 +1,100 @@
+package org.apache.camel.component.cxf.transport;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.component.cxf.common.header.CxfHeaderHelper;
+import org.apache.camel.component.cxf.common.message.CxfMessageHelper;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.MessageObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Output stream for an outgoing CXF message. Closing it sends the message's cached content
+ * through the Camel producer and, unless the exchange is one-way, dispatches the response.
+ */
+class CamelOutputStream extends CachedOutputStream {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelOutputStream.class);
+
+    private final Message outMessage;
+    private final String targetCamelEndpointUri;
+    private final Producer producer;
+    private final HeaderFilterStrategy headerFilterStrategy;
+    private final MessageObserver observer;
+    private boolean isOneWay; // read from the CXF exchange in doClose()
+
+    public CamelOutputStream(String targetCamelEndpointUri, Producer producer,
+                             HeaderFilterStrategy headerFilterStrategy, MessageObserver observer,
+                             Message m) {
+        this.targetCamelEndpointUri = targetCamelEndpointUri;
+        this.producer = producer;
+        this.headerFilterStrategy = headerFilterStrategy;
+        this.observer = observer;
+        this.outMessage = m;
+    }
+
+    protected void doFlush() throws IOException {
+        // nothing to do: the message is only sent when the stream is closed
+    }
+
+    /** Determines whether the CXF exchange is one-way and then sends the message. */
+    protected void doClose() throws IOException {
+        isOneWay = outMessage.getExchange().isOneWay();
+        commitOutputMessage();
+    }
+
+    protected void onWrite() throws IOException {
+        // nothing to do
+    }
+
+    /**
+     * Creates a Camel exchange from the CXF out message, sends it with the producer
+     * and, for request/response exchanges, dispatches the reply.
+     * @throws IOException if the producer throws or records an exception on the exchange
+     */
+    private void commitOutputMessage() throws IOException {
+        ExchangePattern pattern = isOneWay ? ExchangePattern.InOnly : ExchangePattern.InOut;
+        LOG.debug("send the message to endpoint {}", this.targetCamelEndpointUri);
+        org.apache.camel.Exchange exchange = this.producer.createExchange(pattern);
+        exchange.setProperty(Exchange.TO_ENDPOINT, this.targetCamelEndpointUri);
+        CachedOutputStream outputStream = (CachedOutputStream) outMessage.getContent(OutputStream.class);
+        // Send out the request message here, copy the protocolHeader back
+        CxfHeaderHelper.propagateCxfToCamel(this.headerFilterStrategy, outMessage, exchange.getIn().getHeaders(), exchange);
+
+        // TODO support different encoding
+        exchange.getIn().setBody(outputStream.getInputStream());
+        LOG.debug("template sending request: {}", exchange.getIn());
+        try {
+            this.producer.process(exchange);
+        } catch (Exception ex) {
+            // an exception thrown by the producer must fail the send, not be discarded
+            throw new IOException("Cannot send the request message.", ex);
+        }
+        // the producer may instead record the failure on the exchange
+        Exception exception = exchange.getException();
+        if (exception != null) {
+            throw new IOException("Cannot send the request message.", exception);
+        }
+        exchange.setProperty(CamelTransportConstants.CXF_EXCHANGE, outMessage.getExchange());
+        if (!isOneWay) {
+            handleResponse(exchange);
+        }
+    }
+
+    /** Obtains the CXF in-message for the exchange via CxfMessageHelper and passes it to the observer. */
+    private void handleResponse(org.apache.camel.Exchange exchange) throws IOException {
+        org.apache.cxf.message.Message inMessage;
+        try {
+            inMessage = CxfMessageHelper.getCxfInMessage(this.headerFilterStrategy, exchange, true);
+        } catch (Exception ex) {
+            throw new IOException("Cannot get the response message. ", ex);
+        }
+        this.observer.onMessage(inMessage);
+    }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
------------------------------------------------------------------------------
svn:mime-type = text/plain