You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2011/08/23 17:41:22 UTC
svn commit: r1160748 [1/2] - in /cxf/trunk: rt/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/
rt/ws/rm/src/main/java/org/apache/...
Author: ay
Date: Tue Aug 23 15:41:21 2011
New Revision: 1160748
URL: http://svn.apache.org/viewvc?rev=1160748&view=rev
Log:
[CXF-3757] jmx instrumentaiton for ws-rm
Added:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (with props)
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java (with props)
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java (with props)
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (with props)
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-client.xml (with props)
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/managed-server.xml (with props)
Modified:
cxf/trunk/rt/ws/rm/pom.xml
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
Modified: cxf/trunk/rt/ws/rm/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/pom.xml?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/pom.xml (original)
+++ cxf/trunk/rt/ws/rm/pom.xml Tue Aug 23 15:41:21 2011
@@ -64,6 +64,11 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-management</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-common-utilities</artifactId>
<version>${project.version}</version>
</dependency>
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Tue Aug 23 15:41:21 2011
@@ -327,7 +327,7 @@ public class DestinationSequence extends
if (null == store) {
return;
}
- store.removeMessages(getIdentifier(), Collections.singleton(new Long(messageNr)), false);
+ store.removeMessages(getIdentifier(), Collections.singleton(messageNr), false);
}
/**
@@ -341,6 +341,10 @@ public class DestinationSequence extends
return acknowledgeOnNextOccasion;
}
+ List<DeferredAcknowledgment> getDeferredAcknowledgements() {
+ return deferredAcknowledgments;
+ }
+
/**
* The correlation of the incoming CreateSequence call used to create this
* sequence is recorded so that in the absence of an offer, the corresponding
Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Tue Aug 23 15:41:21 2011
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import org.apache.cxf.management.ManagedComponent;
+import org.apache.cxf.management.annotation.ManagedAttribute;
+import org.apache.cxf.management.annotation.ManagedOperation;
+import org.apache.cxf.management.annotation.ManagedOperationParameter;
+import org.apache.cxf.management.annotation.ManagedOperationParameters;
+import org.apache.cxf.management.annotation.ManagedResource;
+import org.apache.cxf.ws.rm.DestinationSequence.DeferredAcknowledgment;
+import org.apache.cxf.ws.rm.v200702.Identifier;
+import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange;
+
+/**
+ * The ManagedRMEndpoint is a JMX managed bean for RMEndpoint.
+ *
+ */
+@ManagedResource(componentName = "RMEndpoint",
+ description = "Responsible for Sources and Destinations.")
+public class ManagedRMEndpoint implements ManagedComponent {
+
+ private static final String[] SOURCE_SEQUENCE_NAMES =
+ {"sequenceId", "currentMessageNumber", "expires", "lastMessage", "queuedMessageCount",
+ "target"};
+ private static final String[] SOURCE_SEQUENCE_DESCRIPTIONS = SOURCE_SEQUENCE_NAMES;
+ private static final OpenType<?>[] SOURCE_SEQUENCE_TYPES =
+ {SimpleType.STRING, SimpleType.LONG, SimpleType.DATE, SimpleType.BOOLEAN, SimpleType.INTEGER,
+ SimpleType.STRING};
+
+ private static final String[] DESTINATION_SEQUENCE_NAMES =
+ {"sequenceId", "lastMessageNumber", "correlationId", "ackTo"};
+ private static final String[] DESTINATION_SEQUENCE_DESCRIPTIONS = DESTINATION_SEQUENCE_NAMES;
+ private static final OpenType<?>[] DESTINATION_SEQUENCE_TYPES =
+ {SimpleType.STRING, SimpleType.LONG, SimpleType.STRING,
+ SimpleType.STRING};
+
+ private static final String[] RETRANSMISSION_STATUS_NAMES =
+ {"messageNumber", "resends", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
+ private static final String[] RETRANSMISSION_STATUS_DESCRIPTIONS = RETRANSMISSION_STATUS_NAMES;
+ private static final OpenType<?>[] RETRANSMISSION_STATUS_TYPES =
+ {SimpleType.LONG, SimpleType.INTEGER, SimpleType.DATE, SimpleType.DATE, SimpleType.LONG, SimpleType.LONG,
+ SimpleType.BOOLEAN, SimpleType.BOOLEAN};
+
+ private static CompositeType sourceSequenceType;
+ private static CompositeType destinationSequenceType;
+
+ private static CompositeType retransmissionStatusType;
+
+ private RMEndpoint endpoint;
+
+
+ static {
+ try {
+ sourceSequenceType = new CompositeType("sourceSequence",
+ "sourceSequence",
+ SOURCE_SEQUENCE_NAMES,
+ SOURCE_SEQUENCE_DESCRIPTIONS,
+ SOURCE_SEQUENCE_TYPES);
+
+ destinationSequenceType = new CompositeType("destinationSequence",
+ "destinationSequence",
+ DESTINATION_SEQUENCE_NAMES,
+ DESTINATION_SEQUENCE_DESCRIPTIONS,
+ DESTINATION_SEQUENCE_TYPES);
+
+ retransmissionStatusType = new CompositeType("retransmissionStatus",
+ "retransmissionStatus",
+ RETRANSMISSION_STATUS_NAMES,
+ RETRANSMISSION_STATUS_DESCRIPTIONS,
+ RETRANSMISSION_STATUS_TYPES);
+
+ } catch (OpenDataException e) {
+ // ignore and handle it later
+ }
+ }
+
+ public ManagedRMEndpoint(RMEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.management.ManagedComponent#getObjectName()
+ */
+ public ObjectName getObjectName() throws JMException {
+ return RMUtils.getManagedObjectName(endpoint);
+ }
+
+ @ManagedOperation(description = "Total Number of Queued Messages")
+ public int getQueuedMessageTotalCount() {
+ Source source = endpoint.getSource();
+ RetransmissionQueue queue = endpoint.getManager().getRetransmissionQueue();
+ int count = 0;
+ for (SourceSequence ss : source.getAllSequences()) {
+ count += queue.countUnacknowledged(ss);
+ }
+
+ return count;
+ }
+
+ @ManagedOperation(description = "Number of Queued Messages")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public int getQueuedMessageCount(String sid) {
+ RMManager manager = endpoint.getManager();
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+
+ return manager.getRetransmissionQueue().countUnacknowledged(ss);
+ }
+
+ @ManagedOperation(description = "List of UnAcknowledged Message Numbers")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public Long[] getUnAcknowledgedMessageIdentifiers(String sid) {
+ RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+
+ List<Long> numbers = rq.getUnacknowledgedMessageNumbers(ss);
+ return numbers.toArray(new Long[numbers.size()]);
+ }
+
+ @ManagedOperation(description = "Total Number of Deferred Acknowledgements")
+ public int getDeferredAcknowledgementTotalCount() {
+ Destination destination = endpoint.getDestination();
+
+ int count = 0;
+ for (DestinationSequence ds : destination.getAllSequences()) {
+ List<DeferredAcknowledgment> das = ds.getDeferredAcknowledgements();
+ if (null != das) {
+ count += das.size();
+ }
+ }
+
+ return count;
+ }
+
+ @ManagedOperation(description = "Number of Deferred Acknowledgements")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public int getDeferredAcknowledgementCount(String sid) {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+
+ return ds.getDeferredAcknowledgements().size();
+ }
+
+ @ManagedOperation(description = "Source Sequence Acknowledged Range")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public Long[] getSourceSequenceAcknowledgedRange(String sid) {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+
+ List<Long> list = new ArrayList<Long>();
+
+ for (AcknowledgementRange r : ss.getAcknowledgement().getAcknowledgementRange()) {
+ list.add(r.getLower());
+ list.add(r.getUpper());
+ }
+ return list.toArray(new Long[list.size()]);
+ }
+
+ @ManagedOperation(description = "Destination Sequence Acknowledged Range")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public Long[] getDestinationSequenceAcknowledgedRange(String sid) {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+
+ List<Long> list = new ArrayList<Long>();
+
+ for (AcknowledgementRange r : ds.getAcknowledgment().getAcknowledgementRange()) {
+ list.add(r.getLower());
+ list.add(r.getUpper());
+ }
+ return list.toArray(new Long[list.size()]);
+ }
+
+ @ManagedOperation(description = "Retransmission Status")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
+ @ManagedOperationParameter(name = "messageNumber", description = "The message number")
+ })
+ public CompositeData getRetransmissionStatus(String sid, long num) throws JMException {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+ RetransmissionStatus rs = rq.getRetransmissionStatus(ss, num);
+ return getRetransmissionStatusProperties(num, rs);
+ }
+
+ @ManagedOperation(description = "Retransmission Statuses")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public CompositeData[] getRetransmissionStatuses(String sid) throws JMException {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+ Map<Long, RetransmissionStatus> rsmap = rq.getRetransmissionStatuses(ss);
+
+ CompositeData[] rsps = new CompositeData[rsmap.size()];
+ int i = 0;
+ for (Map.Entry<Long, RetransmissionStatus> rs : rsmap.entrySet()) {
+ rsps[i++] = getRetransmissionStatusProperties(rs.getKey(), rs.getValue());
+ }
+ return rsps;
+ }
+
+ @ManagedOperation(description = "List of Source Sequence IDs")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "expired", description = "The expired sequences included")
+ })
+ public String[] getSourceSequenceIds(boolean expired) {
+ Source source = endpoint.getSource();
+ List<String> list = new ArrayList<String>();
+ for (SourceSequence ss : source.getAllSequences()) {
+ if (expired || !ss.isExpired()) {
+ list.add(ss.getIdentifier().getValue());
+ }
+ }
+ return list.toArray(new String[list.size()]);
+ }
+
+
+ @ManagedOperation(description = "List of Destination Sequence IDs")
+ public String[] getDestinationSequenceIds() {
+ Destination destination = endpoint.getDestination();
+ List<String> list = new ArrayList<String>();
+ for (DestinationSequence ds : destination.getAllSequences()) {
+ list.add(ds.getIdentifier().getValue());
+ }
+ return list.toArray(new String[list.size()]);
+ }
+
+
+ @ManagedOperation(description = "Suspend Retransmission Queue")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public void suspendSourceQueue(String sid) throws JMException {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+ rq.suspend(ss);
+ }
+
+ @ManagedOperation(description = "Resume Retransmission Queue")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public void resumeSourceQueue(String sid) throws JMException {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new JMException("no source sequence");
+ }
+ RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+ rq.resume(ss);
+ }
+
+ @ManagedOperation(description = "Current Source Sequence Properties")
+ public CompositeData getCurrentSourceSequence() throws JMException {
+ Source source = endpoint.getSource();
+ SourceSequence ss = source.getCurrent();
+
+ return getSourceSequenceProperties(ss);
+ }
+
+ @ManagedOperation(description = "Current Source Sequence Identifier")
+ public String getCurrentSourceSequenceId() throws JMException {
+ Source source = endpoint.getSource();
+ SourceSequence ss = source.getCurrent();
+
+ if (null == ss) {
+ throw new JMException("no source sequence");
+ }
+
+ return ss.getIdentifier().getValue();
+ }
+
+ @ManagedOperation(description = "Source Sequence Properties")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public CompositeData getSourceSequence(String sid) throws JMException {
+ SourceSequence ss = getSourceSeq(sid);
+
+ return getSourceSequenceProperties(ss);
+ }
+
+ @ManagedOperation(description = "Source Sequences Properties")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "expired", description = "The expired sequences included")
+ })
+ public CompositeData[] getSourceSequences(boolean expired) throws JMException {
+ List<CompositeData> sps = new ArrayList<CompositeData>();
+
+ Source source = endpoint.getSource();
+ for (SourceSequence ss : source.getAllSequences()) {
+ if (expired || !ss.isExpired()) {
+ sps.add(getSourceSequenceProperties(ss));
+ }
+ }
+
+ return sps.toArray(new CompositeData[sps.size()]);
+ }
+
+
+ @ManagedOperation(description = "Destination Sequence Properties")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier")
+ })
+ public CompositeData getDestinationSequence(String sid) throws JMException {
+ DestinationSequence ds = getDestinationSeq(sid);
+
+ return getDestinationSequenceProperties(ds);
+ }
+
+ @ManagedOperation(description = "Destination Sequences Properties")
+ public CompositeData[] getDestinationSequences() throws JMException {
+ List<CompositeData> sps = new ArrayList<CompositeData>();
+
+ Destination destination = endpoint.getDestination();
+ for (DestinationSequence ds : destination.getAllSequences()) {
+ sps.add(getDestinationSequenceProperties(ds));
+ }
+
+ return sps.toArray(new CompositeData[sps.size()]);
+ }
+
+ private SourceSequence getSourceSeq(String sid) {
+ Source source = endpoint.getSource();
+ Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
+ identifier.setValue(sid);
+ return source.getSequence(identifier);
+ }
+
+ private DestinationSequence getDestinationSeq(String sid) {
+ Destination destination = endpoint.getDestination();
+ Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
+ identifier.setValue(sid);
+ return destination.getSequence(identifier);
+ }
+
+ private CompositeData getSourceSequenceProperties(SourceSequence ss) throws JMException {
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RMManager manager = endpoint.getManager();
+ Object[] ssv = new Object[]{ss.getIdentifier().getValue(), ss.getCurrentMessageNr(),
+ ss.getExpires(), ss.isLastMessage(),
+ manager.getRetransmissionQueue().countUnacknowledged(ss),
+ ss.getTarget().getAddress().getValue()};
+
+ CompositeData ssps = new CompositeDataSupport(sourceSequenceType,
+ SOURCE_SEQUENCE_NAMES, ssv);
+ return ssps;
+ }
+
+ private CompositeData getDestinationSequenceProperties(DestinationSequence ds) throws JMException {
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ Object[] dsv = new Object[]{ds.getIdentifier().getValue(), ds.getLastMessageNumber(),
+ ds.getCorrelationID(),
+ ds.getAcksTo().getAddress().getValue()};
+
+ CompositeData dsps = new CompositeDataSupport(destinationSequenceType,
+ DESTINATION_SEQUENCE_NAMES, dsv);
+ return dsps;
+ }
+
+ private CompositeData getRetransmissionStatusProperties(long num,
+ RetransmissionStatus rs) throws JMException {
+ CompositeData rsps = null;
+ if (null != rs) {
+ Object[] rsv = new Object[] {num, rs.getResends(), rs.getPrevious(),
+ rs.getNext(), rs.getNextInterval(),
+ rs.getBackoff(), rs.isPending(), rs.isSuspended()};
+ rsps = new CompositeDataSupport(retransmissionStatusType,
+ RETRANSMISSION_STATUS_NAMES, rsv);
+ }
+ return rsps;
+ }
+
+ @ManagedAttribute(description = "Address Attribute", currencyTimeLimit = 60)
+ public String getAddress() {
+ return endpoint.getApplicationEndpoint().getEndpointInfo().getAddress();
+ }
+
+ //Not relevant unless ws-rm is used for non-http protocols
+// @ManagedAttribute(description = "TransportId Attribute", currencyTimeLimit = 60)
+// public String getTransportId() {
+// return endpoint.getApplicationEndpoint().getEndpointInfo().getTransportId();
+// }
+
+ @ManagedAttribute(description = "Application Message Last Received", currencyTimeLimit = 60)
+ public Date getLastApplicationMessage() {
+ return endpoint.getLastApplicationMessage() == 0L ? null
+ : new Date(endpoint.getLastApplicationMessage());
+ }
+
+ @ManagedAttribute(description = "Protocol Message Last Received", currencyTimeLimit = 60)
+ public Date getLastControlMessage() {
+ return endpoint.getLastControlMessage() == 0L ? null
+ : new Date(endpoint.getLastControlMessage());
+ }
+
+}
Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
------------------------------------------------------------------------------
svn:executable = *
Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java Tue Aug 23 15:41:21 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.JMException;
+import javax.management.ObjectName;
+
+//import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.management.ManagedComponent;
+import org.apache.cxf.management.annotation.ManagedAttribute;
+import org.apache.cxf.management.annotation.ManagedOperation;
+import org.apache.cxf.management.annotation.ManagedResource;
+
+/**
+ * The ManagedRMManager is a JMX managed bean for RMManager.
+ *
+ */
+@ManagedResource(componentName = "RMManager",
+ description = "Responsible for managing RMEndpoints.")
+public class ManagedRMManager implements ManagedComponent {
+
+ private RMManager manager;
+
+ public ManagedRMManager(RMManager manager) {
+ this.manager = manager;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.cxf.management.ManagedComponent#getObjectName()
+ */
+ public ObjectName getObjectName() throws JMException {
+ return RMUtils.getManagedObjectName(manager);
+ }
+
+ @ManagedOperation
+ public void shutdown() {
+ manager.shutdown();
+ }
+
+ @ManagedOperation
+ public String[] getEndpointIdentifiers() {
+ Set<String> identifiers = new HashSet<String>();
+ //FIXME find this method for 2.5
+// for (Endpoint ep : manager.getReliableEndpointsMap().keySet()) {
+ for (Endpoint ep : getReliableEndpointsMap().keySet()) {
+ identifiers.add(RMUtils.getEndpointIdentifier(ep));
+ }
+ return identifiers.toArray(new String[identifiers.size()]);
+ }
+
+ //TODO see the comment above
+ private Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
+ Map<Endpoint, RMEndpoint> epmap = new HashMap<Endpoint, RMEndpoint>();
+ for (ProtocolVariation pv : manager.getEndpointMaps().keySet()) {
+ epmap.putAll(manager.getEndpointMaps().get(pv));
+ }
+ return epmap;
+ }
+
+ @ManagedAttribute(description = "Using Store")
+ public boolean isUsingStore() {
+ return manager.getStore() != null;
+ }
+}
Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMManager.java
------------------------------------------------------------------------------
svn:executable = *
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Tue Aug 23 15:41:21 2011
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.management.JMException;
import javax.wsdl.extensions.ExtensibilityElement;
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
@@ -42,6 +43,7 @@ import org.apache.cxf.databinding.DataBi
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.jaxb.JAXBDataBinding;
+import org.apache.cxf.management.InstrumentationManager;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.Service;
import org.apache.cxf.service.factory.ServiceConstructionException;
@@ -97,7 +99,9 @@ public class RMEndpoint {
private QName interfaceQName;
private long lastApplicationMessage;
private long lastControlMessage;
-
+ private InstrumentationManager instrumentationManager;
+ private ManagedRMEndpoint managedEndpoint;
+
/**
* Constructor.
*
@@ -260,6 +264,17 @@ public class RMEndpoint {
createService();
createEndpoint(d);
setPolicies();
+ if (manager != null && manager.getBus() != null) {
+ managedEndpoint = new ManagedRMEndpoint(this);
+ instrumentationManager = manager.getBus().getExtension(InstrumentationManager.class);
+ if (instrumentationManager != null) {
+ try {
+ instrumentationManager.register(managedEndpoint);
+ } catch (JMException jmex) {
+ LOG.log(Level.WARNING, "Registering ManagedRMEndpoint failed.", jmex);
+ }
+ }
+ }
}
void createService() {
@@ -645,6 +660,8 @@ public class RMEndpoint {
for (SourceSequence ss : getSource().getAllSequences()) {
manager.getRetransmissionQueue().stop(ss);
}
+
+ // unregistering of this managed bean from the server is done by the bus itself
}
class EffectivePolicyImpl implements EffectivePolicy {
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Tue Aug 23 15:41:21 2011
@@ -30,6 +30,7 @@ import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
+import javax.management.JMException;
import javax.xml.namespace.QName;
import org.apache.cxf.Bus;
@@ -42,6 +43,7 @@ import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.endpoint.ServerLifeCycleListener;
import org.apache.cxf.endpoint.ServerLifeCycleManager;
+import org.apache.cxf.management.InstrumentationManager;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
@@ -100,6 +102,8 @@ public class RMManager {
private DeliveryAssuranceType deliveryAssurance;
private SourcePolicyType sourcePolicy;
private DestinationPolicyType destinationPolicy;
+ private InstrumentationManager instrumentationManager;
+ private ManagedRMManager managedManager;
private String rmNamespace = RM10Constants.NAMESPACE_URI;
private String rmAddressingNamespace = Names200408.WSA_NAMESPACE_NAME;
@@ -466,6 +470,8 @@ public class RMManager {
t.purge();
t.cancel();
}
+
+ // unregistring of this managed bean from the server is done by the bus itself
}
synchronized void shutdownReliableEndpoint(Endpoint e) {
@@ -623,6 +629,17 @@ public class RMManager {
if (null == idGenerator) {
idGenerator = new DefaultSequenceIdentifierGenerator();
}
+ if (null != bus) {
+ managedManager = new ManagedRMManager(this);
+ instrumentationManager = bus.getExtension(InstrumentationManager.class);
+ if (instrumentationManager != null) {
+ try {
+ instrumentationManager.register(managedManager);
+ } catch (JMException jmex) {
+ LOG.log(Level.WARNING, "Registering ManagedRMManager failed.", jmex);
+ }
+ }
+ }
}
@PostConstruct
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java Tue Aug 23 15:41:21 2011
@@ -21,8 +21,13 @@ package org.apache.cxf.ws.rm;
import java.io.OutputStream;
+import javax.management.JMException;
+import javax.management.ObjectName;
+
+import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.io.WriteOnCloseOutputStream;
+import org.apache.cxf.management.ManagementConstants;
import org.apache.cxf.message.Message;
import org.apache.cxf.ws.addressing.AddressingConstants;
import org.apache.cxf.ws.addressing.AddressingConstantsImpl;
@@ -124,4 +129,40 @@ public final class RMUtils {
}
return (WriteOnCloseOutputStream) os;
}
+
+ public static ObjectName getManagedObjectName(RMManager manager) throws JMException {
+ StringBuilder buffer = new StringBuilder();
+ writeTypeProperty(buffer, manager.getBus(), "WSRM.Manager");
+ return new ObjectName(buffer.toString());
+ }
+
+ public static ObjectName getManagedObjectName(RMEndpoint endpoint) throws JMException {
+ StringBuilder buffer = new StringBuilder();
+ writeTypeProperty(buffer, endpoint.getManager().getBus(), "WSRM.Endpoint");
+ Endpoint ep = endpoint.getApplicationEndpoint();
+ writeEndpointProperty(buffer, ep);
+ return new ObjectName(buffer.toString());
+ }
+
+ public static ObjectName getManagedObjectName(RMManager manager, Endpoint ep) throws JMException {
+ StringBuilder buffer = new StringBuilder();
+ writeTypeProperty(buffer, manager.getBus(), "WSRM.Endpoint");
+ writeEndpointProperty(buffer, ep);
+ return new ObjectName(buffer.toString());
+ }
+
+ private static void writeTypeProperty(StringBuilder buffer, Bus bus, String type) {
+ String busId = bus.getId();
+ buffer.append(ManagementConstants.DEFAULT_DOMAIN_NAME + ":");
+ buffer.append(ManagementConstants.BUS_ID_PROP + "=" + busId + ",");
+ buffer.append(ManagementConstants.TYPE_PROP + "=" + type);
+ }
+
+ private static void writeEndpointProperty(StringBuilder buffer, Endpoint ep) {
+ String serviceName = ObjectName.quote(ep.getService().getName().toString());
+ buffer.append(",");
+ buffer.append(ManagementConstants.SERVICE_NAME_PROP + "=" + serviceName + ",");
+ String endpointName = ObjectName.quote(ep.getEndpointInfo().getName().toString());
+ buffer.append(ManagementConstants.PORT_NAME_PROP + "=" + endpointName);
+ }
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Tue Aug 23 15:41:21 2011
@@ -19,6 +19,9 @@
package org.apache.cxf.ws.rm;
+import java.util.List;
+import java.util.Map;
+
import org.apache.cxf.message.Message;
public interface RetransmissionQueue {
@@ -39,8 +42,8 @@ public interface RetransmissionQueue {
boolean isEmpty();
/**
- * Accepts a new context for posible future retransmission.
- * @param ctx the message context.
+ * Accepts a new message for possible future retransmission.
+ * @param message the message context.
*/
void addUnacknowledged(Message message);
@@ -52,15 +55,47 @@ public interface RetransmissionQueue {
void purgeAcknowledged(SourceSequence seq);
/**
+ *
+ * @param seq
+ * @return
+ */
+ List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq);
+
+ /**
+ * Returns the retransmission status for the specified message.
+ * @param seq
+ * @param num
+ * @return
+ */
+ RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num);
+
+ /**
+ * Return the retransmission status of all the messages assigned to the sequence.
+ * @param seq
+ * @return
+ */
+ Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq);
+
+ /**
* Initiate resends.
- *
*/
void start();
/**
* Stops retransmission queue.
+ * @param seq
*/
void stop(SourceSequence seq);
-
+ /**
+ * Suspends the retransmission attempts for the specified sequence
+ * @param seq
+ */
+ void suspend(SourceSequence seq);
+
+ /**
+ * Resumes the retransmission attempts for the specified sequence
+ * @param seq
+ */
+ void resume(SourceSequence seq);
}
Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java Tue Aug 23 15:41:21 2011
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.Date;
+
+/**
+ *
+ */
+public interface RetransmissionStatus {
+ /**
+ * @return the next transmission time
+ */
+ Date getNext();
+
+ /**
+ * @return the previous transmission time
+ */
+ Date getPrevious();
+
+ /**
+ * @return the resends
+ */
+ int getResends();
+
+ /**
+ * @return the nextInterval
+ */
+ long getNextInterval();
+
+ /**
+ * @return the backoff
+ */
+ long getBackoff();
+
+ /**
+ * @return the pending
+ */
+ boolean isPending();
+
+ /**
+ * @return the suspended
+ */
+ boolean isSuspended();
+}
Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java
------------------------------------------------------------------------------
svn:executable = *
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java Tue Aug 23 15:41:21 2011
@@ -42,6 +42,20 @@ public interface RMStore {
* @param seq the sequence
*/
void createDestinationSequence(DestinationSequence seq);
+
+ /**
+ * Retrieve the source sequence with the specified identifier from persistent store.
+ * @param seq the sequence
+ * @return the sequence if present; otherwise null
+ */
+ SourceSequence getSourceSequence(Identifier seq, ProtocolVariation protocol);
+
+ /**
+ * Retrieve the destination sequence with the specified identifier from persistent store.
+ * @param seq the sequence
+ * @return the sequence if present; otherwise null
+ */
+ DestinationSequence getDestinationSequence(Identifier seq, ProtocolVariation protocol);
/**
* Remove the source sequence with the specified identifier from persistent store.
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Tue Aug 23 15:41:21 2011
@@ -101,6 +101,12 @@ public class RMTxStore implements RMStor
= "INSERT INTO {0} VALUES(?, ?, ?, ?)";
private static final String DELETE_MESSAGE_STMT_STR =
"DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
+ private static final String SELECT_DEST_SEQUENCE_STMT_STR =
+ "SELECT ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ + "WHERE SEQ_ID = ?";
+ private static final String SELECT_SRC_SEQUENCE_STMT_STR =
+ "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CXF_RM_SRC_SEQUENCES "
+ + "WHERE SEQ_ID = ?";
private static final String SELECT_DEST_SEQUENCES_STMT_STR =
"SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "WHERE ENDPOINT_ID = ?";
@@ -126,6 +132,8 @@ public class RMTxStore implements RMStor
private PreparedStatement updateSrcSequenceStmt;
private PreparedStatement selectDestSequencesStmt;
private PreparedStatement selectSrcSequencesStmt;
+ private PreparedStatement selectDestSequenceStmt;
+ private PreparedStatement selectSrcSequenceStmt;
private PreparedStatement createInboundMessageStmt;
private PreparedStatement createOutboundMessageStmt;
private PreparedStatement deleteInboundMessageStmt;
@@ -237,6 +245,68 @@ public class RMTxStore implements RMStor
}
}
+ public DestinationSequence getDestinationSequence(Identifier sid, ProtocolVariation protocol) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.info("Getting destination sequence for id: " + sid);
+ }
+ try {
+ if (null == selectDestSequenceStmt) {
+ selectDestSequenceStmt =
+ connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
+ }
+ selectDestSequenceStmt.setString(1, sid.getValue());
+
+ ResultSet res = selectDestSequenceStmt.executeQuery();
+ if (res.next()) {
+ EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
+ long lm = res.getLong(2);
+ InputStream is = res.getBinaryStream(3);
+ SequenceAcknowledgement ack = null;
+ if (null != is) {
+ ack = PersistenceUtils.getInstance()
+ .deserialiseAcknowledgment(is);
+ }
+ return new DestinationSequence(sid, acksTo, lm, ack, protocol);
+ }
+ } catch (SQLException ex) {
+ LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
+ }
+ return null;
+ }
+
+ public SourceSequence getSourceSequence(Identifier sid, ProtocolVariation protocol) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.info("Getting source sequences for id: " + sid);
+ }
+ try {
+ if (null == selectSrcSequenceStmt) {
+ selectSrcSequenceStmt =
+ connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);
+ }
+ selectSrcSequenceStmt.setString(1, sid.getValue());
+ ResultSet res = selectSrcSequenceStmt.executeQuery();
+
+ if (res.next()) {
+ long cmn = res.getLong(1);
+ boolean lm = res.getBoolean(2);
+ long lval = res.getLong(3);
+ Date expiry = 0 == lval ? null : new Date(lval);
+ String oidValue = res.getString(4);
+ Identifier oi = null;
+ if (null != oidValue) {
+ oi = RMUtils.getWSRMFactory().createIdentifier();
+ oi.setValue(oidValue);
+ }
+ return new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
+
+ }
+ } catch (SQLException ex) {
+ // ignore
+ LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG", LOG).toString(), ex);
+ }
+ return null;
+ }
+
public void removeDestinationSequence(Identifier sid) {
try {
beginTransaction();
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue Aug 23 15:41:21 2011
@@ -62,6 +62,7 @@ import org.apache.cxf.ws.rm.RMProperties
import org.apache.cxf.ws.rm.RMUtils;
import org.apache.cxf.ws.rm.RetransmissionCallback;
import org.apache.cxf.ws.rm.RetransmissionQueue;
+import org.apache.cxf.ws.rm.RetransmissionStatus;
import org.apache.cxf.ws.rm.SourceSequence;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.policy.RM10PolicyUtils;
@@ -76,7 +77,10 @@ public class RetransmissionQueueImpl imp
private static final Logger LOG = LogUtils.getL7dLogger(RetransmissionQueueImpl.class);
- private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
+ private Map<String, List<ResendCandidate>> candidates =
+ new HashMap<String, List<ResendCandidate>>();
+ private Map<String, List<ResendCandidate>> suspendedCandidates =
+ new HashMap<String, List<ResendCandidate>>();
private Resender resender;
private RMManager manager;
@@ -149,6 +153,52 @@ public class RetransmissionQueueImpl imp
}
}
+ public List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq) {
+ List<Long> unacknowledged = new ArrayList<Long>();
+ List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+ if (null != sequenceCandidates) {
+ for (int i = 0; i < sequenceCandidates.size(); i++) {
+ ResendCandidate candidate = sequenceCandidates.get(i);
+ RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
+ true);
+ SequenceType st = properties.getSequence();
+ unacknowledged.add(st.getMessageNumber());
+ }
+ }
+ return unacknowledged;
+ }
+
+ public RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num) {
+ List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+ if (null != sequenceCandidates) {
+ for (int i = 0; i < sequenceCandidates.size(); i++) {
+ ResendCandidate candidate = sequenceCandidates.get(i);
+ RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
+ true);
+ SequenceType st = properties.getSequence();
+ if (num == st.getMessageNumber()) {
+ return candidate;
+ }
+ }
+ }
+ return null;
+ }
+
+ public Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq) {
+ Map<Long, RetransmissionStatus> cp = new HashMap<Long, RetransmissionStatus>();
+ List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+ if (null != sequenceCandidates) {
+ for (int i = 0; i < sequenceCandidates.size(); i++) {
+ ResendCandidate candidate = sequenceCandidates.get(i);
+ RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
+ true);
+ SequenceType st = properties.getSequence();
+ cp.put(st.getMessageNumber(), candidate);
+ }
+ }
+ return cp;
+ }
+
/**
* Initiate resends.
*/
@@ -182,6 +232,36 @@ public class RetransmissionQueueImpl imp
void stop() {
}
+
+ public void suspend(SourceSequence seq) {
+ synchronized (this) {
+ String key = seq.getIdentifier().getValue();
+ List<ResendCandidate> sequenceCandidates = candidates.remove(key);
+ if (null != sequenceCandidates) {
+ for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
+ ResendCandidate candidate = sequenceCandidates.get(i);
+ candidate.suspend();
+ }
+ suspendedCandidates.put(key, sequenceCandidates);
+ LOG.log(Level.FINE, "Suspended resends for sequence {0}.", key);
+ }
+ }
+ }
+
+ public void resume(SourceSequence seq) {
+ synchronized (this) {
+ String key = seq.getIdentifier().getValue();
+ List<ResendCandidate> sequenceCandidates = suspendedCandidates.remove(key);
+ if (null != sequenceCandidates) {
+ for (int i = 0; i < sequenceCandidates.size(); i++) {
+ ResendCandidate candidate = sequenceCandidates.get(i);
+ candidate.resume();
+ }
+ candidates.put(key, sequenceCandidates);
+ LOG.log(Level.FINE, "Resumed resends for sequence {0}.", key);
+ }
+ }
+ }
/**
* @return the exponential backoff
@@ -219,6 +299,9 @@ public class RetransmissionQueueImpl imp
candidates.put(key, sequenceCandidates);
}
candidate = new ResendCandidate(message);
+ if (isSequenceSuspended(key)) {
+ candidate.suspend();
+ }
sequenceCandidates.add(candidate);
}
LOG.fine("Cached unacknowledged message.");
@@ -248,7 +331,20 @@ public class RetransmissionQueueImpl imp
* @pre called with mutex held
*/
protected List<ResendCandidate> getSequenceCandidates(String key) {
- return candidates.get(key);
+ List<ResendCandidate> sc = candidates.get(key);
+ if (null == sc) {
+ sc = suspendedCandidates.get(key);
+ }
+ return sc;
+ }
+
+ /**
+ * @param key the sequence identifier under consideration
+ * @return true if the sequence is currently suspended; false otherwise
+ * @pre called with mutex held
+ */
+ protected boolean isSequenceSuspended(String key) {
+ return suspendedCandidates.containsKey(key);
}
private void clientResend(Message message) {
@@ -373,7 +469,7 @@ public class RetransmissionQueueImpl imp
/**
* Represents a candidate for resend, i.e. an unacked outgoing message.
*/
- protected class ResendCandidate implements Runnable {
+ protected class ResendCandidate implements Runnable, RetransmissionStatus {
private Message message;
private OutputStream out;
private Date next;
@@ -382,6 +478,7 @@ public class RetransmissionQueueImpl imp
private long nextInterval;
private long backoff;
private boolean pending;
+ private boolean suspended;
private boolean includeAckRequested;
/**
@@ -458,21 +555,43 @@ public class RetransmissionQueueImpl imp
/**
* @return number of resend attempts
*/
- protected int getResends() {
+ public int getResends() {
return resends;
}
/**
* @return date of next resend
*/
- protected Date getNext() {
+ public Date getNext() {
return next;
}
/**
+ * @return date of previous resend or null if no attempt is yet taken
+ */
+ public Date getPrevious() {
+ if (resends > 0) {
+ return new Date(next.getTime() - nextInterval / backoff);
+ }
+ return null;
+ }
+
+ public long getNextInterval() {
+ return nextInterval;
+ }
+
+ public long getBackoff() {
+ return backoff;
+ }
+
+ public boolean isSuspended() {
+ return suspended;
+ }
+
+ /**
* @return if resend attempt is pending
*/
- protected synchronized boolean isPending() {
+ public synchronized boolean isPending() {
return pending;
}
@@ -498,13 +617,29 @@ public class RetransmissionQueueImpl imp
}
}
+ protected void suspend() {
+ suspended = true;
+ pending = false;
+ //TODO release the message and later reload it upon resume
+ //cancel();
+ if (null != nextTask) {
+ nextTask.cancel();
+ }
+ }
+
+ protected void resume() {
+ suspended = false;
+ next = new Date(System.currentTimeMillis());
+ attempted();
+ }
+
private void releaseSavedMessage() {
- CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ CachedOutputStream saved = (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT);
if (saved != null) {
saved.releaseTempFileHold();
}
-
}
+
/**
* @return associated message context
*/
Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Tue Aug 23 15:41:21 2011
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.xml.namespace.QName;
+
+import junit.framework.Assert;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.binding.soap.model.SoapBindingInfo;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.WSDLConstants;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.EndpointImpl;
+import org.apache.cxf.management.InstrumentationManager;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.Service;
+import org.apache.cxf.service.ServiceImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.service.model.ServiceInfo;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.rm.v200702.Identifier;
+import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
+import org.easymock.classextension.EasyMock;
+import org.easymock.classextension.IMocksControl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ManagedRMManagerTest extends Assert {
+ private static final String TEST_URI = "http://nowhere.com/bar/foo";
+ private IMocksControl control;
+
+ private Bus bus;
+ private InstrumentationManager im;
+ private RMManager manager;
+ private Endpoint endpoint;
+
+ @Before
+ public void setUp() {
+ control = EasyMock.createNiceControl();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (bus != null) {
+ bus.shutdown(true);
+ }
+ control.verify();
+ }
+
+ @Test
+ public void testManagedRMManager() throws Exception {
+ final SpringBusFactory factory = new SpringBusFactory();
+ bus = factory.createBus("org/apache/cxf/ws/rm/managed-manager-bean.xml");
+ im = bus.getExtension(InstrumentationManager.class);
+ manager = bus.getExtension(RMManager.class);
+ endpoint = createTestEndpoint();
+ assertTrue("Instrumentation Manager should not be null", im != null);
+ assertTrue("RMManager should not be null", manager != null);
+
+ MBeanServer mbs = im.getMBeanServer();
+ assertNotNull("MBeanServer should be available.", mbs);
+
+ ObjectName managerName = RMUtils.getManagedObjectName(manager);
+ Set<ObjectInstance> mbset = mbs.queryMBeans(managerName, null);
+ assertEquals("ManagedRMManager should be found", 1, mbset.size());
+
+ Object o;
+ o = mbs.getAttribute(managerName, "UsingStore");
+ assertTrue(o instanceof Boolean);
+ assertFalse("Store attribute is false", (Boolean)o);
+
+ o = mbs.invoke(managerName, "getEndpointIdentifiers", null, null);
+ assertTrue(o instanceof String[]);
+ assertEquals("No Endpoint", 0, ((String[])o).length);
+
+ RMEndpoint rme = createTestRMEndpoint();
+
+ ObjectName endpointName = RMUtils.getManagedObjectName(rme);
+ mbset = mbs.queryMBeans(endpointName, null);
+ assertEquals("ManagedRMEndpoint should be found", 1, mbset.size());
+
+ o = mbs.invoke(managerName, "getEndpointIdentifiers", null, null);
+ assertEquals("One Endpoint", 1, ((String[])o).length);
+ assertEquals("Endpoint identifier must match",
+ RMUtils.getEndpointIdentifier(endpoint), ((String[])o)[0]);
+
+ // test some endpoint methods
+ o = mbs.getAttribute(endpointName, "Address");
+ assertTrue(o instanceof String);
+ assertEquals("Endpoint address must match", TEST_URI, o);
+
+ o = mbs.getAttribute(endpointName, "LastApplicationMessage");
+ assertNull(o);
+
+ o = mbs.getAttribute(endpointName, "LastControlMessage");
+ assertNull(o);
+
+ o = mbs.invoke(endpointName, "getDestinationSequenceIds", null, null);
+ assertTrue(o instanceof String[]);
+ assertEquals("No sequence", 0, ((String[])o).length);
+
+ o = mbs.invoke(endpointName, "getDestinationSequences", null, null);
+ assertTrue(o instanceof CompositeData[]);
+ assertEquals("No sequence", 0, ((CompositeData[])o).length);
+
+ o = mbs.invoke(endpointName, "getSourceSequenceIds", new Object[]{true}, new String[]{"boolean"});
+ assertTrue(o instanceof String[]);
+ assertEquals("No sequence", 0, ((String[])o).length);
+
+ o = mbs.invoke(endpointName, "getSourceSequences", new Object[]{true}, new String[]{"boolean"});
+ assertTrue(o instanceof CompositeData[]);
+ assertEquals("No sequence", 0, ((CompositeData[])o).length);
+
+ o = mbs.invoke(endpointName, "getDeferredAcknowledgementTotalCount", null, null);
+ assertTrue(o instanceof Integer);
+ assertEquals("No deferred acks", 0, o);
+
+ o = mbs.invoke(endpointName, "getQueuedMessageTotalCount", null, null);
+ assertTrue(o instanceof Integer);
+ assertEquals("No queued messages", 0, o);
+ }
+
+ @Test
+ public void testManagedRMEndpointGetQueuedCount() throws Exception {
+ ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
+
+ int n = managedEndpoint.getQueuedMessageTotalCount();
+ assertEquals(3, n);
+
+ n = managedEndpoint.getQueuedMessageCount("seq1");
+ assertEquals(2, n);
+ }
+
+ @Test
+ public void testGetUnAcknowledgedMessageIdentifiers() throws Exception {
+ ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
+
+ Long[] numbers = managedEndpoint.getUnAcknowledgedMessageIdentifiers("seq1");
+ assertEquals(2, numbers.length);
+ assertTrue(2L == numbers[0] && 4L == numbers[1]);
+ }
+
+ @Test
+ public void testGetSourceSequenceAcknowledgedRange() throws Exception {
+ ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
+
+ Long[] ranges = managedEndpoint.getSourceSequenceAcknowledgedRange("seq1");
+ assertEquals(4, ranges.length);
+ assertTrue(1L == ranges[0] && 1L == ranges[1] && 3L == ranges[2] && 3L == ranges[3]);
+ }
+
+ @Test
+ public void testGetSourceSequences() throws Exception {
+ ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
+
+ String[] sids = managedEndpoint.getSourceSequenceIds(true);
+ assertEquals(2, sids.length);
+ assertTrue(("seq1".equals(sids[0]) || "seq1".equals(sids[1]))
+ && ("seq2".equals(sids[0]) || "seq2".equals(sids[1])));
+
+ String sid = managedEndpoint.getCurrentSourceSequenceId();
+ assertEquals("seq2", sid);
+
+ CompositeData[] sequences = managedEndpoint.getSourceSequences(true);
+ assertEquals(2, sequences.length);
+ verifySourceSequence(sequences[0]);
+ verifySourceSequence(sequences[1]);
+ }
+
+ @Test
+ public void testGetDestinationSequences() throws Exception {
+ ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
+
+ String[] sids = managedEndpoint.getDestinationSequenceIds();
+ assertEquals(2, sids.length);
+ assertTrue(("seq3".equals(sids[0]) || "seq3".equals(sids[1]))
+ && ("seq4".equals(sids[0]) || "seq4".equals(sids[1])));
+
+ CompositeData[] sequences = managedEndpoint.getDestinationSequences();
+ assertEquals(2, sequences.length);
+ verifyDestinationSequence(sequences[0]);
+ verifyDestinationSequence(sequences[1]);
+ }
+
+ @Test
+ public void testGetRetransmissionStatus() throws Exception {
+ ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
+ TestRetransmissionQueue rq = (TestRetransmissionQueue)manager.getRetransmissionQueue();
+
+ CompositeData status = managedEndpoint.getRetransmissionStatus("seq1", 3L);
+ assertNull(status);
+
+ status = managedEndpoint.getRetransmissionStatus("seq1", 2L);
+ assertNotNull(status);
+ verifyRetransmissionStatus(status, 2L, rq.getRetransmissionStatus());
+ }
+
+ @Test
+ public void testSuspendAndResumeSourceQueue() throws Exception {
+ ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
+ TestRetransmissionQueue rq = (TestRetransmissionQueue)manager.getRetransmissionQueue();
+
+ assertFalse(rq.isSuspended("seq1"));
+
+ managedEndpoint.suspendSourceQueue("seq1");
+ assertTrue(rq.isSuspended("seq1"));
+
+ managedEndpoint.resumeSourceQueue("seq1");
+ assertFalse(rq.isSuspended("seq1"));
+ }
+
+ private void verifySourceSequence(CompositeData cd) {
+ Object key = cd.get("sequenceId");
+ if ("seq1".equals(key)) {
+ verifySourceSequence(cd, "seq1", 5L, 2);
+ } else if ("seq2".equals(key)) {
+ verifySourceSequence(cd, "seq2", 4L, 1);
+ } else {
+ fail("Unexpected sequence: " + key);
+ }
+ }
+
+ private void verifySourceSequence(CompositeData cd, String sid, long num, int qsize) {
+ assertTrue(sid.equals(cd.get("sequenceId"))
+ && num == ((Long)cd.get("currentMessageNumber")).longValue()
+ && qsize == ((Integer)cd.get("queuedMessageCount")).intValue());
+ }
+
+ private void verifyDestinationSequence(CompositeData cd) {
+ Object key = cd.get("sequenceId");
+ assertTrue("seq3".equals(key) || "seq4".equals(key));
+ }
+
+ private void verifyRetransmissionStatus(CompositeData cd, long num, RetransmissionStatus status) {
+ assertEquals(num, cd.get("messageNumber"));
+ assertEquals(status.getResends(), cd.get("resends"));
+ assertEquals(status.getNext(), cd.get("next"));
+ assertEquals(status.getPrevious(), cd.get("previous"));
+ assertEquals(status.getNextInterval(), cd.get("nextInterval"));
+ assertEquals(status.getBackoff(), cd.get("backOff"));
+ }
+
+ private ManagedRMEndpoint createTestManagedRMEndpoint() {
+ manager = new RMManager();
+ RMEndpoint rme = control.createMock(RMEndpoint.class);
+ EndpointReferenceType ref = RMUtils.createReference(TEST_URI);
+ Source source = new Source(rme);
+ Destination destination = new Destination(rme);
+
+ RetransmissionQueue rq = new TestRetransmissionQueue();
+ manager.setRetransmissionQueue(rq);
+ manager.initialise();
+
+ List<SourceSequence> sss = createTestSourceSequences(source, ref);
+ List<DestinationSequence> dss = createTestDestinationSequences(destination, ref);
+
+ EasyMock.expect(rme.getManager()).andReturn(manager).anyTimes();
+ EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+ EasyMock.expect(rme.getDestination()).andReturn(destination).anyTimes();
+ EasyMock.expect(rme.getProtocol()).andReturn(null).anyTimes();
+
+ control.replay();
+ setCurrentMessageNumber(sss.get(0), 5L);
+ setCurrentMessageNumber(sss.get(1), 4L);
+ source.addSequence(sss.get(0));
+ source.addSequence(sss.get(1));
+
+ source.setCurrent(sss.get(1));
+
+ destination.addSequence(dss.get(0));
+ destination.addSequence(dss.get(1));
+ return new ManagedRMEndpoint(rme);
+ }
+
+ private void setCurrentMessageNumber(SourceSequence ss, long num) {
+ for (int i = 0; i < num; i++) {
+ ss.nextMessageNumber();
+ }
+ }
+
+ private List<SourceSequence> createTestSourceSequences(Source source,
+ EndpointReferenceType to) {
+ List<SourceSequence> sss = new ArrayList<SourceSequence>();
+ sss.add(createTestSourceSequence(source, "seq1", to, new long[]{1L, 1L, 3L, 3L}));
+ sss.add(createTestSourceSequence(source, "seq2", to, new long[]{1L, 1L, 3L, 3L}));
+
+ return sss;
+ }
+
+ private List<DestinationSequence> createTestDestinationSequences(Destination destination,
+ EndpointReferenceType to) {
+ List<DestinationSequence> dss = new ArrayList<DestinationSequence>();
+ dss.add(createTestDestinationSequence(destination, "seq3", to, new long[]{1L, 1L, 3L, 3L}));
+ dss.add(createTestDestinationSequence(destination, "seq4", to, new long[]{1L, 1L, 3L, 3L}));
+
+ return dss;
+ }
+
+ private SourceSequence createTestSourceSequence(Source source, String sid,
+ EndpointReferenceType to, long[] acked) {
+ Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
+ identifier.setValue(sid);
+ SourceSequence ss = new SourceSequence(identifier, null);
+ ss.setSource(source);
+ ss.setTarget(to);
+ List<SequenceAcknowledgement.AcknowledgementRange> ranges =
+ ss.getAcknowledgement().getAcknowledgementRange();
+ for (int i = 0; i < acked.length; i += 2) {
+ ranges.add(createAcknowledgementRange(acked[i], acked[i + 1]));
+ }
+ return ss;
+ }
+
+ private DestinationSequence createTestDestinationSequence(Destination destination, String sid,
+ EndpointReferenceType to, long[] acked) {
+ Identifier identifier = RMUtils.getWSRMFactory().createIdentifier();
+ identifier.setValue(sid);
+ DestinationSequence ds = new DestinationSequence(identifier, to, null, null);
+ ds.setDestination(destination);
+
+ List<SequenceAcknowledgement.AcknowledgementRange> ranges =
+ ds.getAcknowledgment().getAcknowledgementRange();
+ for (int i = 0; i < acked.length; i += 2) {
+ ranges.add(createAcknowledgementRange(acked[i], acked[i + 1]));
+ }
+ return ds;
+ }
+
+ private SequenceAcknowledgement.AcknowledgementRange createAcknowledgementRange(long l, long u) {
+ SequenceAcknowledgement.AcknowledgementRange range =
+ new SequenceAcknowledgement.AcknowledgementRange();
+ range.setLower(l);
+ range.setUpper(u);
+ return range;
+ }
+
+ private Endpoint createTestEndpoint() throws Exception {
+ ServiceInfo svci = new ServiceInfo();
+ svci.setName(new QName(TEST_URI, "testService"));
+ Service svc = new ServiceImpl(svci);
+ SoapBindingInfo binding = new SoapBindingInfo(svci, WSDLConstants.NS_SOAP11);
+ binding.setTransportURI(WSDLConstants.NS_SOAP_HTTP_TRANSPORT);
+ EndpointInfo ei = new EndpointInfo();
+ ei.setAddress(TEST_URI);
+ ei.setName(new QName(TEST_URI, "testPort"));
+ ei.setBinding(binding);
+ ei.setService(svci);
+ return new EndpointImpl(bus, svc, ei);
+ }
+
+ private RMEndpoint createTestRMEndpoint() throws Exception {
+ Message message = control.createMock(Message.class);
+ Exchange exchange = control.createMock(Exchange.class);
+
+ EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
+ EasyMock.expect(exchange.get(Endpoint.class)).andReturn(endpoint);
+ EasyMock.expect(exchange.getOutMessage()).andReturn(message);
+
+ control.replay();
+ return manager.getReliableEndpoint(message);
+ }
+
+ private class TestRetransmissionQueue implements RetransmissionQueue {
+ private Set<String> suspended = new HashSet<String>();
+ private RetransmissionStatus status = new TestRetransmissionStatus();
+
+ public int countUnacknowledged(SourceSequence seq) {
+ final String key = seq.getIdentifier().getValue();
+ if ("seq1".equals(key)) {
+ return 2;
+ } else if ("seq2".equals(key)) {
+ return 1;
+ }
+ return 0;
+ }
+
+ public boolean isEmpty() {
+ return false;
+ }
+
+ public void addUnacknowledged(Message message) {
+ // TODO Auto-generated method stub
+ }
+
+ public void purgeAcknowledged(SourceSequence seq) {
+ // TODO Auto-generated method stub
+ }
+
+ public List<Long> getUnacknowledgedMessageNumbers(SourceSequence seq) {
+ List<Long> list = new ArrayList<Long>();
+ final String key = seq.getIdentifier().getValue();
+ if ("seq1".equals(key)) {
+ list.add(2L);
+ list.add(4L);
+ } else if ("seq2".equals(key)) {
+ list.add(3L);
+ }
+ return list;
+ }
+
+ public RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num) {
+ final String key = seq.getIdentifier().getValue();
+ if (("seq1".equals(key) && (2L == num || 4L == num))
+ || ("seq2".equals(key) && (2L == num))) {
+ return status;
+ }
+ return null;
+ }
+
+ public Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void start() {
+ // TODO Auto-generated method stub
+ }
+
+ public void stop(SourceSequence seq) {
+ // TODO Auto-generated method stub
+ }
+
+ public void suspend(SourceSequence seq) {
+ suspended.add(seq.getIdentifier().getValue());
+ }
+
+ public void resume(SourceSequence seq) {
+ suspended.remove(seq.getIdentifier().getValue());
+ }
+
+ boolean isSuspended(String sid) {
+ return suspended.contains(sid);
+ }
+
+ RetransmissionStatus getRetransmissionStatus() {
+ return status;
+ }
+ }
+
+ private static class TestRetransmissionStatus implements RetransmissionStatus {
+ private long interval = 300000L;
+ private Date next = new Date(System.currentTimeMillis() + interval / 2);
+ private Date previous = new Date(next.getTime() - interval);
+
+ public Date getNext() {
+ return next;
+ }
+
+ public Date getPrevious() {
+ return previous;
+ }
+
+ public int getResends() {
+ return 2;
+ }
+
+ public long getNextInterval() {
+ return interval;
+ }
+
+ public long getBackoff() {
+ return 1L;
+ }
+
+ public boolean isPending() {
+ return false;
+ }
+
+ public boolean isSuspended() {
+ return false;
+ }
+ }
+}
Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
------------------------------------------------------------------------------
svn:executable = *
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerConfigurationTest.java Tue Aug 23 15:41:21 2011
@@ -157,6 +157,16 @@ public class RMManagerConfigurationTest
// TODO Auto-generated method stub
}
+
+ public SourceSequence getSourceSequence(Identifier seq, ProtocolVariation protocol) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public DestinationSequence getDestinationSequence(Identifier seq, ProtocolVariation protocol) {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
}
Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml?rev=1160748&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml Tue Aug 23 15:41:21 2011
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:cxf="http://cxf.apache.org/core"
+ xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <bean id="org.apache.cxf.management.InstrumentationManager" class="org.apache.cxf.management.jmx.InstrumentationManagerImpl">
+ <property name="bus" ref="cxf" />
+ <property name="enabled" value="true"/>
+ <property name="threaded" value="false"/>
+ <property name="daemon" value="false"/>
+ <property name="JMXServiceURL" value="service:jmx:rmi:///jndi/rmi://localhost:9914/jmxrmi" />
+ </bean>
+
+ <cxf:bus>
+ <cxf:features>
+ <wsrm-mgr:reliableMessaging>
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="10000"/>
+ <wsrm-policy:AcknowledgementInterval Milliseconds="10000"/>
+ </wsrm-policy:RMAssertion>
+ <wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:InOrder/>
+ </wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:sourcePolicy>
+ <wsrm-mgr:sequenceTerminationPolicy terminateOnShutdown="true"/>
+ </wsrm-mgr:sourcePolicy>
+ <wsrm-mgr:destinationPolicy>
+ <wsrm-mgr:acksPolicy ImmediaAcksTimeout="2000" intraMessageThreshold="0"/>
+ </wsrm-mgr:destinationPolicy>
+ </wsrm-mgr:reliableMessaging>
+ </cxf:features>
+ </cxf:bus>
+
+</beans>
Propchange: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/managed-manager-bean.xml
------------------------------------------------------------------------------
svn:executable = *
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java?rev=1160748&r1=1160747&r2=1160748&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java Tue Aug 23 15:41:21 2011
@@ -226,30 +226,34 @@ public class RMTxStoreTest extends Asser
@Test
public void testCreateDeleteMessages() throws IOException, SQLException {
- RMMessage msg = control.createMock(RMMessage.class);
+ RMMessage msg1 = control.createMock(RMMessage.class);
+ RMMessage msg2 = control.createMock(RMMessage.class);
Identifier sid1 = new Identifier();
sid1.setValue("sequence1");
- EasyMock.expect(msg.getMessageNumber()).andReturn(ONE).times(2);
+ EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE);
+ EasyMock.expect(msg2.getMessageNumber()).andReturn(ONE);
byte[] bytes = new byte[89];
- EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
- EasyMock.expect(msg.getSize()).andReturn(bytes.length);
+ EasyMock.expect(msg1.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg1.getSize()).andReturn(bytes.length);
+ EasyMock.expect(msg2.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg2.getSize()).andReturn(bytes.length);
control.replay();
store.beginTransaction();
- store.storeMessage(sid1, msg, true);
- store.storeMessage(sid1, msg, false);
+ store.storeMessage(sid1, msg1, true);
+ store.storeMessage(sid1, msg2, false);
store.commit();
control.verify();
control.reset();
- EasyMock.expect(msg.getMessageNumber()).andReturn(ONE);
- EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
- EasyMock.expect(msg.getSize()).andReturn(bytes.length);
+ EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE);
+ EasyMock.expect(msg1.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg1.getSize()).andReturn(bytes.length);
control.replay();
store.beginTransaction();
try {
- store.storeMessage(sid1, msg, true);
+ store.storeMessage(sid1, msg1, true);
} catch (SQLException ex) {
assertEquals("23505", ex.getSQLState());
}
@@ -257,14 +261,17 @@ public class RMTxStoreTest extends Asser
control.verify();
control.reset();
- EasyMock.expect(msg.getMessageNumber()).andReturn(TEN).times(2);
- EasyMock.expect(msg.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
- EasyMock.expect(msg.getSize()).andReturn(bytes.length);
+ EasyMock.expect(msg1.getMessageNumber()).andReturn(TEN);
+ EasyMock.expect(msg2.getMessageNumber()).andReturn(TEN);
+ EasyMock.expect(msg1.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg1.getSize()).andReturn(bytes.length);
+ EasyMock.expect(msg2.getInputStream()).andReturn(new ByteArrayInputStream(bytes));
+ EasyMock.expect(msg2.getSize()).andReturn(bytes.length);
control.replay();
store.beginTransaction();
- store.storeMessage(sid1, msg, true);
- store.storeMessage(sid1, msg, false);
+ store.storeMessage(sid1, msg1, true);
+ store.storeMessage(sid1, msg2, false);
store.commit();
control.verify();
@@ -420,6 +427,64 @@ public class RMTxStoreTest extends Asser
}
@Test
+ public void testGetDestinationSequence() throws SQLException, IOException {
+
+ Identifier sid1 = null;
+ Identifier sid2 = null;
+
+ DestinationSequence seq =
+ store.getDestinationSequence(new Identifier(), ProtocolVariation.RM10WSA200408);
+ assertNull(seq);
+
+ try {
+ sid1 = setupDestinationSequence("sequence1");
+
+ seq = store.getDestinationSequence(sid1, ProtocolVariation.RM10WSA200408);
+ assertNotNull(seq);
+
+ sid2 = setupDestinationSequence("sequence2");
+ seq = store.getDestinationSequence(sid2, ProtocolVariation.RM10WSA200408);
+ assertNotNull(seq);
+ } finally {
+ if (null != sid1) {
+ store.removeDestinationSequence(sid1);
+ }
+ if (null != sid2) {
+ store.removeDestinationSequence(sid2);
+ }
+ }
+ }
+
+ @Test
+ public void testGetSourceSequence() throws SQLException, IOException {
+
+ Identifier sid1 = null;
+ Identifier sid2 = null;
+
+ SourceSequence seq =
+ store.getSourceSequence(new Identifier(), ProtocolVariation.RM10WSA200408);
+ assertNull(seq);
+
+ try {
+ sid1 = setupSourceSequence("sequence1");
+
+ seq = store.getSourceSequence(sid1, ProtocolVariation.RM10WSA200408);
+ assertNotNull(seq);
+
+ sid2 = setupSourceSequence("sequence2");
+ seq = store.getSourceSequence(sid2, ProtocolVariation.RM10WSA200408);
+ assertNotNull(seq);
+ } finally {
+ if (null != sid1) {
+ store.removeSourceSequence(sid1);
+ }
+ if (null != sid2) {
+ store.removeSourceSequence(sid2);
+ }
+ }
+ }
+
+ @Test
public void testGetMessages() throws SQLException, IOException {
Identifier sid1 = new Identifier();