You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cs...@apache.org on 2011/06/15 10:33:58 UTC

svn commit: r1135955 - in /camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport: CamelConduit.java CamelOutputStream.java

Author: cschneider
Date: Wed Jun 15 08:33:57 2011
New Revision: 1135955

URL: http://svn.apache.org/viewvc?rev=1135955&view=rev
Log:
Decoupling output stream from conduit

Added:
    camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java   (with props)
Modified:
    camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java

Modified: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?rev=1135955&r1=1135954&r2=1135955&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java (original)
+++ camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java Wed Jun 15 08:33:57 2011
@@ -21,26 +21,18 @@ import java.io.OutputStream;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.cxf.common.header.CxfHeaderHelper;
-import org.apache.camel.component.cxf.common.message.CxfMessageHelper;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.Configurable;
 import org.apache.cxf.configuration.Configurer;
-import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,7 +98,12 @@ public class CamelConduit extends Abstra
     // prepare the message for send out , not actually send out the message
     public void prepare(Message message) throws IOException {
         LOG.trace("CamelConduit send message");
-        message.setContent(OutputStream.class, new CamelOutputStream(message));
+        CamelOutputStream os = new CamelOutputStream(this.targetCamelEndpointUri, 
+                                                     this.producer, 
+                                                     this.headerFilterStrategy, 
+                                                     this.getMessageObserver(), 
+                                                     message);
+        message.setContent(OutputStream.class, os);
     }
 
     public void close() {
@@ -153,108 +150,4 @@ public class CamelConduit extends Abstra
         camelTemplate = template;
     }
 
-    private class CamelOutputStream extends CachedOutputStream {
-        private Message outMessage;
-        private boolean isOneWay;
-
-        public CamelOutputStream(Message m) {
-            outMessage = m;
-        }
-
-        protected void doFlush() throws IOException {
-            // do nothing here
-        }
-
-        protected void doClose() throws IOException {
-            isOneWay = outMessage.getExchange().isOneWay();
-            commitOutputMessage();
-        }
-
-        protected void onWrite() throws IOException {
-            // do nothing here
-        }
-
-
-        private void commitOutputMessage() throws IOException {
-            ExchangePattern pattern;
-            if (isOneWay) {
-                pattern = ExchangePattern.InOnly;
-            } else {
-                pattern = ExchangePattern.InOut;
-            }
-            LOG.debug("send the message to endpoint {}", targetCamelEndpointUri);
-            org.apache.camel.Exchange exchange = producer.createExchange(pattern);
-
-            exchange.setProperty(Exchange.TO_ENDPOINT, targetCamelEndpointUri);
-            CachedOutputStream outputStream = (CachedOutputStream) outMessage.getContent(OutputStream.class);
-            // Send out the request message here, copy the protocolHeader back
-            CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy, outMessage, exchange.getIn().getHeaders(), exchange);
-
-            // TODO support different encoding
-            exchange.getIn().setBody(outputStream.getInputStream());
-            LOG.debug("template sending request: ", exchange.getIn());
-            Exception exception;
-            try {
-                producer.process(exchange);
-            } catch (Exception ex) {
-                exception = ex;
-            }
-            // Throw the exception that the template get
-            exception = exchange.getException();
-            if (exception != null) {
-                throw new IOException("Cannot send the request message.", exchange.getException());
-            }
-            exchange.setProperty(CamelTransportConstants.CXF_EXCHANGE, outMessage.getExchange());
-            if (!isOneWay) {
-                handleResponse(exchange);
-            }
-
-        }
-
-        private void handleResponse(org.apache.camel.Exchange exchange) throws IOException {
-            org.apache.cxf.message.Message inMessage = null;
-            try {
-                inMessage = CxfMessageHelper.getCxfInMessage(headerFilterStrategy, exchange, true);
-            } catch (Exception ex) {
-                throw new IOException("Cannot get the response message. ", ex);
-            }
-            incomingObserver.onMessage(inMessage);
-        }
-    }
-
-    /**
-     * Represented decoupled response endpoint.
-     */
-    // TODO: This class is not used
-    @Deprecated
-    protected class DecoupledDestination implements Destination {
-        protected MessageObserver decoupledMessageObserver;
-        private EndpointReferenceType address;
-
-        DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) {
-            address = ref;
-            decoupledMessageObserver = incomingObserver;
-        }
-
-        public EndpointReferenceType getAddress() {
-            return address;
-        }
-
-        public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr) throws IOException {
-            // shouldn't be called on decoupled endpoint
-            return null;
-        }
-
-        public void shutdown() {
-        }
-
-        public synchronized void setMessageObserver(MessageObserver observer) {
-            decoupledMessageObserver = observer;
-        }
-
-        public synchronized MessageObserver getMessageObserver() {
-            return decoupledMessageObserver;
-        }
-    }
-
 }

