You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ds...@apache.org on 2014/02/10 11:09:32 UTC

svn commit: r1566561 - /cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java

Author: dsosnoski
Date: Mon Feb 10 10:09:31 2014
New Revision: 1566561

URL: http://svn.apache.org/r1566561
Log:
Correct for merge errors.

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1566561&r1=1566560&r2=1566561&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Mon Feb 10 10:09:31 2014
@@ -19,52 +19,78 @@
 
 package org.apache.cxf.ws.rm.soap;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.TimerTask;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.binding.soap.SoapHeader;
+import org.apache.cxf.binding.soap.SoapMessage;
+import org.apache.cxf.binding.soap.SoapVersion;
+import org.apache.cxf.binding.soap.interceptor.SoapOutInterceptor;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.ConduitSelector;
 import org.apache.cxf.endpoint.DeferredConduitSelector;
 import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.helpers.IOUtils;
-import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.helpers.DOMUtils;
+import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.io.CachedOutputStreamCallback;
+import org.apache.cxf.io.WriteOnCloseOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseChainCache;
+import org.apache.cxf.phase.PhaseInterceptor;
+import org.apache.cxf.phase.PhaseInterceptorChain;
+import org.apache.cxf.phase.PhaseManager;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.staxutils.PartialXMLStreamReader;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.cxf.staxutils.W3CDOMStreamWriter;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.workqueue.SynchronousExecutor;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.addressing.soap.MAPCodec;
 import org.apache.cxf.ws.policy.AssertionInfo;
 import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
 import org.apache.cxf.ws.rm.ProtocolVariation;
+import org.apache.cxf.ws.rm.RMCaptureOutInterceptor;
 import org.apache.cxf.ws.rm.RMConfiguration;
 import org.apache.cxf.ws.rm.RMContextUtils;
 import org.apache.cxf.ws.rm.RMEndpoint;
-import org.apache.cxf.ws.rm.RMException;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
 import org.apache.cxf.ws.rm.RMProperties;
 import org.apache.cxf.ws.rm.RMUtils;
-import org.apache.cxf.ws.rm.RetransmissionCallback;
 import org.apache.cxf.ws.rm.RetransmissionQueue;
 import org.apache.cxf.ws.rm.RetryStatus;
+import org.apache.cxf.ws.rm.RewindableInputStream;
 import org.apache.cxf.ws.rm.SourceSequence;
 import org.apache.cxf.ws.rm.manager.RetryPolicyType;
 import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -290,7 +316,7 @@ public class RetransmissionQueueImpl imp
      * @param message the message context
      * @return a ResendCandidate
      */
-    protected ResendCandidate createResendCandidate(Message message) {
+    protected ResendCandidate createResendCandidate(SoapMessage message) {
         return new ResendCandidate(message);
     }
 
@@ -364,128 +390,12 @@ public class RetransmissionQueueImpl imp
         return suspendedCandidates.containsKey(key);
     }
 
