You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/28 18:19:27 UTC
svn commit: r389533 - in /incubator/activemq/trunk/activemq-core: ./
src/test/java/org/apache/activemq/transport/reliable/
Author: jstrachan
Date: Tue Mar 28 08:19:23 2006
New Revision: 389533
URL: http://svn.apache.org/viewcvs?rev=389533&view=rev
Log:
added some test cases (that are not yet working) for reliable UDP
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java
- copied, changed from r387160, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/project.xml
Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=389533&r1=389532&r2=389533&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Tue Mar 28 08:19:23 2006
@@ -162,7 +162,7 @@
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
- <version>${xbean_spring_version}</version>
+ <version>2.2</version>
<url>http://www.xbean.org</url>
<properties>
<war.bundle>true</war.bundle>
@@ -344,28 +344,34 @@
<!-- This test currently fails -->
<exclude>**/ItStillMarshallsTheSameTest.*</exclude>
- <!-- http://jira.activemq.org/jira/browse/AMQ-522 -->
+ <!-- https://issues.apache.org/activemq/browse/AMQ-522 -->
<exclude>**/ProxyConnectorTest.*</exclude>
- <!-- http://jira.activemq.org/jira/browse/AMQ-594 -->
+ <!-- https://issues.apache.org/activemq/browse/AMQ-594 -->
<exclude>**/SimpleNetworkTest.*</exclude>
- <!-- http://jira.activemq.org/jira/browse/AMQ-583 -->
+ <!-- https://issues.apache.org/activemq/browse/AMQ-583 -->
<exclude>**/DiscoveryTransportBrokerTest.*</exclude>
- <!-- http://jira.activemq.org/jira/browse/AMQ-610 -->
+ <!-- https://issues.apache.org/activemq/browse/AMQ-610 -->
<exclude>**/FanoutTransportBrokerTest.*</exclude>
- <!-- http://jira.activemq.org/jira/browse/AMQ-626 -->
+ <!-- https://issues.apache.org/activemq/browse/AMQ-626 -->
<exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
+ <!-- https://issues.apache.org/activemq/browse/AMQ-667 -->
+ <exclude>**/SslTransportBrokerTest.*</exclude>
+
+
<!-- TODO FIX ME ASAP -->
<exclude>**/MulticastNetworkTest.*</exclude>
<exclude>**/UnreliableUdpTransportTest.*</exclude>
<exclude>**/MulticastTransportTest.*</exclude>
<exclude>**/UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest.*</exclude>
+
+ <exclude>**/UnreliableUdpTransportTest.*</exclude>
</excludes>
</unitTest>
<resources>
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.net.SocketAddress;
+
+/**
+ * Strategy that is asked, before a datagram is written, whether the command
+ * it carries should be discarded instead of sent. The unreliable test
+ * transports in this package use it to simulate packet loss.
+ *
+ * @version $Revision: $
+ */
+public interface DropCommandStrategy {
+
+ /**
+ * Returns true if the command should be dropped for
+ * the given command ID and address
+ *
+ * @param commandId the ID of the command about to be written
+ * @param address the address the datagram would be written to
+ * @param redelivery the redelivery flag of the write being attempted;
+ * presumably true when the command is being re-sent rather than sent
+ * for the first time (TODO confirm against the UDP transport)
+ * @return true to discard the datagram, false to let it be sent
+ */
+ boolean shouldDropCommand(int commandId, SocketAddress address, boolean redelivery);
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java (from r387160, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java?p2=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java&p1=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java&r1=387160&r2=389533&rev=389533&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java Tue Mar 28 08:19:23 2006
@@ -14,11 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.transport.reliable;
import edu.emory.mathcs.backport.java.util.Queue;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.transport.StubTransport;
+import org.apache.activemq.transport.StubTransportListener;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.ByteBufferPool;
+import org.apache.activemq.transport.udp.CommandDatagramChannel;
+import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
+import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+/**
+ *
+ * @version $Revision: $
+ */
+public class UnreliableCommandDatagramChannel extends CommandDatagramChannel {
+
+ private static final Log log = LogFactory.getLog(UnreliableCommandDatagramChannel.class);
+
+ private DropCommandStrategy dropCommandStrategy;
+
+ public UnreliableCommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
+ SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, ReplayBuffer replayBuffer, DatagramChannel channel,
+ ByteBufferPool bufferPool, DropCommandStrategy strategy) {
+ super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller, channel, bufferPool);
+ this.dropCommandStrategy = strategy;
+ }
+
+ protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) throws IOException {
+ if (dropCommandStrategy.shouldDropCommand(commandId, address, redelivery)) {
+ writeBuffer.flip();
+ log.info("Dropping datagram with command: " + commandId);
+
+ // lets still add it to the replay buffer though!
+ getReplayBuffer().addBuffer(commandId, writeBuffer);
+ }
+ else {
+ super.sendWriteBuffer(commandId, address, writeBuffer, redelivery);
+ }
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.CommandDatagramSocket;
+import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
+import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+
+/**
+ *
+ * @version $Revision: $
+ */
+public class UnreliableCommandDatagramSocket extends CommandDatagramSocket {
+ private static final Log log = LogFactory.getLog(UnreliableCommandDatagramSocket.class);
+
+ private DropCommandStrategy dropCommandStrategy;
+
+ public UnreliableCommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
+ SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel,
+ DropCommandStrategy strategy) {
+ super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller, channel);
+ this.dropCommandStrategy = strategy;
+ }
+
+ protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery)
+ throws IOException {
+ if (dropCommandStrategy.shouldDropCommand(commandId, address, redelivery)) {
+ log.info("Dropping datagram with command: " + commandId);
+
+ // lets still add it to the replay buffer though!
+ ReplayBuffer bufferCache = getReplayBuffer();
+ if (bufferCache != null && !redelivery) {
+ bufferCache.addBuffer(commandId, data);
+ }
+ }
+ else {
+ super.sendWriteBuffer(commandId, address, data, redelivery);
+ }
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.CommandChannel;
+import org.apache.activemq.transport.udp.CommandDatagramChannel;
+import org.apache.activemq.transport.udp.UdpTransport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+/**
+ * An unreliable UDP transport that discards the packets selected by its
+ * {@link DropCommandStrategy}, to simulate a bad network (or UDP buffers
+ * being flooded).
+ *
+ * @version $Revision: $
+ */
+public class UnreliableUdpTransport extends UdpTransport {
+
+ // decides which commands get dropped; stays null until the setter is called
+ private DropCommandStrategy dropCommandStrategy;
+
+ /** Delegates to the superclass constructor taking a wire format and a port. */
+ public UnreliableUdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
+ super(wireFormat, port);
+ }
+
+ /** Delegates to the superclass constructor taking a wire format and a socket address. */
+ public UnreliableUdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
+ super(wireFormat, socketAddress);
+ }
+
+ /** Delegates to the superclass constructor taking a wire format and a remote URI. */
+ public UnreliableUdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException,
+ IOException {
+ super(wireFormat, remoteLocation);
+ }
+
+ /** Delegates to the superclass constructor taking only a wire format. */
+ public UnreliableUdpTransport(OpenWireFormat wireFormat) throws IOException {
+ super(wireFormat);
+ }
+
+ /**
+ * @return the strategy currently configured, or null if none has been set
+ */
+ public DropCommandStrategy getDropCommandStrategy() {
+ return dropCommandStrategy;
+ }
+
+ /**
+ * Sets the strategy handed to channels created afterwards by
+ * {@link #createCommandDatagramChannel()}.
+ */
+ public void setDropCommandStrategy(DropCommandStrategy dropCommandStrategy) {
+ this.dropCommandStrategy = dropCommandStrategy;
+ }
+
+ /**
+ * Creates an {@link UnreliableCommandDatagramChannel} configured from this
+ * transport's current settings. Presumably overrides the factory method of
+ * {@link UdpTransport} (TODO confirm).
+ *
+ * The strategy is read at the moment of the call and copied into the
+ * channel, so it must be set before the channel is created; if it has not
+ * been set, null is passed.
+ */
+ protected CommandChannel createCommandDatagramChannel() {
+ return new UnreliableCommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(),
+ createDatagramHeaderMarshaller(), getReplayBuffer(), getChannel(), getBufferPool(), dropCommandStrategy);
+ }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.CommandJoiner;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.udp.ResponseRedirectInterceptor;
+import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.activemq.transport.udp.UdpTransportTest;
+
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Runs the {@link UdpTransportTest} scenarios with a producer that drops
+ * some of its datagrams, with both ends wrapped in a
+ * {@link ReliableTransport}.
+ *
+ * @version $Revision: $
+ */
+public class UnreliableUdpTransportTest extends UdpTransportTest {
+
+    /** Drops first-time sends whose command ID leaves remainder 2 when divided by 3. */
+    protected DropCommandStrategy dropStrategy = new DropCommandStrategy() {
+
+        public boolean shouldDropCommand(int commandId, SocketAddress address, boolean redelivery) {
+            // redeliveries always get through
+            return !redelivery && commandId % 3 == 2;
+        }
+    };
+
+    /** Builds the sending side on top of an {@link UnreliableUdpTransport}. */
+    protected Transport createProducer() throws Exception {
+        System.out.println("Producer using URI: " + producerURI);
+
+        OpenWireFormat format = createWireFormat();
+        URI target = new URI(producerURI);
+        UnreliableUdpTransport lossy = new UnreliableUdpTransport(format, target);
+        lossy.setDropCommandStrategy(dropStrategy);
+
+        return new CommandJoiner(wrapReliably(lossy), format);
+    }
+
+    /** Builds the receiving side on top of a plain {@link UdpTransport}. */
+    protected Transport createConsumer() throws Exception {
+        System.out.println("Consumer on port: " + consumerPort);
+        OpenWireFormat format = createWireFormat();
+        UdpTransport udp = new UdpTransport(format, consumerPort);
+
+        ReliableTransport reliable = wrapReliably(udp);
+        return new CommandJoiner(new ResponseRedirectInterceptor(reliable, udp), format);
+    }
+
+    /** Asserts that a replayer exists and returns the replay strategy to use. */
+    protected ReplayStrategy createReplayStrategy(Replayer replayer) {
+        assertNotNull("Should have a replayer!", replayer);
+        return new DefaultReplayStrategy(1);
+    }
+
+    /** Wraps the UDP transport in a ReliableTransport and installs the replay strategy. */
+    private ReliableTransport wrapReliably(UdpTransport udp) throws Exception {
+        ReliableTransport reliable = new ReliableTransport(udp, udp);
+        reliable.setReplayStrategy(createReplayStrategy(reliable.getReplayer()));
+        return reliable;
+    }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java
------------------------------------------------------------------------------
svn:eol-style = native