You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ds...@apache.org on 2014/02/10 11:09:32 UTC
svn commit: r1566561 -
/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
Author: dsosnoski
Date: Mon Feb 10 10:09:31 2014
New Revision: 1566561
URL: http://svn.apache.org/r1566561
Log:
Correct for merge errors.
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1566561&r1=1566560&r2=1566561&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Mon Feb 10 10:09:31 2014
@@ -19,52 +19,78 @@
package org.apache.cxf.ws.rm.soap;
-import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
-import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
+import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.binding.soap.SoapHeader;
+import org.apache.cxf.binding.soap.SoapMessage;
+import org.apache.cxf.binding.soap.SoapVersion;
+import org.apache.cxf.binding.soap.interceptor.SoapOutInterceptor;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.ConduitSelector;
import org.apache.cxf.endpoint.DeferredConduitSelector;
import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.helpers.IOUtils;
-import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.helpers.DOMUtils;
+import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.io.CachedOutputStreamCallback;
+import org.apache.cxf.io.WriteOnCloseOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseChainCache;
+import org.apache.cxf.phase.PhaseInterceptor;
+import org.apache.cxf.phase.PhaseInterceptorChain;
+import org.apache.cxf.phase.PhaseManager;
import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.staxutils.PartialXMLStreamReader;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.cxf.staxutils.W3CDOMStreamWriter;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.workqueue.SynchronousExecutor;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.addressing.soap.MAPCodec;
import org.apache.cxf.ws.policy.AssertionInfo;
import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
import org.apache.cxf.ws.rm.ProtocolVariation;
+import org.apache.cxf.ws.rm.RMCaptureOutInterceptor;
import org.apache.cxf.ws.rm.RMConfiguration;
import org.apache.cxf.ws.rm.RMContextUtils;
import org.apache.cxf.ws.rm.RMEndpoint;
-import org.apache.cxf.ws.rm.RMException;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMMessageConstants;
import org.apache.cxf.ws.rm.RMProperties;
import org.apache.cxf.ws.rm.RMUtils;
-import org.apache.cxf.ws.rm.RetransmissionCallback;
import org.apache.cxf.ws.rm.RetransmissionQueue;
import org.apache.cxf.ws.rm.RetryStatus;
+import org.apache.cxf.ws.rm.RewindableInputStream;
import org.apache.cxf.ws.rm.SourceSequence;
import org.apache.cxf.ws.rm.manager.RetryPolicyType;
import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -290,7 +316,7 @@ public class RetransmissionQueueImpl imp
* @param message the message context
* @return a ResendCandidate
*/
- protected ResendCandidate createResendCandidate(Message message) {
+ protected ResendCandidate createResendCandidate(SoapMessage message) {
return new ResendCandidate(message);
}
@@ -364,128 +390,12 @@ public class RetransmissionQueueImpl imp
return suspendedCandidates.containsKey(key);
}
- private void clientResend(Message message) {
- Conduit c = message.getExchange().getConduit(message);
- resend(c, message);
- }
-
- private void serverResend(Message message) throws RMException {
-
- // get the message's to address
-
- AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, true);
- AttributedURIType to = null;
- if (null != maps) {
- to = maps.getTo();
- }
- if (null == to) {
- LOG.log(Level.SEVERE, "NO_ADDRESS_FOR_RESEND_MSG");
- return;
- }
- if (RMUtils.getAddressingConstants().getAnonymousURI().equals(to.getValue())) {
- LOG.log(Level.FINE, "Cannot resend to anonymous target");
- return;
- }
-
- final String address = to.getValue();
- LOG.log(Level.FINE, "Resending to address: {0}", address);
- final ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
- final Endpoint reliableEndpoint = manager.getReliableEndpoint(message).getEndpoint(protocol);
-
- ConduitSelector cs = new DeferredConduitSelector() {
- @Override
- public synchronized Conduit selectConduit(Message message) {
- Conduit conduit = null;
- EndpointInfo endpointInfo = reliableEndpoint.getEndpointInfo();
- EndpointReferenceType original = endpointInfo.getTarget();
- try {
- if (null != address) {
- endpointInfo.setAddress(address);
- }
- conduit = super.selectConduit(message);
- } finally {
- endpointInfo.setAddress(original);
- }
- return conduit;
- }
- };
-
- cs.setEndpoint(reliableEndpoint);
- Conduit c = cs.selectConduit(message);
- // REVISIT
- // use application endpoint message observer instead?
- c.setMessageObserver(new MessageObserver() {
- public void onMessage(Message message) {
- LOG.fine("Ignoring response to resent message.");
- }
-
- });
- resend(c, message);
- }
-
- private void resend(Conduit c, Message message) {
- try {
-
- // get registered callbacks, create new output stream and
- // re-register
- // all callbacks except the retransmission callback
-
- OutputStream os = message.getContent(OutputStream.class);
- List<CachedOutputStreamCallback> callbacks = null;
-
- if (os instanceof CachedOutputStream) {
- callbacks = ((CachedOutputStream)os).getCallbacks();
- }
- message.removeContent(OutputStream.class);
- c.prepare(message);
-
- os = message.getContent(OutputStream.class);
-
- if (null != callbacks && callbacks.size() > 1) {
- if (!(os instanceof CachedOutputStream)) {
- os = RMUtils.createCachedStream(message, os);
- }
- for (CachedOutputStreamCallback cb : callbacks) {
- if (!(cb instanceof RetransmissionCallback)) {
- ((CachedOutputStream)os).registerCallback(cb);
- }
- }
- }
- CachedOutputStream content = (CachedOutputStream)message
- .get(RMMessageConstants.SAVED_CONTENT);
- if (null == content) {
- LOG.log(Level.WARNING, "Assuming the message has been acknowledged and released, skipping resend.");
- } else {
- InputStream bis = content.getInputStream();
- if (LOG.isLoggable(Level.FINE)) {
- if (content.size() < 65536) {
- LOG.fine("Using saved output stream: "
- + IOUtils.newStringFromBytes(content.getBytes()));
- } else {
- LOG.fine("Using saved output stream: ...");
- }
- }
-
- // copy saved output stream to new output stream in chunks of 1024
- IOUtils.copyAndCloseInput(bis, os);
- os.flush();
- // closing the conduit this way will close the underlining stream that is os.
- c.close(message);
- }
- } catch (ConnectException ex) {
- //ignore, we'll just resent again later
- } catch (IOException ex) {
- LOG.log(Level.WARNING, "RESEND_FAILED_MSG", ex);
- }
- }
-
/**
* Represents a candidate for resend, i.e. an unacked outgoing message.
*/
protected class ResendCandidate implements Runnable, RetryStatus {
private Message message;
private long number;
- private OutputStream out;
private Date next;
private TimerTask nextTask;
private int retries;
@@ -502,7 +412,6 @@ public class RetransmissionQueueImpl imp
protected ResendCandidate(Message m) {
message = m;
retries = 0;
- out = m.getContent(OutputStream.class);
RMConfiguration cfg = manager.getEffectiveConfiguration(message);
long baseRetransmissionInterval =
cfg.getBaseRetransmissionInterval().longValue();
@@ -518,8 +427,7 @@ public class RetransmissionQueueImpl imp
if (null != maps) {
to = maps.getTo();
}
- if (to != null
- && RMUtils.getAddressingConstants().getAnonymousURI().equals(to.getValue())) {
+ if (to != null && RMUtils.getAddressingConstants().getAnonymousURI().equals(to.getValue())) {
LOG.log(Level.INFO, "Cannot resend to anonymous target. Not scheduling a resend.");
return;
}
@@ -566,7 +474,6 @@ public class RetransmissionQueueImpl imp
// ensure ACK wasn't received while this task was enqueued
// on executor
if (isPending()) {
- message.setContent(OutputStream.class, out);
resender.resend(message, includeAckRequested);
includeAckRequested = false;
}
@@ -668,15 +575,9 @@ public class RetransmissionQueueImpl imp
}
private void releaseSavedMessage() {
- CachedOutputStream saved = (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT);
- if (saved != null) {
- saved.releaseTempFileHold();
- // call close to dispose
- try {
- saved.close();
- } catch (IOException e) {
- // ignore
- }
+ RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ if (is != null) {
+ is.release();
}
}
@@ -734,7 +635,7 @@ public class RetransmissionQueueImpl imp
/**
* Resend mechanics.
*
- * @param context the cloned message context.
+ * @param message the message to resend
- * @param if a AckRequest should be included
+ * @param requestAcknowledge true if an AckRequest should be included
*/
void resend(Message message, boolean requestAcknowledge);
@@ -753,17 +654,10 @@ public class RetransmissionQueueImpl imp
if (st != null) {
LOG.log(Level.INFO, "RESEND_MSG", st.getMessageNumber());
}
- try {
- // TODO: remove previously added acknowledgments and update
- // message id (to avoid duplicates)
-
- if (MessageUtils.isRequestor(message)) {
- clientResend(message);
- } else {
- serverResend(message);
- }
- } catch (Exception e) {
- LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
+ if (message instanceof SoapMessage) {
+ doResend((SoapMessage)message);
+ } else {
+ doResend(new SoapMessage(message));
}
}
};
@@ -782,5 +676,255 @@ public class RetransmissionQueueImpl imp
protected JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {
return (JaxbAssertion<RMAssertion>)ai.getAssertion();
}
+
+ /**
+ * Reads the leading part of a SOAP document into a DOM document and registers every child element
+ * of each SOAP Header element found in it as a header of the message.
+ *
+ * @param xmlReader reader over the SOAP document to take the headers from
+ * @param message message that receives the headers; if it has neither a DOM writer nor a Document
+ * as Node content, the newly built document is also stored as its Node content
+ * @throws XMLStreamException propagated from the underlying StAX read/copy operations
+ */
+ private void readHeaders(XMLStreamReader xmlReader, SoapMessage message) throws XMLStreamException {
-}
+ // read header portion of SOAP document into DOM
+ SoapVersion version = message.getVersion();
+ // NOTE(review): presumably the partial reader stops once the SOAP Body element is reached - confirm
+ XMLStreamReader filteredReader = new PartialXMLStreamReader(xmlReader, version.getBody());
+ Node nd = message.getContent(Node.class);
+ W3CDOMStreamWriter writer = message.get(W3CDOMStreamWriter.class);
+ Document doc = null;
+ if (writer != null) {
+ // a DOM writer is attached to the message: copy into it and use its document
+ StaxUtils.copy(filteredReader, writer);
+ doc = writer.getDocument();
+ } else if (nd instanceof Document) {
+ // the message already holds a Document: read the elements into that document
+ doc = (Document)nd;
+ StaxUtils.readDocElements(doc, doc, filteredReader, false, false);
+ } else {
+ // otherwise build a new Document and store it as the message's Node content
+ doc = StaxUtils.read(filteredReader);
+ message.setContent(Node.class, doc);
+ }
+
+ // get the actual SOAP header
+ Element element = doc.getDocumentElement();
+ QName header = version.getHeader();
+ List<Element> elemList =
+ DOMUtils.findAllElementsByTagNameNS(element, header.getNamespaceURI(), header.getLocalPart());
+ for (Element elem : elemList) {
+
+ // set all child elements as headers for message transmission
+ Element hel = DOMUtils.getFirstElement(elem);
+ while (hel != null) {
+ // each header is keyed by the QName of its element
+ SoapHeader sheader = new SoapHeader(DOMUtils.getElementQName(hel), hel);
+ message.getHeaders().add(sheader);
+ hel = DOMUtils.getNextElement(hel);
+ }
+ }
+ }
+
+    /**
+     * Resends a message by replaying its saved content through an outbound interceptor chain.
+     * <p>
+     * The saved content ({@link RMMessageConstants#SAVED_CONTENT}) is rewound and parsed: the SOAP
+     * headers are re-read into the message, the addressing properties are unmarshalled from them, and
+     * the remainder of the stream is copied to the transport by a {@link CopyOutInterceptor} that
+     * replaces the chain's regular marshal-phase interceptors.
+     * <p>
+     * Nothing is sent when there is no saved content, when no destination address can be determined,
+     * or when the destination is the anonymous address. Any exception raised while resending is
+     * logged at SEVERE level and not propagated.
+     *
+     * @param message the message to retransmit
+     */
+    private void doResend(SoapMessage message) {
+        try {
+
+            // without saved content there is nothing to replay, so skip before touching the message
+            RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            if (is == null) {
+                LOG.log(Level.WARNING, "No saved message content available, skipping resend.");
+                return;
+            }
+
+            // initialize copied interceptor chain for message
+            PhaseInterceptorChain retransmitChain = manager.getRetransmitChain();
+            ProtocolVariation protocol = RMContextUtils.getProtocolVariation(message);
+            Endpoint endpoint = manager.getReliableEndpoint(message).getEndpoint(protocol);
+            boolean after = true;
+            if (retransmitChain == null) {
+
+                // no saved retransmit chain, so construct one from scratch (won't work for WS-Security on
+                // server, so need to fix)
+                retransmitChain = buildRetransmitChain(endpoint, new PhaseChainCache());
+                after = false;
+            }
+            message.setInterceptorChain(retransmitChain);
+
+            // clear flag for SOAP out interceptor so envelope will be written
+            message.remove(SoapOutInterceptor.WROTE_ENVELOPE_START);
+
+            // discard all saved content; iterate over a snapshot so that removeContent() cannot disturb
+            // the iteration should getContentFormats() hand out a live view of the message contents
+            Set<Class<?>> formats = message.getContentFormats();
+            List<CachedOutputStreamCallback> callbacks = null;
+            for (Class<?> clas : new ArrayList<Class<?>>(formats)) {
+                Object content = message.getContent(clas);
+                if (content != null) {
+                    LOG.info("Removing " + clas.getName() + " content of actual type "
+                        + content.getClass().getName());
+                    message.removeContent(clas);
+
+                    // remember the callbacks of the old output stream so they can be re-registered below
+                    if (clas == OutputStream.class && content instanceof WriteOnCloseOutputStream) {
+                        callbacks = ((WriteOnCloseOutputStream)content).getCallbacks();
+                    }
+                }
+            }
+
+            // read SOAP headers from saved input stream
+            is.rewind();
+            XMLStreamReader reader = StaxUtils.createXMLStreamReader(is, "UTF-8");
+            message.getHeaders().clear();
+            if (reader.getEventType() != XMLStreamConstants.START_ELEMENT
+                && reader.nextTag() != XMLStreamConstants.START_ELEMENT) {
+                throw new IllegalStateException("No document found");
+            }
+            readHeaders(reader, message);
+
+            // advance to the next start element; hitting an end element first means there is no body
+            int event;
+            while ((event = reader.nextTag()) != XMLStreamConstants.START_ELEMENT) {
+                if (event == XMLStreamConstants.END_ELEMENT) {
+                    throw new IllegalStateException("No body content present");
+                }
+            }
+
+            // set message addressing properties
+            AddressingProperties maps = new MAPCodec().unmarshalMAPs(message);
+            RMContextUtils.storeMAPs(maps, message, true, MessageUtils.isRequestor(message));
+            AttributedURIType to = null;
+            if (null != maps) {
+                to = maps.getTo();
+            }
+            if (null == to) {
+                LOG.log(Level.SEVERE, "NO_ADDRESS_FOR_RESEND_MSG");
+                return;
+            }
+            if (RMUtils.getAddressingConstants().getAnonymousURI().equals(to.getValue())) {
+                LOG.log(Level.FINE, "Cannot resend to anonymous target");
+                return;
+            }
+
+            // initialize conduit for new message
+            Conduit c = message.getExchange().getConduit(message);
+            if (c == null) {
+                c = buildConduit(message, endpoint, to);
+            }
+            c.prepare(message);
+
+            // replace standard message marshaling with copy from saved stream
+            ListIterator<Interceptor<? extends Message>> iterator = retransmitChain.getIterator();
+            while (iterator.hasNext()) {
+                Interceptor<? extends Message> incept = iterator.next();
+
+                // remove JAX-WS interceptors which handle message modes and such
+                if (incept.getClass().getName().startsWith("org.apache.cxf.jaxws.interceptors")) {
+                    retransmitChain.remove(incept);
+                } else if (incept instanceof PhaseInterceptor
+                    && Phase.MARSHAL.equals(((PhaseInterceptor<?>)incept).getPhase())) {
+
+                    // remove any interceptors from the marshal phase; the phase name is compared by
+                    // value so that an equal name held in a different String instance still matches
+                    retransmitChain.remove(incept);
+                }
+            }
+            retransmitChain.add(new CopyOutInterceptor(reader));
+
+            // restore callbacks on output stream
+            if (callbacks != null) {
+                OutputStream os = message.getContent(OutputStream.class);
+                if (os != null) {
+                    WriteOnCloseOutputStream woc;
+                    if (os instanceof WriteOnCloseOutputStream) {
+                        woc = (WriteOnCloseOutputStream)os;
+                    } else {
+                        woc = new WriteOnCloseOutputStream(os);
+                        message.setContent(OutputStream.class, woc);
+                    }
+                    for (CachedOutputStreamCallback cb : callbacks) {
+                        woc.registerCallback(cb);
+                    }
+                }
+            }
+
+            // send the message; a chain supplied by the manager is resumed after the capture
+            // interceptor, a freshly built one is run from its start
+            message.put(RMMessageConstants.RM_RETRANSMISSION, Boolean.TRUE);
+            if (after) {
+                retransmitChain.doInterceptStartingAfter(message, RMCaptureOutInterceptor.class.getName());
+            } else {
+                retransmitChain.doIntercept(message);
+            }
+
+            // the message has been handed to the chain at this point, so missing RM properties must not
+            // turn the informational log into a reported resend failure
+            if (LOG.isLoggable(Level.INFO)) {
+                RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
+                SequenceType seq = rmps == null ? null : rmps.getSequence();
+                if (seq != null) {
+                    LOG.log(Level.INFO, "Retransmitted message " + seq.getMessageNumber() + " in sequence "
+                        + seq.getIdentifier().getValue());
+                }
+            }
+
+        } catch (Exception ex) {
+            LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex);
+        }
+    }
+
+    /**
+     * Selects a conduit for resending a message to the given destination and installs it on the
+     * message's exchange. While the conduit is being selected the endpoint's address is switched to
+     * the destination address; the endpoint's previous target is put back afterwards. Responses
+     * arriving on the conduit are logged at FINE level and otherwise ignored.
+     *
+     * @param message the message being resent
+     * @param endpoint the endpoint used for the conduit selection
+     * @param to the destination address of the message
+     * @return the selected conduit, which has also been set on the message's exchange
+     */
+    protected Conduit buildConduit(SoapMessage message, final Endpoint endpoint, AttributedURIType to) {
+        final String address = to.getValue();
+        ConduitSelector selector = new DeferredConduitSelector() {
+            @Override
+            public synchronized Conduit selectConduit(Message msg) {
+                EndpointInfo info = endpoint.getEndpointInfo();
+                EndpointReferenceType previousTarget = info.getTarget();
+                try {
+                    if (address != null) {
+                        info.setAddress(address);
+                    }
+                    return super.selectConduit(msg);
+                } finally {
+                    // always put the previous target back, whether or not selection succeeded
+                    info.setAddress(previousTarget);
+                }
+            }
+        };
+        selector.setEndpoint(endpoint);
+
+        Conduit conduit = selector.selectConduit(message);
+        // REVISIT
+        // use application endpoint message observer instead?
+        conduit.setMessageObserver(new MessageObserver() {
+            public void onMessage(Message response) {
+                LOG.fine("Ignoring response to resent message.");
+            }
+        });
+
+        message.getExchange().setConduit(conduit);
+        return conduit;
+    }
+
+    /**
+     * Obtains an outbound interceptor chain from the given cache, built from the out phases of the
+     * bus's {@link PhaseManager} and the out interceptors contributed by the bus, the endpoint and the
+     * endpoint's binding. Each contribution is logged at FINE level.
+     *
+     * @param endpoint the endpoint whose own and binding out interceptors are included
+     * @param cache the chain cache the chain is requested from
+     * @return the interceptor chain returned by the cache
+     */
+    protected PhaseInterceptorChain buildRetransmitChain(final Endpoint endpoint, PhaseChainCache cache) {
+        Bus bus = getManager().getBus();
+
+        List<Interceptor<? extends Message>> fromBus = bus.getOutInterceptors();
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Interceptors contributed by bus: " + fromBus);
+        }
+
+        List<Interceptor<? extends Message>> fromEndpoint = endpoint.getOutInterceptors();
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Interceptors contributed by endpoint: " + fromEndpoint);
+        }
+
+        List<Interceptor<? extends Message>> fromBinding = endpoint.getBinding().getOutInterceptors();
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Interceptors contributed by binding: " + fromBinding);
+        }
+
+        PhaseManager phases = bus.getExtension(PhaseManager.class);
+        return cache.get(phases.getOutPhases(), fromBus, fromEndpoint, fromBinding);
+    }
+
+    /**
+     * Marshal-phase interceptor that produces the outgoing message by copying from a supplied
+     * {@link XMLStreamReader} to the message's {@link XMLStreamWriter} content.
+     */
+    public static class CopyOutInterceptor extends AbstractOutDatabindingInterceptor {
+        private final XMLStreamReader source;
+
+        /**
+         * @param rdr the reader whose content is copied to the outgoing message
+         */
+        public CopyOutInterceptor(XMLStreamReader rdr) {
+            super(Phase.MARSHAL);
+            this.source = rdr;
+        }
+
+        /**
+         * Copies the reader's content to the XMLStreamWriter held by the message.
+         *
+         * @param message the outgoing message
+         * @throws Fault wrapping any XMLStreamException raised during the copy
+         */
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            XMLStreamWriter target = message.getContent(XMLStreamWriter.class);
+            try {
+                StaxUtils.copy(source, target);
+            } catch (XMLStreamException e) {
+                throw new Fault("COULD_NOT_READ_XML_STREAM", LOG, e);
+            }
+        }
+    }
+}
\ No newline at end of file