You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/28 18:19:27 UTC

svn commit: r389533 - in /incubator/activemq/trunk/activemq-core: ./ src/test/java/org/apache/activemq/transport/reliable/

Author: jstrachan
Date: Tue Mar 28 08:19:23 2006
New Revision: 389533

URL: http://svn.apache.org/viewcvs?rev=389533&view=rev
Log:
added some test cases (that are not yet working) for reliable UDP

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java
      - copied, changed from r387160, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/project.xml

Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=389533&r1=389532&r2=389533&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Tue Mar 28 08:19:23 2006
@@ -162,7 +162,7 @@
         <dependency>
             <groupId>org.apache.xbean</groupId>
             <artifactId>xbean-spring</artifactId>
-            <version>${xbean_spring_version}</version>
+            <version>2.2</version>
             <url>http://www.xbean.org</url>
             <properties>
                 <war.bundle>true</war.bundle>
@@ -344,28 +344,32 @@
                 <!-- This test currently fails -->
                 <exclude>**/ItStillMarshallsTheSameTest.*</exclude>
                 
-                <!-- http://jira.activemq.org/jira/browse/AMQ-522 -->
+                <!-- https://issues.apache.org/activemq/browse/AMQ-522 -->
                 <exclude>**/ProxyConnectorTest.*</exclude>
 
-                <!-- http://jira.activemq.org/jira/browse/AMQ-594 -->
+                <!-- https://issues.apache.org/activemq/browse/AMQ-594 -->
                 <exclude>**/SimpleNetworkTest.*</exclude>
                 
-                <!-- http://jira.activemq.org/jira/browse/AMQ-583 -->
+                <!-- https://issues.apache.org/activemq/browse/AMQ-583 -->
                 <exclude>**/DiscoveryTransportBrokerTest.*</exclude>
                 
-                <!-- http://jira.activemq.org/jira/browse/AMQ-610 -->
+                <!-- https://issues.apache.org/activemq/browse/AMQ-610 -->
                 <exclude>**/FanoutTransportBrokerTest.*</exclude>
                 
-                <!-- http://jira.activemq.org/jira/browse/AMQ-626 -->
+                <!-- https://issues.apache.org/activemq/browse/AMQ-626 -->
                 <exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
                 <exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
                 <exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
                 
+                <!-- https://issues.apache.org/activemq/browse/AMQ-667 -->
+                <exclude>**/SslTransportBrokerTest.*</exclude>
+                
+                
                 <!-- TODO FIX ME ASAP -->
                 <exclude>**/MulticastNetworkTest.*</exclude>
                 <exclude>**/UnreliableUdpTransportTest.*</exclude>
                 <exclude>**/MulticastTransportTest.*</exclude>
                 <exclude>**/UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest.*</exclude>
             </excludes>
         </unitTest>
         <resources>

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.net.SocketAddress;
+
+/**
+ * Strategy that decides whether a command about to be sent should be dropped.
+ * @version $Revision: $
+ */
+public interface DropCommandStrategy {
+
+    /**
+     * Returns true if the command should be dropped for the given command ID
+     * and address; redelivery is the flag the sending channel was called with
+     */
+    boolean shouldDropCommand(int commandId, SocketAddress address, boolean redelivery);
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/DropCommandStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java (from r387160, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java?p2=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java&p1=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java&r1=387160&r2=389533&rev=389533&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/ReliableTransportTest.java Tue Mar 28 08:19:23 2006
@@ -14,11 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.transport.reliable;
 
 import edu.emory.mathcs.backport.java.util.Queue;
 
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.transport.StubTransport;
+import org.apache.activemq.transport.StubTransportListener;
 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
 import org.apache.activemq.transport.reliable.ReliableTransport;
 import org.apache.activemq.transport.reliable.ReplayStrategy;

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.ByteBufferPool;
+import org.apache.activemq.transport.udp.CommandDatagramChannel;
+import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
+import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+/**
+ * A {@link CommandDatagramChannel} for tests which asks a
+ * {@link DropCommandStrategy} whether each outgoing datagram should be
+ * discarded instead of being handed to the superclass for sending.
+ * 
+ * @version $Revision: $
+ */
+public class UnreliableCommandDatagramChannel extends CommandDatagramChannel {
+
+    private static final Log log = LogFactory.getLog(UnreliableCommandDatagramChannel.class);
+
+    private DropCommandStrategy dropCommandStrategy;
+
+    /**
+     * Creates the channel. Every argument except replayBuffer and strategy
+     * is passed straight to the superclass constructor.
+     *
+     * @param replayBuffer not used by this constructor; sendWriteBuffer reads
+     *                the buffer through getReplayBuffer() instead
+     * @param strategy asked on every send whether the datagram is dropped
+     */
+    public UnreliableCommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
+            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, ReplayBuffer replayBuffer, DatagramChannel channel,
+            ByteBufferPool bufferPool, DropCommandStrategy strategy) {
+        super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller, channel, bufferPool);
+        this.dropCommandStrategy = strategy;
+    }
+
+    /**
+     * Sends the datagram unless the strategy asks for it to be dropped. A
+     * dropped datagram is not passed on to the superclass, but a first-time
+     * send is still stored in the replay buffer when one is available (the
+     * same guard UnreliableCommandDatagramSocket applies).
+     */
+    protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) throws IOException {
+        if (dropCommandStrategy.shouldDropCommand(commandId, address, redelivery)) {
+            writeBuffer.flip();
+            log.info("Dropping datagram with command: " + commandId);
+
+            // lets still add it to the replay buffer though! (unless there is
+            // no buffer, or this send is itself a redelivery)
+            ReplayBuffer bufferCache = getReplayBuffer();
+            if (bufferCache != null && !redelivery) {
+                bufferCache.addBuffer(commandId, writeBuffer);
+            }
+        }
+        else {
+            super.sendWriteBuffer(commandId, address, writeBuffer, redelivery);
+        }
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.CommandDatagramSocket;
+import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
+import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+
+/**
+ * Test variant of {@link CommandDatagramSocket} that consults a
+ * {@link DropCommandStrategy} before each send and withholds the datagrams
+ * the strategy selects.
+ * 
+ * @version $Revision: $
+ */
+public class UnreliableCommandDatagramSocket extends CommandDatagramSocket {
+    private static final Log log = LogFactory.getLog(UnreliableCommandDatagramSocket.class);
+
+    private DropCommandStrategy dropCommandStrategy;
+
+    /**
+     * Builds the socket-based channel; strategy is kept for use on each send
+     * and the remaining arguments go to the superclass constructor.
+     */
+    public UnreliableCommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
+            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel,
+            DropCommandStrategy strategy) {
+        super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller, channel);
+        this.dropCommandStrategy = strategy;
+    }
+
+    /**
+     * Passes the datagram to the superclass unless the strategy says to drop
+     * it. A dropped datagram that is not a redelivery is still added to the
+     * replay buffer, provided a replay buffer exists.
+     */
+    protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery)
+            throws IOException {
+        if (!dropCommandStrategy.shouldDropCommand(commandId, address, redelivery)) {
+            super.sendWriteBuffer(commandId, address, data, redelivery);
+            return;
+        }
+
+        log.info("Dropping datagram with command: " + commandId);
+
+        // nothing more to do without a replay buffer, or for a redelivery
+        ReplayBuffer replayBuffer = getReplayBuffer();
+        if (replayBuffer == null || redelivery) {
+            return;
+        }
+        replayBuffer.addBuffer(commandId, data);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableCommandDatagramSocket.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.CommandChannel;
+import org.apache.activemq.transport.udp.CommandDatagramChannel;
+import org.apache.activemq.transport.udp.UdpTransport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+/**
+ * An unreliable UDP transport for tests that discards whichever outgoing
+ * commands its {@link DropCommandStrategy} selects, to simulate a bad network
+ * (or UDP buffers being flooded).
+ * 
+ * @version $Revision: $
+ */
+public class UnreliableUdpTransport extends UdpTransport {
+
+    /** Fallback used while no strategy has been set: drops nothing. */
+    private static final DropCommandStrategy DROP_NOTHING = new DropCommandStrategy() {
+        public boolean shouldDropCommand(int commandId, SocketAddress address, boolean redelivery) {
+            return false;
+        }
+    };
+
+    private DropCommandStrategy dropCommandStrategy;
+
+    // The constructors below only delegate to the matching UdpTransport constructor
+    public UnreliableUdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
+        super(wireFormat, port);
+    }
+
+    public UnreliableUdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
+        super(wireFormat, socketAddress);
+    }
+
+    public UnreliableUdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException,
+            IOException {
+        super(wireFormat, remoteLocation);
+    }
+
+    public UnreliableUdpTransport(OpenWireFormat wireFormat) throws IOException {
+        super(wireFormat);
+    }
+
+    /**
+     * Returns the strategy set through setDropCommandStrategy, or null if
+     * none has been set
+     */
+    public DropCommandStrategy getDropCommandStrategy() {
+        return dropCommandStrategy;
+    }
+
+    /**
+     * Sets the strategy handed to channels created afterwards by
+     * createCommandDatagramChannel
+     */
+    public void setDropCommandStrategy(DropCommandStrategy dropCommandStrategy) {
+        this.dropCommandStrategy = dropCommandStrategy;
+    }
+
+    /**
+     * Creates an {@link UnreliableCommandDatagramChannel} using the current
+     * strategy. The channel calls the strategy on every send without a null
+     * check, so a drop-nothing strategy is substituted when none has been set.
+     */
+    protected CommandChannel createCommandDatagramChannel() {
+        DropCommandStrategy strategy = dropCommandStrategy != null ? dropCommandStrategy : DROP_NOTHING;
+        return new UnreliableCommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(),
+                createDatagramHeaderMarshaller(), getReplayBuffer(), getChannel(), getBufferPool(), strategy);
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java?rev=389533&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java Tue Mar 28 08:19:23 2006
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.CommandJoiner;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.udp.ResponseRedirectInterceptor;
+import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.activemq.transport.udp.UdpTransportTest;
+
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Runs the inherited {@link UdpTransportTest} cases with a producer that
+ * drops some commands and with both ends wrapped in a
+ * {@link ReliableTransport}.
+ * 
+ * @version $Revision: $
+ */
+public class UnreliableUdpTransportTest extends UdpTransportTest {
+
+    /**
+     * Drops first-time sends whose command ID satisfies
+     * commandId % 3 == 2; redeliveries are never dropped
+     */
+    protected DropCommandStrategy dropStrategy = new DropCommandStrategy() {
+
+        public boolean shouldDropCommand(int commandId, SocketAddress address, boolean redelivery) {
+            return !redelivery && commandId % 3 == 2;
+        }
+    };
+
+    /**
+     * Builds the producer side: an {@link UnreliableUdpTransport} using
+     * dropStrategy, made reliable and wrapped in a {@link CommandJoiner}
+     */
+    protected Transport createProducer() throws Exception {
+        System.out.println("Producer using URI: " + producerURI);
+
+        OpenWireFormat format = createWireFormat();
+        UnreliableUdpTransport udp = new UnreliableUdpTransport(format, new URI(producerURI));
+        udp.setDropCommandStrategy(dropStrategy);
+
+        return new CommandJoiner(makeReliable(udp), format);
+    }
+
+    /**
+     * Builds the consumer side: a plain {@link UdpTransport} on consumerPort,
+     * made reliable, then wrapped in a {@link ResponseRedirectInterceptor}
+     * and a {@link CommandJoiner}
+     */
+    protected Transport createConsumer() throws Exception {
+        System.out.println("Consumer on port: " + consumerPort);
+
+        OpenWireFormat format = createWireFormat();
+        UdpTransport udp = new UdpTransport(format, consumerPort);
+        ReliableTransport reliable = makeReliable(udp);
+
+        return new CommandJoiner(new ResponseRedirectInterceptor(reliable, udp), format);
+    }
+
+    /**
+     * Checks that a replayer was supplied and returns a
+     * {@link DefaultReplayStrategy} constructed with the argument 1
+     */
+    protected ReplayStrategy createReplayStrategy(Replayer replayer) {
+        assertNotNull("Should have a replayer!", replayer);
+        return new DefaultReplayStrategy(1);
+    }
+
+    /**
+     * Wraps udp in a {@link ReliableTransport} whose replay strategy comes
+     * from createReplayStrategy
+     */
+    private ReliableTransport makeReliable(UdpTransport udp) throws Exception {
+        ReliableTransport reliable = new ReliableTransport(udp, udp);
+        reliable.setReplayStrategy(createReplayStrategy(reliable.getReplayer()));
+        return reliable;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/reliable/UnreliableUdpTransportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native