-    private void clientResend(Message message) {
-        Conduit c = message.getExchange().getConduit(message);
-        resend(c, message);
-    }
-
-    private void serverResend(Message message) throws RMException {
-        
-        // get the message's to address
-        
-        AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, true);
-        AttributedURIType to = null;
-        if (null != maps) {
-            to = maps.getTo();
-        }
-        if (null == to) {
-            LOG.log(Level.SEVERE, "NO_ADDRESS_FOR_RESEND_MSG");
-            return;
-        }
-        if (RMUtils.getAddressingConstants().getAnonymousURI().equals(to.getValue())) {
-            LOG.log(Level.FINE, "Cannot resend to anonymous target");
-            return;
-        }
-        
-        final String address = to.getValue();
-        LOG.log(Level.FINE, "Resending to address: {0}", address);
-        final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
-        final Endpoint reliableEndpoint = manager.getReliableEndpoint(message).getEndpoint(protocol);
-
-        ConduitSelector cs = new DeferredConduitSelector() {
-            @Override
-            public synchronized Conduit selectConduit(Message message) {
-                Conduit conduit = null;
-                EndpointInfo endpointInfo = reliableEndpoint.getEndpointInfo();
-                EndpointReferenceType original =  endpointInfo.getTarget();
-                try {
-                    if (null != address) {
-                        endpointInfo.setAddress(address);
-                    }
-                    conduit = super.selectConduit(message);
-                } finally {
-                    endpointInfo.setAddress(original);
-                }
-                return conduit;
-            }
-        };
-        
-        cs.setEndpoint(reliableEndpoint);
-        Conduit c = cs.selectConduit(message);   
-        // REVISIT
-        // use application endpoint message observer instead?
-        c.setMessageObserver(new MessageObserver() {
-            public void onMessage(Message message) {
-                LOG.fine("Ignoring response to resent message.");
-            }
-            
-        });
-        resend(c, message);
-    }
-    
-    private void resend(Conduit c, Message message) {
-        try {
-
-            // get registered callbacks, create new output stream and
-            // re-register
-            // all callbacks except the retransmission callback
-
-            OutputStream os = message.getContent(OutputStream.class);
-            List<CachedOutputStreamCallback> callbacks = null;
-            
-            if (os instanceof CachedOutputStream) {
-                callbacks = ((CachedOutputStream)os).getCallbacks();
-            }
-            message.removeContent(OutputStream.class);
-            c.prepare(message);
-
-            os = message.getContent(OutputStream.class);
-            
-            if (null != callbacks && callbacks.size() > 1) {
-                if (!(os instanceof CachedOutputStream)) {
-                    os = RMUtils.createCachedStream(message, os);
-                }
-                for (CachedOutputStreamCallback cb : callbacks) {
-                    if (!(cb instanceof RetransmissionCallback)) {
-                        ((CachedOutputStream)os).registerCallback(cb);
-                    }
-                }
-            }
-            CachedOutputStream content = (CachedOutputStream)message
-                .get(RMMessageConstants.SAVED_CONTENT);
-            if (null == content) {
-                LOG.log(Level.WARNING, "Assuming the message has been acknowledged and released, skipping resend.");
-            } else {
-                InputStream bis = content.getInputStream();
-                if (LOG.isLoggable(Level.FINE)) {
-                    if (content.size() < 65536) {
-                        LOG.fine("Using saved output stream: " 
-                                 + IOUtils.newStringFromBytes(content.getBytes()));                        
-                    } else {                        
-                        LOG.fine("Using saved output stream: ...");                        
-                    }
-                }
-
-                // copy saved output stream to new output stream in chunks of 1024
-                IOUtils.copyAndCloseInput(bis, os);
-                os.flush();
-                // closing the conduit this way will close the underlining stream that is os.
-                c.close(message);
-            }
-        } catch (ConnectException ex) {
-            //ignore, we'll just resent again later
-        } catch (IOException ex) {
-            LOG.log(Level.WARNING, "RESEND_FAILED_MSG", ex);
-        }
-    }
-
     /**
      * Represents a candidate for resend, i.e. an unacked outgoing message.
      */
     protected class ResendCandidate implements Runnable, RetryStatus {
         private Message message;
         private long number;
-        private OutputStream out;
         private Date next;
         private TimerTask nextTask;
         private int retries;
@@ -502,7 +412,6 @@ public class RetransmissionQueueImpl imp
         protected ResendCandidate(Message m) {
             message = m;
             retries = 0;
-            out = m.getContent(OutputStream.class);
             RMConfiguration cfg = manager.getEffectiveConfiguration(message);
             long baseRetransmissionInterval = 
                 cfg.getBaseRetransmissionInterval().longValue();
@@ -518,8 +427,7 @@ public class RetransmissionQueueImpl imp
             if (null != maps) {
                 to = maps.getTo();
             }
-            if (to != null 
-                && RMUtils.getAddressingConstants().getAnonymousURI().equals(to.getValue())) {
+            if (to != null  && RMUtils.getAddressingConstants().getAnonymousURI().equals(to.getValue())) {
                 LOG.log(Level.INFO, "Cannot resend to anonymous target.  Not scheduling a resend.");
                 return;
             }
@@ -566,7 +474,6 @@ public class RetransmissionQueueImpl imp
                 // ensure ACK wasn't received while this task was enqueued
                 // on executor
                 if (isPending()) {
-                    message.setContent(OutputStream.class, out);
                     resender.resend(message, includeAckRequested);
                     includeAckRequested = false;
                 }
@@ -668,15 +575,9 @@ public class RetransmissionQueueImpl imp
         }
 
         private void releaseSavedMessage() {
-            CachedOutputStream saved = (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT);
-            if (saved != null) {
-                saved.releaseTempFileHold();
-                // call close to dispose
-                try {
-                    saved.close();
-                } catch (IOException e) {
-                    // ignore
-                }
+            RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            if (is != null) {
+                is.release();
             }
         }
 
@@ -734,7 +635,7 @@ public class RetransmissionQueueImpl imp
         /**
          * Resend mechanics.
          * 
-         * @param context the cloned message context.
+         * @param message
          * @param if a AckRequest should be included
          */
         void resend(Message message, boolean requestAcknowledge);
@@ -753,17 +654,10 @@ public class RetransmissionQueueImpl imp
                 if (st != null) {
                     LOG.log(Level.INFO, "RESEND_MSG", st.getMessageNumber());
                 }
-                try {
-                    // TODO: remove previously added acknowledgments and update
-                    // message id (to avoid duplicates)
-
-                    if (MessageUtils.isRequestor(message)) {
-                        clientResend(message);
-                    } else {
-                        serverResend(message);
-                    }
-                } catch (Exception e) {
-                    LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
+                if (message instanceof SoapMessage) {
+                    doResend((SoapMessage)message);
+                } else {
+                    doResend(new SoapMessage(message));
                 }
             }
         };
@@ -782,5 +676,255 @@ public class RetransmissionQueueImpl imp
     protected JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {
         return (JaxbAssertion<RMAssertion>)ai.getAssertion();
     }
+    
+    /**
+     * Reads the header portion of a SOAP document into a DOM document and registers every child
+     * element of each SOAP header element found as a {@link SoapHeader} on the message.
+     * <p>
+     * The DOM is built through the message's {@link W3CDOMStreamWriter} if one is set, otherwise into
+     * the message's existing {@link Document} content, otherwise into a new document which is then
+     * stored as the message's {@link Node} content.
+     *
+     * @param xmlReader reader positioned on the SOAP document to read the headers from
+     * @param message the message that receives the header entries
+     * @throws XMLStreamException if reading or copying the XML fails
+     */
+    private void readHeaders(XMLStreamReader xmlReader, SoapMessage message) throws XMLStreamException {
 
-}
+        // read header portion of SOAP document into DOM
+        final SoapVersion soapVersion = message.getVersion();
+        final XMLStreamReader headerReader = new PartialXMLStreamReader(xmlReader, soapVersion.getBody());
+        final Node existing = message.getContent(Node.class);
+        final W3CDOMStreamWriter domWriter = message.get(W3CDOMStreamWriter.class);
+        final Document envelope;
+        if (domWriter != null) {
+            StaxUtils.copy(headerReader, domWriter);
+            envelope = domWriter.getDocument();
+        } else if (existing instanceof Document) {
+            envelope = (Document)existing;
+            StaxUtils.readDocElements(envelope, envelope, headerReader, false, false);
+        } else {
+            envelope = StaxUtils.read(headerReader);
+            message.setContent(Node.class, envelope);
+        }
+
+        // get the actual SOAP header
+        final Element root = envelope.getDocumentElement();
+        final QName headerName = soapVersion.getHeader();
+        final List<Element> headerElements =
+            DOMUtils.findAllElementsByTagNameNS(root, headerName.getNamespaceURI(), headerName.getLocalPart());
+
+        // set all child elements as headers for message transmission
+        for (Element headerElement : headerElements) {
+            for (Element entry = DOMUtils.getFirstElement(headerElement); entry != null;
+                entry = DOMUtils.getNextElement(entry)) {
+                message.getHeaders().add(new SoapHeader(DOMUtils.getElementQName(entry), entry));
+            }
+        }
+    }
+
+    /**
+     * Retransmits a message by replaying its saved content through an outbound interceptor chain.
+     * <p>
+     * The SOAP headers and addressing properties are re-read from the saved stream, the JAX-WS and
+     * marshal-phase interceptors are dropped from the chain in favour of a {@link CopyOutInterceptor}
+     * fed from the saved stream, and the chain is then run. If the message carries no saved content
+     * the resend is skipped with a warning. Any exception is logged rather than propagated.
+     *
+     * @param message the message to resend
+     */
+    private void doResend(SoapMessage message) {
+        try {
+
+            // without saved content there is nothing to replay, so bail out before touching the message
+            RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            if (is == null) {
+                LOG.log(Level.WARNING,
+                        "Assuming the message has been acknowledged and released, skipping resend.");
+                return;
+            }
+
+            // initialize copied interceptor chain for message
+            PhaseInterceptorChain retransmitChain = manager.getRetransmitChain();
+            ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
+            Endpoint endpoint = manager.getReliableEndpoint(message).getEndpoint(protocol);
+            PhaseChainCache cache = new PhaseChainCache();
+            boolean after = true;
+            if (retransmitChain == null) {
+
+                // no saved retransmit chain, so construct one from scratch (won't work for WS-Security on
+                //  server, so need to fix)
+                retransmitChain = buildRetransmitChain(endpoint, cache);
+                after = false;
+
+            }
+            message.setInterceptorChain(retransmitChain);
+
+            // clear flag for SOAP out interceptor so envelope will be written
+            message.remove(SoapOutInterceptor.WROTE_ENVELOPE_START);
+
+            // discard all saved content; iterate over a snapshot so that removing content cannot
+            // disturb the iteration if the returned set is a live view
+            Set<Class<?>> formats = message.getContentFormats();
+            List<CachedOutputStreamCallback> callbacks = null;
+            for (Class<?> clas : new ArrayList<Class<?>>(formats)) {
+                Object content = message.getContent(clas);
+                if (content != null) {
+                    LOG.info("Removing " + clas.getName() + " content of actual type " + content.getClass().getName());
+                    message.removeContent(clas);
+                    if (clas == OutputStream.class && content instanceof WriteOnCloseOutputStream) {
+                        callbacks = ((WriteOnCloseOutputStream)content).getCallbacks();
+                    }
+                }
+            }
+
+            // read SOAP headers from saved input stream
+            is.rewind();
+            XMLStreamReader reader = StaxUtils.createXMLStreamReader(is, "UTF-8");
+            message.getHeaders().clear();
+            if (reader.getEventType() != XMLStreamConstants.START_ELEMENT
+                && reader.nextTag() != XMLStreamConstants.START_ELEMENT) {
+                throw new IllegalStateException("No document found");
+            }
+            readHeaders(reader, message);
+            int event;
+            while ((event = reader.nextTag()) != XMLStreamConstants.START_ELEMENT) {
+                if (event == XMLStreamConstants.END_ELEMENT) {
+                    throw new IllegalStateException("No body content present");
+                }
+            }
+
+            // set message addressing properties
+            AddressingProperties maps = new MAPCodec().unmarshalMAPs(message);
+            RMContextUtils.storeMAPs(maps, message, true, MessageUtils.isRequestor(message));
+            AttributedURIType to = null;
+            if (null != maps) {
+                to = maps.getTo();
+            }
+            if (null == to) {
+                LOG.log(Level.SEVERE, "NO_ADDRESS_FOR_RESEND_MSG");
+                return;
+            }
+            if (RMUtils.getAddressingConstants().getAnonymousURI().equals(to.getValue())) {
+                LOG.log(Level.FINE, "Cannot resend to anonymous target");
+                return;
+            }
+
+            // initialize conduit for new message
+            Conduit c = message.getExchange().getConduit(message);
+            if (c == null) {
+                c = buildConduit(message, endpoint, to);
+            }
+            c.prepare(message);
+
+            // replace standard message marshaling with copy from saved stream
+            ListIterator<Interceptor<? extends Message>> iterator = retransmitChain.getIterator();
+            while (iterator.hasNext()) {
+                Interceptor<? extends Message> incept = iterator.next();
+
+                // remove JAX-WS interceptors which handle message modes and such
+                if (incept.getClass().getName().startsWith("org.apache.cxf.jaxws.interceptors")) {
+                    retransmitChain.remove(incept);
+                } else if (incept instanceof PhaseInterceptor
+                    && Phase.MARSHAL.equals(((PhaseInterceptor<?>)incept).getPhase())) {
+
+                    // remove any interceptors from the marshal phase (phase names are strings, so
+                    // compare by value rather than by reference)
+                    retransmitChain.remove(incept);
+                }
+            }
+            retransmitChain.add(new CopyOutInterceptor(reader));
+
+            // restore callbacks on output stream
+            if (callbacks != null) {
+                OutputStream os = message.getContent(OutputStream.class);
+                if (os != null) {
+                    WriteOnCloseOutputStream woc;
+                    if (os instanceof WriteOnCloseOutputStream) {
+                        woc = (WriteOnCloseOutputStream)os;
+                    } else {
+                        woc = new WriteOnCloseOutputStream(os);
+                        message.setContent(OutputStream.class, woc);
+                    }
+                    for (CachedOutputStreamCallback cb: callbacks) {
+                        woc.registerCallback(cb);
+                    }
+                }
+            }
+
+            // send the message
+            message.put(RMMessageConstants.RM_RETRANSMISSION, Boolean.TRUE);
+            if (after) {
+                retransmitChain.doInterceptStartingAfter(message, RMCaptureOutInterceptor.class.getName());
+            } else {
+                retransmitChain.doIntercept(message);
+            }
+            if (LOG.isLoggable(Level.INFO)) {
+
+                // purely informational: missing RM properties must not turn a completed resend into
+                // a reported failure
+                RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
+                SequenceType seq = rmps == null ? null : rmps.getSequence();
+                if (seq != null) {
+                    LOG.log(Level.INFO, "Retransmitted message " + seq.getMessageNumber() + " in sequence "
+                        + seq.getIdentifier().getValue());
+                }
+            }
+
+        } catch (Exception ex) {
+            LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex);
+        }
+    }
+
+    /**
+     * Selects a conduit for resending a message to an explicit target address.
+     * <p>
+     * While the conduit is being selected the endpoint info's address is set to the value of
+     * <code>to</code>; afterwards it is set back to the target it had before. The selected conduit
+     * gets a message observer that only logs incoming responses, and is stored on the message
+     * exchange.
+     *
+     * @param message the message being resent
+     * @param endpoint the endpoint whose endpoint info is used for conduit selection
+     * @param to the addressing destination supplying the target address
+     * @return the selected conduit
+     */
+    protected Conduit buildConduit(SoapMessage message, final Endpoint endpoint, AttributedURIType to) {
+        final String target = to.getValue();
+        ConduitSelector selector = new DeferredConduitSelector() {
+            @Override
+            public synchronized Conduit selectConduit(Message msg) {
+                EndpointInfo info = endpoint.getEndpointInfo();
+                EndpointReferenceType previousTarget = info.getTarget();
+                try {
+                    if (target != null) {
+                        info.setAddress(target);
+                    }
+                    return super.selectConduit(msg);
+                } finally {
+                    // always put the previous target back, even if selection failed
+                    info.setAddress(previousTarget);
+                }
+            }
+        };
+        selector.setEndpoint(endpoint);
+
+        Conduit conduit = selector.selectConduit(message);
+        // REVISIT
+        // use application endpoint message observer instead?
+        conduit.setMessageObserver(new MessageObserver() {
+            public void onMessage(Message response) {
+                LOG.fine("Ignoring response to resent message.");
+            }
+        });
+
+        message.getExchange().setConduit(conduit);
+        return conduit;
+    }
+
+    /**
+     * Assembles an outbound interceptor chain from the out interceptors of the bus, the endpoint and
+     * the endpoint's binding, using the out phases of the bus's {@link PhaseManager}.
+     *
+     * @param endpoint the endpoint contributing its own and its binding's out interceptors
+     * @param cache the phase chain cache the chain is obtained from
+     * @return the interceptor chain returned by the cache
+     */
+    protected PhaseInterceptorChain buildRetransmitChain(final Endpoint endpoint, PhaseChainCache cache) {
+        final boolean logFine = LOG.isLoggable(Level.FINE);
+        final Bus bus = getManager().getBus();
+
+        List<Interceptor<? extends Message>> fromBus = bus.getOutInterceptors();
+        if (logFine) {
+            LOG.fine("Interceptors contributed by bus: " + fromBus);
+        }
+
+        List<Interceptor<? extends Message>> fromEndpoint = endpoint.getOutInterceptors();
+        if (logFine) {
+            LOG.fine("Interceptors contributed by endpoint: " + fromEndpoint);
+        }
+
+        List<Interceptor<? extends Message>> fromBinding = endpoint.getBinding().getOutInterceptors();
+        if (logFine) {
+            LOG.fine("Interceptors contributed by binding: " + fromBinding);
+        }
+
+        PhaseManager phaseManager = bus.getExtension(PhaseManager.class);
+        return cache.get(phaseManager.getOutPhases(), fromBus, fromEndpoint, fromBinding);
+    }
+    
+    /**
+     * Marshal-phase interceptor that produces the message output by copying everything remaining on
+     * a supplied XML stream reader to the message's {@link XMLStreamWriter}.
+     */
+    public static class CopyOutInterceptor extends AbstractOutDatabindingInterceptor {
+        private final XMLStreamReader source;
+
+        /**
+         * @param rdr the reader whose content is copied to the outgoing message
+         */
+        public CopyOutInterceptor(XMLStreamReader rdr) {
+            super(Phase.MARSHAL);
+            this.source = rdr;
+        }
+
+        /**
+         * Copies the reader's content to the XML stream writer held as message content.
+         *
+         * @param message the outgoing message
+         * @throws Fault if the copy fails with an {@link XMLStreamException}
+         */
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            XMLStreamWriter target = message.getContent(XMLStreamWriter.class);
+            try {
+                StaxUtils.copy(source, target);
+            } catch (XMLStreamException e) {
+                throw new Fault("COULD_NOT_READ_XML_STREAM", LOG, e);
+            }
+        }
+    }
+}
\ No newline at end of file