Added: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java?rev=1135955&view=auto
==============================================================================
--- camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java (added)
+++ camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java Wed Jun 15 08:33:57 2011
@@ -0,0 +1,100 @@
+package org.apache.camel.component.cxf.transport;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.component.cxf.common.header.CxfHeaderHelper;
+import org.apache.camel.component.cxf.common.message.CxfMessageHelper;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.MessageObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Buffers an outgoing CXF message and sends it through a Camel producer when the stream is closed. */
+class CamelOutputStream extends CachedOutputStream {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelOutputStream.class);
+
+    private final Message outMessage;
+    private final String targetCamelEndpointUri;
+    private final Producer producer;
+    private final HeaderFilterStrategy headerFilterStrategy;
+    private final MessageObserver observer;
+    private boolean isOneWay;
+
+    public CamelOutputStream(String targetCamelEndpointUri, Producer producer, 
+                             HeaderFilterStrategy headerFilterStrategy, MessageObserver observer, 
+                             Message m) {
+        this.targetCamelEndpointUri = targetCamelEndpointUri;
+        this.producer = producer;
+        this.headerFilterStrategy = headerFilterStrategy;
+        this.observer = observer;
+        outMessage = m;
+    }
+
+    protected void doFlush() throws IOException {
+        // do nothing here
+    }
+
+    /** Reads the one-way flag from the CXF exchange and sends the buffered message. */
+    protected void doClose() throws IOException {
+        isOneWay = outMessage.getExchange().isOneWay();
+        commitOutputMessage();
+    }
+
+    protected void onWrite() throws IOException {
+        // do nothing here
+    }
+
+    /**
+     * Sends the buffered request to the Camel endpoint and, unless the exchange is one-way,
+     * passes the response to the observer.
+     *
+     * @throws IOException if the producer fails or the Camel exchange carries an exception
+     */
+    private void commitOutputMessage() throws IOException {
+        ExchangePattern pattern = isOneWay ? ExchangePattern.InOnly : ExchangePattern.InOut;
+        LOG.debug("send the message to endpoint {}", this.targetCamelEndpointUri);
+        org.apache.camel.Exchange exchange = this.producer.createExchange(pattern);
+
+        exchange.setProperty(Exchange.TO_ENDPOINT, this.targetCamelEndpointUri);
+        CachedOutputStream outputStream = (CachedOutputStream) outMessage.getContent(OutputStream.class);
+        // Send out the request message here, copy the protocolHeader back
+        CxfHeaderHelper.propagateCxfToCamel(this.headerFilterStrategy, outMessage, exchange.getIn().getHeaders(), exchange);
+
+        // TODO support different encoding
+        exchange.getIn().setBody(outputStream.getInputStream());
+        LOG.debug("template sending request: {}", exchange.getIn());
+        Exception thrown = null;
+        try {
+            this.producer.process(exchange);
+        } catch (Exception ex) {
+            // Remember the failure so it is reported below instead of being dropped
+            thrown = ex;
+        }
+        // An exception recorded on the exchange takes precedence over one thrown by the producer
+        Exception exception = exchange.getException() != null ? exchange.getException() : thrown;
+        if (exception != null) {
+            throw new IOException("Cannot send the request message.", exception);
+        }
+        exchange.setProperty(CamelTransportConstants.CXF_EXCHANGE, outMessage.getExchange());
+        if (!isOneWay) {
+            handleResponse(exchange);
+        }
+    }
+
+    /** Converts the Camel exchange into a CXF in-message and hands it to the observer. */
+    private void handleResponse(org.apache.camel.Exchange exchange) throws IOException {
+        org.apache.cxf.message.Message inMessage;
+        try {
+            inMessage = CxfMessageHelper.getCxfInMessage(this.headerFilterStrategy, exchange, true);
+        } catch (Exception ex) {
+            throw new IOException("Cannot get the response message. ", ex);
+        }
+        this.observer.onMessage(inMessage);
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain