You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2006/10/12 17:34:08 UTC
svn commit: r463282 [1/2] - in /incubator/cxf/trunk:
common/common/src/main/java/org/apache/cxf/jaxb/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence...
Author: andreasmyth
Date: Thu Oct 12 08:34:05 2006
New Revision: 463282
URL: http://svn.apache.org/viewvc?view=rev&rev=463282
Log:
[JIRA CXF-138] Abstractions for reliable messaging endpoints and sequences.
Added:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
- copied, changed from r462824, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMContextUtils.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
- copied, changed from r462804, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
- copied, changed from r454719, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RetransmissionQueue.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java
- copied, changed from r454703, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMDestination.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java
- copied, changed from r454697, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/DestinationSequence.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java
- copied, changed from r462820, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMPropertiesImpl.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java
- copied, changed from r454703, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMSource.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java
- copied, changed from r454697, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/SourceSequence.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImplTest.java
- copied, changed from r462829, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/DestinationSequenceTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/SourceSequenceImplTest.java
- copied, changed from r462829, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/SourceSequenceTest.java
Removed:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMEndpoint.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMDestinationSequence.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMSourceSequence.java
Modified:
incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
Modified: incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java (original)
+++ incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java Thu Oct 12 08:34:05 2006
@@ -33,8 +33,14 @@
*/
public final class DatatypeFactory {
+ public static final Duration PT0S;
private static final Logger LOG = LogUtils.getL7dLogger(DatatypeFactory.class);
-
+
+
+ static {
+ PT0S = createDuration("PT0S");
+ }
+
/**
* prevents instantiation
*
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.InputStream;
+import java.math.BigInteger;
+
+import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
+
+public interface DestinationSequence {
+
+ /**
+ * @return the sequence identifier
+ */
+ Identifier getIdentifier();
+
+ /**
+ * @return the acksTo address for the sequence
+ */
+ EndpointReferenceType getAcksTo();
+
+ /**
+ * @return the message number of the last message, or null if the last message has not yet been received.
+ */
+ BigInteger getLastMessageNumber();
+
+ /**
+ * @return the sequence acknowledgement representing the messages thus far received by the destination
+ */
+ SequenceAcknowledgement getAcknowledgment();
+
+ /**
+ * @return the sequence acknowledgement representing the messages thus far received by the destination,
+ * as an input stream
+ */
+ InputStream getAcknowledgmentAsStream();
+
+ /**
+ * @return the identifier of the rm destination
+ */
+ String getEndpointIdentifier();
+
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (from r462824, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMContextUtils.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMContextUtils.java&r1=462824&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Thu Oct 12 08:34:05 2006
@@ -1,6 +1,25 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
-import javax.xml.ws.handler.MessageContext;
+import org.apache.cxf.message.Message;
/**
* Holder for utility methods relating to contexts.
@@ -11,18 +30,18 @@
protected RMContextUtils() {
}
- public static RMProperties retrieveRMProperties(MessageContext context, boolean outbound) {
- return (RMProperties)context.get(getRMPropertiesKey(outbound));
+ public static RMProperties retrieveRMProperties(Message message, boolean outbound) {
+ return (RMProperties)message.get(getRMPropertiesKey(outbound));
}
- public static void storeRMProperties(MessageContext context, RMProperties rmps, boolean outbound) {
+ public static void storeRMProperties(Message message, RMProperties rmps, boolean outbound) {
String key = getRMPropertiesKey(outbound);
- context.put(key, rmps);
- context.setScope(key, MessageContext.Scope.HANDLER);
+ message.put(key, rmps);
}
private static String getRMPropertiesKey(boolean outbound) {
- return outbound ? JAXWSRMConstants.RM_PROPERTIES_OUTBOUND : JAXWSRMConstants.RM_PROPERTIES_INBOUND;
+ return outbound ? RMMessageConstants.RM_PROPERTIES_OUTBOUND
+ : RMMessageConstants.RM_PROPERTIES_INBOUND;
}
}
Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (from r462804, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?view=diff&rev=463282&p1=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java&r1=462804&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Thu Oct 12 08:34:05 2006
@@ -20,25 +20,25 @@
package org.apache.cxf.ws.rm;
/**
- * A container for WS-RM constants.
+ * A container for WS-RM message constants.
*/
-public final class JAXWSRMConstants {
+public final class RMMessageConstants {
/**
- * Used to cache outbound RM properties in context.
+ * Used to cache outbound RM properties in message.
*/
public static final String RM_PROPERTIES_OUTBOUND =
- "org.objectweb.celtix.ws.rm.context.outbound";
+ "org.apache.cxf.ws.rm.outbound";
/**
- * Used to cache inbound RM properties in context.
+ * Used to cache inbound RM properties in message.
*/
public static final String RM_PROPERTIES_INBOUND =
- "org.objectweb.celtix.ws.rm.context.inbound";
+ "org.apache.cxf.ws.rm.inbound";
/**
* Prevents instantiation.
*/
- private JAXWSRMConstants() {
+ private RMMessageConstants() {
}
}
Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (from r454719, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RetransmissionQueue.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RetransmissionQueue.java&r1=454719&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RetransmissionQueue.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Thu Oct 12 08:34:05 2006
@@ -1,9 +1,27 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
import java.util.Collection;
-import org.objectweb.celtix.context.ObjectMessageContext;
-import org.objectweb.celtix.workqueue.WorkQueue;
+import org.apache.cxf.message.Message;
public interface RetransmissionQueue {
@@ -27,7 +45,7 @@
*
* @param ctx the message context.
*/
- void addUnacknowledged(ObjectMessageContext context);
+ void addUnacknowledged(Message message);
/**
* Purge all candidates for the given sequence that have been acknowledged.
@@ -38,10 +56,9 @@
/**
* Initiate resends.
- *
- * @param queue the work queue providing async execution
+ *
*/
- void start(WorkQueue wq);
+ void start();
/**
* Stops retransmission queue.
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+/**
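+ * Checked exception for sequence faults that can optionally carry a {@link SequenceFaultType} as fault info.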
+ */
+
+public class SequenceFault extends Exception {
+
+ private SequenceFaultType sequenceFault;
+
+ public SequenceFault(String message) {
+ super(message);
+ }
+
+ public SequenceFault(String message, SequenceFaultType sequenceFault) {
+ super(message);
+ this.sequenceFault = sequenceFault;
+ }
+
+ public SequenceFault(String message, SequenceFaultType sequenceFault, Throwable cause) {
+ super(message, cause);
+ this.sequenceFault = sequenceFault;
+ }
+
+ public SequenceFaultType getFaultInfo() {
+ return this.sequenceFault;
+ }
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.math.BigInteger;
+import java.util.Date;
+
+public interface SourceSequence {
+
+ /**
+ * @return the sequence identifier
+ */
+ Identifier getIdentifier();
+
+ /**
+ * @return the message number assigned to the most recent outgoing application message.
+ */
+ BigInteger getCurrentMessageNr();
+
+ /**
+ * @return true if the last message has been sent for this sequence.
+ */
+ boolean isLastMessage();
+
+ /**
+ * @return the identifier of the sequence that was created on behalf of the CreateSequence request
+ * that included this sequence as an offer
+ */
+ Identifier getOfferingSequenceIdentifier();
+
+ /**
+ * @return the identifier of the rm source
+ */
+ String getEndpointIdentifier();
+
+ /**
+ * @return the expiry date of this sequence
+ */
+ Date getExpiry();
+
+ /**
+ * Returns true if a last message had been sent for this sequence and if all
+ * messages for this sequence have been acknowledged.
+ *
+ * @return true if all messages have been acknowledged.
+ */
+ // boolean allAcknowledged();
+
+ /**
+ * Used by the RM source to cache received acknowledgements for this
+ * sequence.
+ *
+ * @param acknowledgement an acknowledgement for this sequence
+ */
+ // void setAcknowledged(SequenceAcknowledgement acknowledgment);
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
+
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.ws.rm.Identifier;
+
+public class AbstractEndpoint {
+
+ private final RMInterceptor interceptor;
+ private final Endpoint endpoint;
+
+ protected AbstractEndpoint(RMInterceptor h, Endpoint e) {
+ interceptor = h;
+ endpoint = e;
+ }
+
+ /**
+ * @return Returns the interceptor.
+ */
+ public RMInterceptor getInterceptor() {
+ return interceptor;
+ }
+
+ /**
+ * @return Returns the endpoint.
+ */
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ /**
+ * Generates and returns a new sequence identifier.
+ *
+ * @return the sequence identifier.
+ */
+ public Identifier generateSequenceIdentifier() {
+ String sequenceID = ContextUtils.generateUUID();
+ Identifier sid = RMUtils.getWSRMFactory().createIdentifier();
+ sid.setValue(sequenceID);
+ return sid;
+ }
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java Thu Oct 12 08:34:05 2006
@@ -29,7 +29,7 @@
public abstract class AbstractSequenceImpl {
protected final Identifier id;
- protected SequenceAcknowledgement acked;
+ protected SequenceAcknowledgement acknowledgement;
protected AbstractSequenceImpl(Identifier i) {
id = i;
@@ -70,7 +70,7 @@
}
public synchronized boolean isAcknowledged(BigInteger m) {
- for (AcknowledgementRange r : acked.getAcknowledgementRange()) {
+ for (AcknowledgementRange r : acknowledgement.getAcknowledgementRange()) {
if (m.subtract(r.getLower()).signum() >= 0 && r.getUpper().subtract(m).signum() >= 0) {
return true;
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java Thu Oct 12 08:34:05 2006
@@ -19,10 +19,12 @@
package org.apache.cxf.ws.rm.impl;
-import java.util.UUID;
-import org.apache.cxf.message.Exchange;
+import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.VersionTransformer;
/**
* Holder for utility methods relating to contexts.
@@ -30,11 +32,6 @@
public final class ContextUtils {
/**
- * Used to fabricate a Uniform Resource Name from a UUID string
- */
- private static final String URN_UUID = "urn:uuid:";
-
- /**
* Prevents instantiation.
*/
private ContextUtils() {
@@ -44,7 +41,7 @@
* @return a generated UUID
*/
static String generateUUID() {
- return URN_UUID + UUID.randomUUID();
+ return org.apache.cxf.ws.addressing.ContextUtils.generateUUID();
}
/**
@@ -54,7 +51,62 @@
* @return true iff the message direction is outbound
*/
static boolean isOutbound(Message message) {
- Exchange exchange = message.getExchange();
- return message != null && exchange != null && message == exchange.getOutMessage();
+ return org.apache.cxf.ws.addressing.ContextUtils.isOutbound(message);
+ }
+
+ /**
+ * Determine if current messaging role is that of requestor.
+ *
+ * @param message the current Message
+ * @return true iff the current messaging role is that of requestor
+ */
+ public static boolean isRequestor(Message message) {
+ return org.apache.cxf.ws.addressing.ContextUtils.isRequestor(message);
+ }
+
+ /**
+ * Retrieves the addressing properties from the current message.
+ *
+ * @param message the current message
+ * @param isProviderContext true if the binding provider request context
+ * available to the client application as opposed to the message context
+ * visible to handlers
+ * @param isOutbound true iff the message is outbound
+ * @return the current addressing properties
+ */
+ public static AddressingProperties retrieveMAPs(
+ Message message,
+ boolean isProviderContext,
+ boolean isOutbound) {
+ return org.apache.cxf.ws.addressing.ContextUtils
+ .retrieveMAPs(message, isProviderContext, isOutbound);
+ }
+
+ /**
+ * Ensures the appropriate version of WS-Addressing is used.
+ *
+ * @param maps the addressing properties
+ */
+ public static void ensureExposedVersion(AddressingProperties maps) {
+ ((AddressingPropertiesImpl)maps)
+ .exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
+ }
+
+ /**
+ * Returns the endpoint of this message, i.e. the client endpoint
+ * if the current messaging role is that of requestor. The server
+ * endpoint is not looked up yet: null is returned for any other role.
+ *
+ * @param message the current Message
+ * @return the endpoint if the current role is that of requestor, otherwise null
+ */
+ public static Endpoint getEndpoint(Message message) {
+ if (isRequestor(message)) {
+ return message.getExchange().get(Endpoint.class);
+ } else {
+ return null;
+ }
}
+
+
}
Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java (from r454703, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMDestination.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMDestination.java&r1=454703&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMDestination.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java Thu Oct 12 08:34:05 2006
@@ -1,4 +1,23 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
import java.io.IOException;
import java.util.Collection;
@@ -6,36 +25,43 @@
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.objectweb.celtix.common.i18n.Message;
-import org.objectweb.celtix.common.logging.LogUtils;
-import org.objectweb.celtix.ws.rm.persistence.RMStore;
-import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
-public class RMDestination extends RMEndpoint {
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.ws.rm.DestinationSequence;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceFault;
+import org.apache.cxf.ws.rm.SequenceType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+
- private static final Logger LOG = LogUtils.getL7dLogger(RMDestination.class);
+public class Destination extends AbstractEndpoint {
+
+ private static final Logger LOG = LogUtils.getL7dLogger(Destination.class);
- private Map<String, DestinationSequence> map;
+ private Map<String, DestinationSequenceImpl> map;
- RMDestination(RMHandler h) {
- super(h);
- map = new HashMap<String, DestinationSequence>();
+ Destination(RMInterceptor interceptor, Endpoint endpoint) {
+ super(interceptor, endpoint);
+ map = new HashMap<String, DestinationSequenceImpl>();
}
public DestinationSequence getSequence(Identifier id) {
return map.get(id.getValue());
}
- public void addSequence(DestinationSequence seq) {
+ public void addSequence(DestinationSequenceImpl seq) {
addSequence(seq, true);
}
- public void addSequence(DestinationSequence seq, boolean persist) {
- seq.setDestination(this);
+ public void addSequence(DestinationSequenceImpl seq, boolean persist) {
+ // seq.setDestination(this);
map.put(seq.getIdentifier().getValue(), seq);
if (persist) {
- RMStore store = getHandler().getStore();
+ RMStore store = getInterceptor().getStore();
if (null != store) {
store.createDestinationSequence(seq);
}
@@ -44,18 +70,16 @@
public void removeSequence(DestinationSequence seq) {
map.remove(seq.getIdentifier().getValue());
- RMStore store = getHandler().getStore();
+ RMStore store = getInterceptor().getStore();
if (null != store) {
store.removeDestinationSequence(seq.getIdentifier());
}
}
- public Collection<DestinationSequence> getAllSequences() {
- return map.values();
+ public Collection<DestinationSequence> getAllSequences() {
+ return CastUtils.cast(map.values());
}
-
-
/**
* Acknowledges receipt of a message. If the message is the last in the sequence,
* sends an out-of-band SequenceAcknowledgement unless there a response will be sent
@@ -68,7 +92,7 @@
*/
public void acknowledge(SequenceType sequenceType, String replyToAddress)
throws SequenceFault {
- DestinationSequence seq = getSequence(sequenceType.getIdentifier());
+ DestinationSequenceImpl seq = getSequenceImpl(sequenceType.getIdentifier());
if (null != seq) {
seq.acknowledge(sequenceType.getMessageNumber());
@@ -84,7 +108,7 @@
if (!(seq.getAcksTo().getAddress().getValue().equals(replyToAddress)
|| seq.canPiggybackAckOnPartialResponse())) {
try {
- getHandler().getProxy().acknowledge(seq);
+ getInterceptor().getProxy().acknowledge(seq);
} catch (IOException ex) {
Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, seq);
LOG.log(Level.SEVERE, msg.toString(), ex);
@@ -92,7 +116,12 @@
}
}
} else {
- throw DestinationSequence.createUnknownSequenceFault(sequenceType.getIdentifier());
+ SequenceFaultFactory sff = new SequenceFaultFactory();
+ throw sff.createUnknownSequenceFault(sequenceType.getIdentifier());
}
+ }
+
+ DestinationSequenceImpl getSequenceImpl(Identifier sid) {
+ return map.get(sid.getValue());
}
}
Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java (from r454697, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/DestinationSequence.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/DestinationSequence.java&r1=454697&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java Thu Oct 12 08:34:05 2006
@@ -1,4 +1,23 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
import java.io.IOException;
import java.io.InputStream;
@@ -10,22 +29,25 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.objectweb.celtix.bus.configuration.wsrm.AcksPolicyType;
-import org.objectweb.celtix.bus.configuration.wsrm.DeliveryAssuranceType;
-import org.objectweb.celtix.common.i18n.Message;
-import org.objectweb.celtix.common.logging.LogUtils;
-import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType;
-import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
-import org.objectweb.celtix.ws.rm.persistence.RMDestinationSequence;
-import org.objectweb.celtix.ws.rm.persistence.RMStore;
-import org.objectweb.celtix.ws.rm.policy.RMAssertionType;
-import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
+import org.apache.cxf.ws.rm.DestinationSequence;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
+import org.apache.cxf.ws.rm.SequenceFault;
+import org.apache.cxf.ws.rm.SequenceFaultType;
+import org.apache.cxf.ws.rm.interceptor.AcksPolicyType;
+import org.apache.cxf.ws.rm.interceptor.DeliveryAssuranceType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.policy.RMAssertion;
-public class DestinationSequence extends AbstractSequenceImpl implements RMDestinationSequence {
+public class DestinationSequenceImpl extends AbstractSequenceImpl implements DestinationSequence {
- private static final Logger LOG = LogUtils.getL7dLogger(DestinationSequence.class);
+ private static final Logger LOG = LogUtils.getL7dLogger(DestinationSequenceImpl.class);
- private RMDestination destination;
+ private Destination destination;
private EndpointReferenceType acksTo;
private BigInteger lastMessageNumber;
private SequenceMonitor monitor;
@@ -33,20 +55,20 @@
private List<DeferredAcknowledgment> deferredAcknowledgments;
private String correlationID;
- public DestinationSequence(Identifier i, EndpointReferenceType a, RMDestination d) {
+ public DestinationSequenceImpl(Identifier i, EndpointReferenceType a, Destination d) {
this(i, a, null, null);
setDestination(d);
}
- public DestinationSequence(Identifier i, EndpointReferenceType a,
+ public DestinationSequenceImpl(Identifier i, EndpointReferenceType a,
BigInteger lmn, SequenceAcknowledgement ac) {
super(i);
acksTo = a;
lastMessageNumber = lmn;
- acked = ac;
- if (null == acked) {
- acked = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
- acked.setIdentifier(id);
+ acknowledgement = ac;
+ if (null == acknowledgement) {
+ acknowledgement = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
+ acknowledgement.setIdentifier(id);
}
monitor = new SequenceMonitor();
}
@@ -54,79 +76,10 @@
// RMDestinationSequence interface
-
- /**
- * @return the acksTo address for the sequence
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.DestinationSequence#acknowledge(java.math.BigInteger)
*/
- public EndpointReferenceType getAcksTo() {
- return acksTo;
- }
-
- /**
- * @return the message number of the last message or null if the last message had not been received.
- */
- public BigInteger getLastMessageNr() {
- return lastMessageNumber;
- }
-
- /**
- * @return the sequence acknowledgement presenting the sequences thus far received by a destination
- */
- public SequenceAcknowledgement getAcknowledgment() {
- return acked;
- }
-
- /**
- * @return the sequence acknowledgement presenting the sequences thus far received by a destination
- * as an input stream
- */
- public InputStream getAcknowledgmentAsStream() {
- return RMUtils.getPersistenceUtils().getAcknowledgementAsInputStream(acked);
- }
-
- /**
- * @return the identifier of the rm destination
- */
- public String getEndpointIdentifier() {
- if (null != destination) {
- return destination.getHandler().getConfigurationHelper().getEndpointId();
- }
- return null;
- }
-
- // end RMDestinationSequence interface
-
- final void setDestination(RMDestination d) {
- destination = d;
- }
-
- RMDestination getDestination() {
- return destination;
- }
-
- void setLastMessageNumber(BigInteger lmn) {
- lastMessageNumber = lmn;
- }
-
- /**
- * Returns the monitor for this sequence.
- *
- * @return the sequence monitor.
- */
- SequenceMonitor getMonitor() {
- return monitor;
- }
-
-
- /**
- * Called by the RM destination upon receipt of a message with the given
- * message number for this sequence.
- *
- * @param messageNumber the number of the received message
- * @param lastMessage true if this is to be the last message in the sequence
- */
- void acknowledge(BigInteger messageNumber) throws SequenceFault {
-
+ public void acknowledge(BigInteger messageNumber) throws SequenceFault {
if (null != lastMessageNumber && messageNumber.compareTo(lastMessageNumber) > 0) {
SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
sf.setFaultCode(RMUtils.getRMConstants().getLastMessageNumberExceededFaultCode());
@@ -139,8 +92,8 @@
synchronized (this) {
boolean done = false;
int i = 0;
- for (; i < acked.getAcknowledgementRange().size(); i++) {
- AcknowledgementRange r = acked.getAcknowledgementRange().get(i);
+ for (; i < acknowledgement.getAcknowledgementRange().size(); i++) {
+ AcknowledgementRange r = acknowledgement.getAcknowledgementRange().get(i);
if (r.getLower().compareTo(messageNumber) <= 0
&& r.getUpper().compareTo(messageNumber) >= 0) {
done = true;
@@ -166,7 +119,7 @@
.createSequenceAcknowledgementAcknowledgementRange();
range.setLower(messageNumber);
range.setUpper(messageNumber);
- acked.getAcknowledgementRange().add(i, range);
+ acknowledgement.getAcknowledgementRange().add(i, range);
}
notifyAll();
@@ -175,6 +128,78 @@
purgeAcknowledged(messageNumber);
scheduleAcknowledgement();
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.DestinationSequence#getAcknowledgment()
+ */
+ public SequenceAcknowledgement getAcknowledgment() {
+ return acknowledgement;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.DestinationSequence#getAcknowledgmentAsStream()
+ */
+ public InputStream getAcknowledgmentAsStream() {
+ // return RMUtils.getPersistenceUtils().getAcknowledgementAsInputStream(acked);
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.DestinationSequence#getAcksTo()
+ */
+ public EndpointReferenceType getAcksTo() {
+ return acksTo;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.DestinationSequence#getEndpointIdentifier()
+ */
+ public String getEndpointIdentifier() {
+ // TODO
+ /*
+ if (null != destination) {
+ return destination.getHandler().getConfigurationHelper().getEndpointId();
+ }
+ */
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.DestinationSequence#getLastMessageNumber()
+ */
+ public BigInteger getLastMessageNumber() {
+ return lastMessageNumber;
+ }
+
+ // end RMDestinationSequence interface
+
+ void setLastMessageNumber(BigInteger lmn) {
+ lastMessageNumber = lmn;
+ }
+
+ boolean canPiggybackAckOnPartialResponse() {
+ // TODO: should also check if we allow breaking the WS-I Profile rule by which no headers
+ // can be included in an HTTP response
+ return getAcksTo().getAddress().getValue().equals(Names.WSA_ANONYMOUS_ADDRESS);
+ }
+
+ final void setDestination(Destination d) {
+ destination = d;
+ }
+
+ Destination getDestination() {
+ return destination;
+ }
+
+ /**
+ * Returns the monitor for this sequence.
+ *
+ * @return the sequence monitor.
+ */
+ SequenceMonitor getMonitor() {
+ return monitor;
}
/**
@@ -188,7 +213,7 @@
* @param s the SequenceType object including identifier and message number
*/
boolean applyDeliveryAssurance(BigInteger mn) {
- DeliveryAssuranceType da = destination.getHandler().getConfigurationHelper().getDeliveryAssurance();
+ DeliveryAssuranceType da = destination.getInterceptor().getDeliveryAssurance();
if (da.isSetAtMostOnce() && isAcknowledged(mn)) {
Message msg = new Message("MESSAGE_ALREADY_DELIVERED", LOG, mn, getIdentifier().getValue());
LOG.log(Level.SEVERE, msg.toString());
@@ -211,13 +236,13 @@
}
synchronized boolean allPredecessorsAcknowledged(BigInteger mn) {
- return acked.getAcknowledgementRange().size() == 1
- && acked.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
- && acked.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
+ return acknowledgement.getAcknowledgementRange().size() == 1
+ && acknowledgement.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
+ && acknowledgement.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
}
void purgeAcknowledged(BigInteger messageNr) {
- RMStore store = destination.getHandler().getStore();
+ RMStore store = destination.getInterceptor().getStore();
if (null == store) {
return;
}
@@ -250,26 +275,14 @@
return correlationID;
}
- boolean canPiggybackAckOnPartialResponse() {
- // TODO: should also check if we allow breaking the WI Profile rule by which no headers
- // can be included in a HTTP response
- return getAcksTo().getAddress().getValue().equals(Names.WSA_ANONYMOUS_ADDRESS);
- }
-
- static SequenceFault createUnknownSequenceFault(Identifier sid) {
- SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
- sf.setFaultCode(RMUtils.getRMConstants().getUnknownSequenceFaultCode());
- Message msg = new Message("UNKNOWN_SEQUENCE_EXC", LOG, sid.getValue());
- return new SequenceFault(msg.toString(), sf);
- }
-
private void scheduleAcknowledgement() {
- RMAssertionType rma = destination.getHandler().getConfigurationHelper().getRMAssertion();
+ RMAssertion rma = destination.getInterceptor().getRMAssertion();
int delay = 0;
if (null != rma.getAcknowledgementInterval()) {
delay = rma.getAcknowledgementInterval().getMilliseconds().intValue();
}
- AcksPolicyType ap = destination.getHandler().getConfigurationHelper().getAcksPolicy();
+ AcksPolicyType ap = destination.getInterceptor().getDestinationPolicy().getAcksPolicy();
+
if (delay > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) {
scheduleDeferredAcknowledgement(delay);
} else {
@@ -282,7 +295,7 @@
acknowledgeOnNextOccasion = true;
}
- private synchronized void scheduleDeferredAcknowledgement(int delay) {
+ synchronized void scheduleDeferredAcknowledgement(int delay) {
if (null == deferredAcknowledgments) {
deferredAcknowledgments = new ArrayList<DeferredAcknowledgment>();
}
@@ -295,17 +308,17 @@
}
DeferredAcknowledgment da = new DeferredAcknowledgment();
deferredAcknowledgments.add(da);
- destination.getHandler().getTimer().schedule(da, delay);
+ destination.getInterceptor().getTimer().schedule(da, delay);
}
final class DeferredAcknowledgment extends TimerTask {
public void run() {
- DestinationSequence.this.scheduleImmediateAcknowledgement();
+ DestinationSequenceImpl.this.scheduleImmediateAcknowledgement();
try {
- destination.getHandler().getProxy().acknowledge(DestinationSequence.this);
+ destination.getInterceptor().getProxy().acknowledge(DestinationSequenceImpl.this);
} catch (IOException ex) {
- Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, DestinationSequence.this);
+ Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, DestinationSequenceImpl.this);
LOG.log(Level.SEVERE, msg.toString(), ex);
}
}
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
+
+import java.io.IOException;
+import org.apache.cxf.ws.rm.DestinationSequence;
+
+/**
+ * Abstraction used by the RM destination to send sequence acknowledgements.
+ */
+public interface Proxy {
+
+ void acknowledge(DestinationSequence ds) throws IOException;
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java Thu Oct 12 08:34:05 2006
@@ -21,18 +21,26 @@
import java.math.BigInteger;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.phase.PhaseInterceptor;
+import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.MAPAggregator;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMContextUtils;
+import org.apache.cxf.ws.rm.RetransmissionQueue;
import org.apache.cxf.ws.rm.interceptor.DeliveryAssuranceType;
import org.apache.cxf.ws.rm.interceptor.DestinationPolicyType;
import org.apache.cxf.ws.rm.interceptor.RMInterceptorConfigBean;
@@ -53,7 +61,12 @@
private static final Logger LOG = LogUtils.getL7dLogger(RMInterceptor.class);
private RMStore store;
+ private RetransmissionQueue retransmissionQueue;
+ private Timer timer;
+ private Proxy proxy;
private Set<String> after = Collections.singleton(MAPAggregator.class.getName());
+ private Map<Endpoint, Source> sources;
+ private Map<Endpoint, Destination> destinations;
public RMStore getStore() {
return store;
@@ -63,8 +76,25 @@
this.store = store;
}
- // PhaseInterceptor interface
+ public RetransmissionQueue getRetransmissionQueue() {
+ return retransmissionQueue;
+ }
+
+ public void setRetransmissionQueue(RetransmissionQueue retransmissionQueue) {
+ this.retransmissionQueue = retransmissionQueue;
+ }
+ public Timer getTimer() {
+ return timer;
+ }
+
+ public Proxy getProxy() {
+ return proxy;
+ }
+
+
+ // PhaseInterceptor interface
+
public Set<String> getAfter() {
return after;
}
@@ -81,7 +111,6 @@
return Phase.PRE_LOGICAL;
}
-
public void handleMessage(Message msg) throws Fault {
if (ContextUtils.isOutbound(msg)) {
handleOutbound(msg, false);
@@ -98,8 +127,53 @@
}
}
+ // rm logic
+
void handleOutbound(Message message, boolean isFault) {
LOG.entering(getClass().getName(), "handleOutbound");
+
+ AddressingProperties maps =
+ ContextUtils.retrieveMAPs(message, false, true);
+ ContextUtils.ensureExposedVersion(maps);
+
+ String action = null;
+ if (maps != null && null != maps.getAction()) {
+ action = maps.getAction().getValue();
+ }
+
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Action: " + action);
+ }
+
+ boolean isApplicationMessage = isAplicationMessage(action);
+
+ RMPropertiesImpl rmpsOut = (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(message, true);
+ if (null == rmpsOut) {
+ rmpsOut = new RMPropertiesImpl();
+ RMContextUtils.storeRMProperties(message, rmpsOut, true);
+ }
+
+ RMPropertiesImpl rmpsIn = null;
+ Identifier inSeqId = null;
+ BigInteger inMessageNumber = null;
+
+ if (isApplicationMessage) {
+
+ rmpsIn = (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(message, false);
+
+ if (null != rmpsIn && null != rmpsIn.getSequence()) {
+ inSeqId = rmpsIn.getSequence().getIdentifier();
+ inMessageNumber = rmpsIn.getSequence().getMessageNumber();
+ }
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
+ }
+ }
+
+ if (1 < 0) {
+ System.out.println(inMessageNumber);
+ }
+
}
void handleInbound(Message message, boolean isFault) {
@@ -135,6 +209,44 @@
dp.setAcksPolicy(factory.createAcksPolicyType());
setDestinationPolicy(dp);
}
+ }
+
+
+ synchronized Source getSource(Message message) {
+ Endpoint endpoint = ContextUtils.getEndpoint(message);
+ Source source = sources.get(endpoint);
+ if (null == source) {
+ source = new Source(this, endpoint);
+ sources.put(endpoint, source);
+ }
+ return source;
+ }
+
+ synchronized Destination getDestination(Message message) {
+ Endpoint endpoint = ContextUtils.getEndpoint(message);
+ Destination destination = destinations.get(endpoint);
+ if (null == destination) {
+ destination = new Destination(this, endpoint);
+ destinations.put(endpoint, destination);
+ }
+ return destination;
+ }
+
+ synchronized Destination getDestination(Source source) {
+ return destinations.get(source.getEndpoint());
+ }
+
+
+ boolean isAplicationMessage(String action) {
+ if (RMUtils.getRMConstants().getCreateSequenceAction().equals(action)
+ || RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action)
+ || RMUtils.getRMConstants().getTerminateSequenceAction().equals(action)
+ || RMUtils.getRMConstants().getLastMessageAction().equals(action)
+ || RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action)
+ || RMUtils.getRMConstants().getSequenceInfoAction().equals(action)) {
+ return false;
+ }
+ return true;
}
Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java (from r462820, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMPropertiesImpl.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMPropertiesImpl.java&r1=462820&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMPropertiesImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java Thu Oct 12 08:34:05 2006
@@ -1,8 +1,34 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.cxf.ws.rm.AckRequestedType;
+import org.apache.cxf.ws.rm.DestinationSequence;
+import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.SequenceType;
+import org.apache.cxf.ws.rm.SourceSequence;
+
public class RMPropertiesImpl implements RMProperties {
private SequenceType sequence;
private Collection<SequenceAcknowledgement> acks;
@@ -48,7 +74,8 @@
}
SequenceAcknowledgement ack = seq.getAcknowledgment();
acks.add(ack);
- seq.acknowledgmentSent();
+ // TODO: move to caller
+ // seq.acknowledgmentSent();
}
}
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
+
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceFault;
+import org.apache.cxf.ws.rm.SequenceFaultType;
+
+/**
+ * Creates SequenceFault instances; currently only the unknown-sequence fault.
+ */
+
+public class SequenceFaultFactory {
+
+ private static final Logger LOG = LogUtils.getL7dLogger(SequenceFaultFactory.class);
+
+ SequenceFault createUnknownSequenceFault(Identifier sid) {
+ SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
+ sf.setFaultCode(RMUtils.getRMConstants().getUnknownSequenceFaultCode());
+ Message msg = new Message("UNKNOWN_SEQUENCE_EXC", LOG, sid.getValue());
+ return new SequenceFault(msg.toString(), sf);
+ }
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java (from r454703, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMSource.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMSource.java&r1=454703&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMSource.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java Thu Oct 12 08:34:05 2006
@@ -1,6 +1,24 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -8,29 +26,28 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import org.objectweb.celtix.common.i18n.Message;
-import org.objectweb.celtix.common.logging.LogUtils;
-import org.objectweb.celtix.ws.rm.persistence.RMStore;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.persistence.RMStore;
-public class RMSource extends RMEndpoint {
+public class Source extends AbstractEndpoint {
- private static final Logger LOG = LogUtils.getL7dLogger(RMSource.class);
private static final String REQUESTOR_SEQUENCE_ID = "";
- private Map<String, SourceSequence> map;
- private Map<String, SourceSequence> current;
+ private Map<String, SourceSequenceImpl> map;
+ private Map<String, SourceSequenceImpl> current;
private Lock sequenceCreationLock;
private Condition sequenceCreationCondition;
private boolean sequenceCreationNotified;
-
- RMSource(RMHandler h) {
- super(h);
- map = new HashMap<String, SourceSequence>();
- current = new HashMap<String, SourceSequence>();
+ Source(RMInterceptor interceptor, Endpoint endpoint) {
+ super(interceptor, endpoint);
+ map = new HashMap<String, SourceSequenceImpl>();
+ current = new HashMap<String, SourceSequenceImpl>();
sequenceCreationLock = new ReentrantLock();
sequenceCreationCondition = sequenceCreationLock.newCondition();
@@ -40,15 +57,15 @@
return map.get(id.getValue());
}
- public void addSequence(SourceSequence seq) {
+ public void addSequence(SourceSequenceImpl seq) {
addSequence(seq, true);
}
- public void addSequence(SourceSequence seq, boolean persist) {
+ public void addSequence(SourceSequenceImpl seq, boolean persist) {
seq.setSource(this);
map.put(seq.getIdentifier().getValue(), seq);
if (persist) {
- RMStore store = getHandler().getStore();
+ RMStore store = getInterceptor().getStore();
if (null != store) {
store.createSourceSequence(seq);
}
@@ -57,18 +74,60 @@
public void removeSequence(SourceSequence seq) {
map.remove(seq.getIdentifier().getValue());
- RMStore store = getHandler().getStore();
+ RMStore store = getInterceptor().getStore();
if (null != store) {
store.removeSourceSequence(seq.getIdentifier());
}
}
- public Collection<SourceSequence> getAllSequences() {
- return map.values();
- }
+ public Collection<SourceSequence> getAllSequences() {
+ return CastUtils.cast(map.values());
+ }
-
+ /**
+ * Stores the received acknowledgment in the Sequence object identified in
+ * the <code>SequenceAcknowledgement</code> parameter. Then purges any
+ * acknowledged messages from the retransmission queue and requests sequence
+ * termination if necessary.
+ *
+ * @param acknowledgment
+ */
+ public void setAcknowledged(SequenceAcknowledgement acknowledgment) {
+ Identifier sid = acknowledgment.getIdentifier();
+ SourceSequenceImpl seq = getSequenceImpl(sid);
+ if (null != seq) {
+ seq.setAcknowledged(acknowledgment);
+ getInterceptor().getRetransmissionQueue().purgeAcknowledged(seq);
+ if (seq.allAcknowledged()) {
+ // TODO
+ /*
+ try {
+ //
+ getHandler().getProxy().terminateSequence(seq);
+ } catch (IOException ex) {
+ Message msg = new Message("SEQ_TERMINATION_FAILURE", LOG, seq.getIdentifier());
+ LOG.log(Level.SEVERE, msg.toString(), ex);
+ }
+ */
+ }
+ }
+ }
+ /**
+ * Returns a collection of all sequences which have not yet been
+ * completely acknowledged.
+ *
+ * @return the collection of unacknowledged sequences.
+ */
+ public Collection<SourceSequence> getAllUnacknowledgedSequences() {
+ Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
+ for (SourceSequenceImpl seq : map.values()) {
+ if (!seq.allAcknowledged()) {
+ seqs.add(seq);
+ }
+ }
+ return seqs;
+ }
/**
* Returns the current sequence used by a client side source.
@@ -83,7 +142,7 @@
* Sets the current sequence used by a client side source.
* @param s the current sequence.
*/
- void setCurrent(SourceSequence s) {
+ void setCurrent(SourceSequenceImpl s) {
setCurrent(null, s);
}
@@ -144,7 +203,7 @@
* sent as part of the inbound sequence with the specified identifier.
* @param s the current sequence.
*/
- void setCurrent(Identifier i, SourceSequence s) {
+ void setCurrent(Identifier i, SourceSequenceImpl s) {
sequenceCreationLock.lock();
try {
current.put(i == null ? REQUESTOR_SEQUENCE_ID : i.getValue(), s);
@@ -154,45 +213,8 @@
sequenceCreationLock.unlock();
}
}
-
- /**
- * Stores the received acknowledgment in the Sequence object identified in
- * the <code>SequenceAcknowldgement</code> parameter. Then purges any
- * acknowledged messages from the retransmission queue and requests sequence
- * termination if necessary.
- *
- * @param acknowledgment
- */
- public void setAcknowledged(SequenceAcknowledgement acknowledgment) {
- Identifier sid = acknowledgment.getIdentifier();
- SourceSequence seq = getSequence(sid);
- if (null != seq) {
- seq.setAcknowledged(acknowledgment);
- getHandler().getPersistenceManager().getQueue().purgeAcknowledged(seq);
- if (seq.allAcknowledged()) {
- try {
- getHandler().getProxy().terminateSequence(seq);
- } catch (IOException ex) {
- Message msg = new Message("SEQ_TERMINATION_FAILURE", LOG, seq.getIdentifier());
- LOG.log(Level.SEVERE, msg.toString(), ex);
- }
- }
- }
- }
- /**
- * Returns a collection of all sequences for which have not yet been
- * completely acknowledged.
- *
- * @return the collection of unacknowledged sequences.
- */
- public Collection<SourceSequence> getAllUnacknowledgedSequences() {
- Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
- for (SourceSequence seq : map.values()) {
- if (!seq.allAcknowledged()) {
- seqs.add(seq);
- }
- }
- return seqs;
+ SourceSequenceImpl getSequenceImpl(Identifier id) {
+ return map.get(id.getValue());
}
}
Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java (from r454697, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/SourceSequence.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/SourceSequence.java&r1=454697&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/SourceSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java Thu Oct 12 08:34:05 2006
@@ -1,53 +1,61 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
import java.math.BigInteger;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
-import org.objectweb.celtix.bus.configuration.wsrm.SequenceTerminationPolicyType;
-import org.objectweb.celtix.common.logging.LogUtils;
-import org.objectweb.celtix.ws.addressing.ContextUtils;
-import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
-import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jaxb.DatatypeFactory;
+import org.apache.cxf.ws.addressing.ContextUtils;
+import org.apache.cxf.ws.rm.Expires;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.interceptor.SequenceTerminationPolicyType;
-public class SourceSequence extends AbstractSequenceImpl implements RMSourceSequence {
+public class SourceSequenceImpl extends AbstractSequenceImpl implements SourceSequence {
- public static final Duration PT0S;
- private static final Logger LOG = LogUtils.getL7dLogger(SourceSequence.class);
+ private static final Logger LOG = LogUtils.getL7dLogger(SourceSequenceImpl.class);
private Date expires;
- private RMSource source;
+ private Source source;
private BigInteger currentMessageNumber;
private boolean lastMessage;
private Identifier offeringId;
- private org.objectweb.celtix.ws.addressing.EndpointReferenceType target;
+ private org.apache.cxf.ws.addressing.EndpointReferenceType target;
- static {
- Duration pt0s = null;
- try {
- DatatypeFactory df = DatatypeFactory.newInstance();
- pt0s = df.newDuration("PT0S");
- } catch (DatatypeConfigurationException ex) {
- LOG.log(Level.INFO, "Could not create Duration object.", ex);
- }
- PT0S = pt0s;
- }
-
- public SourceSequence(Identifier i) {
+ public SourceSequenceImpl(Identifier i) {
this(i, null, null);
}
- public SourceSequence(Identifier i, Date e, Identifier oi) {
+ public SourceSequenceImpl(Identifier i, Date e, Identifier oi) {
this(i, e, oi, BigInteger.ZERO, false);
}
-
- public SourceSequence(Identifier i, Date e, Identifier oi, BigInteger cmn, boolean lm) {
+ public SourceSequenceImpl(Identifier i, Date e, Identifier oi, BigInteger cmn, boolean lm) {
super(i);
expires = e;
@@ -55,41 +63,85 @@
currentMessageNumber = cmn;
lastMessage = lm;
- acked = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
- acked.setIdentifier(id);
+ acknowledgement = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
+ acknowledgement.setIdentifier(id);
}
// begin RMSourceSequence interface
-
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.SourceSequence#getCurrentMessageNr()
+ */
public BigInteger getCurrentMessageNr() {
return currentMessageNumber;
}
-
- /**
- * @return the identifier of the rm source
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.SourceSequence#getEndpointIdentifier()
*/
public String getEndpointIdentifier() {
+ // TODO
+ /*
if (null != source) {
return source.getHandler().getConfigurationHelper().getEndpointId();
}
+ */
return null;
}
-
- public Date getExpiry() {
- return expires;
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.SourceSequence#getOfferingSequenceIdentifier()
+ */
+ public Identifier getOfferingSequenceIdentifier() {
+ return offeringId;
}
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.SourceSequence#isLastMessage()
+ */
public boolean isLastMessage() {
return lastMessage;
}
- public Identifier getOfferingSequenceIdentifier() {
- return offeringId;
+ /* (non-Javadoc)
+ * @see org.apache.cxf.ws.rm.SourceSequence#getExpiry()
+ */
+ public Date getExpiry() {
+ // TODO Auto-generated method stub
+ return null;
}
// end RMSourceSequence interface
- void setSource(RMSource s) {
+ /**
+ * Returns true if a last message had been sent for this sequence and if all
+ * messages for this sequence have been acknowledged.
+ *
+ * @return true if all messages have been acknowledged.
+ */
+ public boolean allAcknowledged() {
+ if (!lastMessage) {
+ return false;
+ }
+
+ if (acknowledgement.getAcknowledgementRange().size() == 1) {
+ AcknowledgementRange r = acknowledgement.getAcknowledgementRange().get(0);
+ return r.getLower().equals(BigInteger.ONE) && r.getUpper().equals(currentMessageNumber);
+ }
+ return false;
+ }
+
+ /**
+ * Used by the RM source to cache received acknowledgements for this
+ * sequence.
+ *
+ * @param a an acknowledgement for this sequence
+ */
+ public void setAcknowledged(SequenceAcknowledgement a) {
+ acknowledgement = a;
+ }
+
+ void setSource(Source s) {
source = s;
}
@@ -117,15 +169,17 @@
boolean isExpired() {
return expires == null ? false : new Date().after(expires);
+
}
void setExpires(Expires ex) {
Duration d = null;
+ expires = null;
if (null != ex) {
d = ex.getValue();
}
- if (null != d && (null == PT0S || !PT0S.equals(d))) {
+ if (null != d && !d.equals(DatatypeFactory.PT0S)) {
Date now = new Date();
expires = new Date(now.getTime() + ex.getValue().getTimeInMillis(now));
}
@@ -171,39 +225,15 @@
}
}
- /**
- * Used by the RM source to cache received acknowledgements for this
- * sequence.
- *
- * @param acknowledgement an acknowledgement for this sequence
- */
- void setAcknowledged(SequenceAcknowledgement acknowledgement) {
- acked = acknowledgement;
- }
-
+
+
SequenceAcknowledgement getAcknowledgement() {
- return acked;
+ return acknowledgement;
}
- /**
- * Returns true if a last message had been sent for this sequence and if all
- * messages for this sequence have been acknowledged.
- *
- * @return true if all messages have been acknowledged.
- */
- boolean allAcknowledged() {
- if (!lastMessage) {
- return false;
- }
-
- if (acked.getAcknowledgementRange().size() == 1) {
- AcknowledgementRange r = acked.getAcknowledgementRange().get(0);
- return r.getLower().equals(BigInteger.ONE) && r.getUpper().equals(currentMessageNumber);
- }
- return false;
- }
+
/**
* The target for the sequence is the first non-anonymous address that
@@ -215,13 +245,13 @@
*
* @param to
*/
- synchronized void setTarget(org.objectweb.celtix.ws.addressing.EndpointReferenceType to) {
+ synchronized void setTarget(org.apache.cxf.ws.addressing.EndpointReferenceType to) {
if (target == null && !ContextUtils.isGenericAddress(to)) {
target = to;
}
}
- synchronized org.objectweb.celtix.ws.addressing.EndpointReferenceType getTarget() {
+ synchronized org.apache.cxf.ws.addressing.EndpointReferenceType getTarget() {
return target;
}
@@ -230,29 +260,38 @@
* and if so sets the lastMessageNumber property.
*/
private void checkLastMessage(Identifier inSeqId, BigInteger inMsgNumber) {
-
- assert null != source;
// check if this is a response to a message that was the last message in the sequence
// that included this sequence as an offer
-
+
if (null != inSeqId && null != inMsgNumber) {
- DestinationSequence inSeq = source.getHandler().getDestination().getSequence(inSeqId);
- if (null != inSeq && offeredBy(inSeqId) && inMsgNumber.equals(inSeq.getLastMessageNr())) {
- lastMessage = true;
+ RMInterceptor interceptor = source.getInterceptor();
+ Destination destination = interceptor.getDestination(source);
+ DestinationSequenceImpl inSeq = null;
+ if (null != destination) {
+ inSeq = destination.getSequenceImpl(inSeqId);
+ }
+
+ if (null != inSeq && offeredBy(inSeqId)
+ && inMsgNumber.equals(inSeq.getLastMessageNumber())) {
+ lastMessage = true;
}
}
+
if (!lastMessage) {
- SequenceTerminationPolicyType stp = source.getHandler().getConfigurationHelper()
- .getSequenceTerminationPolicy();
+ SequenceTerminationPolicyType stp = source.getInterceptor().getSourcePolicy()
+ .getSequenceTerminationPolicy();
+
assert null != stp;
if ((!stp.getMaxLength().equals(BigInteger.ZERO) && stp.getMaxLength()
.compareTo(currentMessageNumber) <= 0)
- || (stp.getMaxRanges() > 0 && acked.getAcknowledgementRange().size() >= stp.getMaxRanges())
- || (stp.getMaxUnacknowledged() > 0 && source.getHandler().getPersistenceManager().getQueue()
- .countUnacknowledged(this) >= stp.getMaxUnacknowledged())) {
+ || (stp.getMaxRanges() > 0
+ && acknowledgement.getAcknowledgementRange().size() >= stp.getMaxRanges())
+ || (stp.getMaxUnacknowledged() > 0
+ && source.getInterceptor().getRetransmissionQueue()
+ .countUnacknowledged(this) >= stp.getMaxUnacknowledged())) {
lastMessage = true;
}
}
@@ -261,5 +300,4 @@
LOG.fine(currentMessageNumber + " should be the last message in this sequence.");
}
}
-
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java Thu Oct 12 08:34:05 2006
@@ -23,7 +23,9 @@
import java.util.Collection;
import java.util.Map;
+import org.apache.cxf.ws.rm.DestinationSequence;
import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SourceSequence;
public interface RMStore {
@@ -39,14 +41,14 @@
* <code>RMSourceSequence</code> object.
* @param seq the sequence
*/
- void createSourceSequence(RMSourceSequence seq);
+ void createSourceSequence(SourceSequence seq);
/**
* Create a destination sequence in the persistent store, with the sequence attributes as specified in the
* <code>RMSDestinationSequence</code> object.
* @param seq the sequence
*/
- void createDestinationSequence(RMDestinationSequence seq);
+ void createDestinationSequence(DestinationSequence seq);
/**
* Remove the source sequence with the specified identifier from persistent store.
@@ -67,7 +69,7 @@
* @param endpointIdentifier the identifier for the source
* @return the collection of sequences
*/
- Collection<RMSourceSequence> getSourceSequences(String endpointIdentifier);
+ Collection<SourceSequence> getSourceSequences(String endpointIdentifier);
/**
* Retrieves all sequences managed by the identified RM destination endpoint
@@ -76,7 +78,7 @@
* @param endpointIdentifier the identifier for the destination
* @return the collection of sequences
*/
- Collection<RMDestinationSequence> getDestinationSequences(String endpointIdentifier);
+ Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier);
/**
* Retrieves the outbound/inbound messages stored for the source/destination sequence with
@@ -95,7 +97,7 @@
* @param seq the source sequence
* @param msg the outgoing message
*/
- void persistOutgoing(RMSourceSequence seq, RMMessage msg);
+ void persistOutgoing(SourceSequence seq, RMMessage msg);
/**
* Called by an RM source upon processing an outbound message. The <code>RMMessage</code>
@@ -104,7 +106,7 @@
* @param seq the destination sequence
* @param msg the incoming message
*/
- void persistIncoming(RMDestinationSequence seq, RMMessage msg);
+ void persistIncoming(DestinationSequence seq, RMMessage msg);
/**
* Removes the messages with the given message numbers and identifiers from the store of