You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/21 21:44:43 UTC

svn commit: r719706 [6/6] - in /activemq/activemq-blaze: ./ branches/ tags/ trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/ trunk/src/main/java/org/ trunk/src/main/java/org/apache/ trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.TestCase;
+
+/**
+ * Test for coordinated channel
+ * 
+ */
+public class BlazeCoordinatedGroupChannelTest extends TestCase {
+    public void testGroup() throws Exception {
+        final int number = 3;
+        List<BlazeCoordinatedGroupChannel> channels = new ArrayList<BlazeCoordinatedGroupChannel>();
+        BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory();
+        for (int i = 0; i < number; i++) {
+            BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i);
+            channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number);
+            channel.start();
+            channels.add(channel);
+        }
+        channels.get(number - 1).waitForElection(5000);
+        int coordinatorNumber = 0;
+        BlazeCoordinatedGroupChannel coordinator = null;
+        for (BlazeCoordinatedGroupChannel channel : channels) {
+            if (channel.isCoordinator()) {
+                coordinatorNumber++;
+                coordinator = channel;
+            }
+        }
+        assertNotNull(coordinator);
+        assertEquals(1, coordinatorNumber);
+        // kill the coordinator
+        coordinator.shutDown();
+        Thread.sleep(factory.getConfiguration().getHeartBeatInterval() * 2);
+        coordinatorNumber = 0;
+        coordinator = null;
+        for (BlazeCoordinatedGroupChannel channel : channels) {
+            if (channel.isCoordinator()) {
+                coordinatorNumber++;
+                coordinator = channel;
+            }
+        }
+        assertNotNull(coordinator);
+        assertEquals(1, coordinatorNumber);
+        for (BlazeCoordinatedGroupChannel channel : channels) {
+            channel.shutDown();
+        }
+    }
+
+    public void testWeightedGroup() throws Exception {
+        final int number = 4;
+        List<BlazeCoordinatedGroupChannel> channels = new ArrayList<BlazeCoordinatedGroupChannel>();
+        BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory();
+        BlazeCoordinatedGroupChannel weightedCoordinator = null;
+        for (int i = 0; i < number; i++) {
+            BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i);
+            channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number);
+            if (i == number / 2) {
+                channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(10);
+                weightedCoordinator=channel;
+            }else {
+                channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(0);
+            }
+                channel.start();
+                channels.add(channel);
+            
+        }
+        channels.get(number - 1).waitForElection(5000);
+        int coordinatorNumber = 0;
+        BlazeCoordinatedGroupChannel coordinator = null;
+        for (BlazeCoordinatedGroupChannel channel : channels) {
+            if (channel.isCoordinator()) {
+                coordinatorNumber++;
+                coordinator = channel;
+            }
+        }
+        assertNotNull(coordinator);
+        assertTrue(coordinator==weightedCoordinator);
+        assertEquals(1, coordinatorNumber);
+        for (BlazeCoordinatedGroupChannel channel : channels) {
+            channel.shutDown();
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.JMSException;
+import junit.framework.TestCase;
+import org.apache.activeblaze.BlazeChannel;
+import org.apache.activeblaze.BlazeMessage;
+
+/**
+ * @author rajdavies
+ * 
+ */
+public class BlazeGroupChannelTest extends TestCase {
+    /**
+     * Test method for
+     * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}.
+     * 
+     * @throws Exception
+     */
+    public void testSendMemberBlazeMessage() throws Exception {
+        final int number = 5;
+        final AtomicInteger count = new AtomicInteger();
+        List<BlazeGroupChannel> channels = new ArrayList<BlazeGroupChannel>();
+        BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
+        for (int i = 0; i < number; i++) {
+            BlazeGroupChannel channel = factory.createGroupChannel("test" + i);
+            channel.start();
+            channels.add(channel);
+            channel.setInboxListener(new BlazeQueueListener() {
+                public void onMessage(BlazeMessage message) {
+                    synchronized (count) {
+                        synchronized (count) {
+                            count.incrementAndGet();
+                            count.notifyAll();
+                        }
+                    }
+                }
+            });
+        }
+        channels.get(0).getAndWaitForMemberByName(channels.get(number-1).getName(), 2000);
+        BlazeMessage msg = new BlazeMessage();
+        msg.setString("test", "hello");
+        channels.get(0).send(channels.get(1).getLocalMember(), msg);
+        synchronized (count) {
+            if (count.get() == 0) {
+                count.wait(5000);
+            }
+        }
+        // wait a while to check that only one got it
+        Thread.sleep(2000);
+        assertEquals(1, count.get());
+        for (BlazeGroupChannel channel : channels) {
+            channel.shutDown();
+        }
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}.
+     * 
+     * @throws Exception
+     */
+    public void testSendRequestMemberBlazeMessage() throws Exception {
+       
+        final int number = 100;
+        final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();
+        final List<BlazeMessage> replies = new ArrayList<BlazeMessage>();
+        for (int i = 0; i < number; i++) {
+            requests.add(new BlazeMessage("request" + i));
+            replies.add(new BlazeMessage("reply" + i));
+        }
+        BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
+        final BlazeGroupChannel request = factory.createGroupChannel("request");
+        final BlazeGroupChannel reply = factory.createGroupChannel("reply");
+        request.start();
+        reply.start();
+        reply.setInboxListener(new BlazeQueueListener() {
+            public void onMessage(BlazeMessage message) {
+                if (!replies.isEmpty()) {
+                    BlazeMessage replyMsg = replies.remove(0);
+                    try {
+                        Member to = reply.getMemberById(message.getFromId());
+                        reply.sendReply(to, replyMsg, message.getMessageId());
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        });
+        Thread.sleep(100);
+        Member target = request.getMemberByName("reply");
+        for (int i = 0; i < requests.size(); i++) {
+            BlazeMessage requestMsg = requests.get(i);
+            BlazeMessage replyMsg = request.sendRequest(target, requestMsg);
+            assertNotNull(replyMsg);
+        }
+        assertTrue(replies.isEmpty());
+        request.shutDown();
+        reply.shutDown();
+    }
+    
+    
+    public void testSendRequestString() throws Exception{
+        String destination = "/test/foo";
+        final int number = 100;
+        final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();
+        final List<BlazeMessage> replies = new ArrayList<BlazeMessage>();
+        for (int i = 0; i < number; i++) {
+            requests.add(new BlazeMessage("request" + i));
+            replies.add(new BlazeMessage("reply" + i));
+        }
+        BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
+        final BlazeGroupChannel request = factory.createGroupChannel("request");
+        final BlazeGroupChannel reply = factory.createGroupChannel("reply");
+        reply.addBlazeQueueMessageListener(destination,new BlazeQueueListener() {
+            public void onMessage(BlazeMessage message) {
+                if (!replies.isEmpty()) {
+                    BlazeMessage replyMsg = replies.remove(0);
+                    try {
+                        Member to = reply.getMemberById(message.getFromId());
+                        reply.sendReply(to, replyMsg, message.getMessageId());
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        });
+        request.start();
+        reply.start();
+        Member result = request.getAndWaitForMemberByName("reply",1000);
+        assertNotNull(result);
+        for (int i = 0; i < requests.size(); i++) {
+            BlazeMessage requestMsg = requests.get(i);
+            BlazeMessage replyMsg = request.sendRequest(destination, requestMsg,1000);
+            assertNotNull(replyMsg);
+        }
+        assertTrue(replies.isEmpty());
+        request.shutDown();
+        reply.shutDown();
+    }
+
+
+    /**
+     * Test method for
+     * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)}.
+     * 
+     * @throws Exception
+     */
+    public void testSendStringBlazeMessage() throws Exception {
+        final int number = 2;
+        String destination = "test.foo";
+        final AtomicInteger count = new AtomicInteger();
+        List<BlazeGroupChannel> channels = new ArrayList<BlazeGroupChannel>();
+        BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
+        for (int i = 0; i < number; i++) {
+            BlazeGroupChannel channel = factory.createGroupChannel("test" + i);
+            channel.start();
+            channels.add(channel);
+            channel.addBlazeQueueMessageListener(destination, new BlazeQueueListener() {
+                public void onMessage(BlazeMessage message) {
+                    synchronized (count) {
+                        synchronized (count) {
+                            count.incrementAndGet();
+                            count.notifyAll();
+                        }
+                    }
+                }
+            });
+        }
+        Thread.sleep(2000);
+        BlazeMessage msg = new BlazeMessage();
+        msg.setString("test", "hello");
+        channels.get(0).send(destination, msg);
+        synchronized (count) {
+            if (count.get() == 0) {
+                count.wait(5000);
+            }
+        }
+        // wait a while to check that only one got it
+        Thread.sleep(2000);
+        assertEquals(1, count.get());
+        for (BlazeChannel channel : channels) {
+            channel.shutDown();
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.destination;
+
+import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activemq.protobuf.Buffer;
+import junit.framework.TestCase;
+
+/**
+ * @author rajdavies
+ *
+ */
+public class DestinationMatchTest extends TestCase {
+   
+    /**
+     * Test method for {@link org.apache.activeblaze.impl.destination.DestinationMatch#isMatch(byte[], byte[])}.
+     */
+    public void testIsMatch() {
+        String base = "foo.fred.blah";
+        byte[] data = base.getBytes();
+        byte[] test = new byte[data.length+4];
+        System.arraycopy(data, 0, test, 4, data.length);
+        doMatchTest(new Buffer(test,4,data.length));
+    }
+    private void doMatchTest(Buffer base) {
+        String match1 = "foo.fred.blah";
+        String match2 = "foo.*.blah";
+        String match3 = ">";
+        String match4 = ">.*.blah";
+        String match5 = ">.*.*";
+        String match6 = "*.*.*";
+        String match7 = "foo.fred.blah.>";
+        
+        String fail1 = "blah.fred.foo";
+        String fail2 = "*.fred.foo";
+        String fail3 = "foo.*.foo.>";
+       
+        assertTrue(DestinationMatch.isMatch(base, match1));
+        assertTrue(DestinationMatch.isMatch(base, match2));
+        assertTrue(DestinationMatch.isMatch(base, match3));
+        assertTrue(DestinationMatch.isMatch(base, match4));
+        assertTrue(DestinationMatch.isMatch(base, match5));
+        assertTrue(DestinationMatch.isMatch(base, match6));
+        assertTrue(DestinationMatch.isMatch(base, match7));
+        
+        assertTrue(DestinationMatch.isMatch(match1,base));
+        assertTrue(DestinationMatch.isMatch(match2,base));
+        assertTrue(DestinationMatch.isMatch(match3,base));
+        assertTrue(DestinationMatch.isMatch(match4,base));
+        assertTrue(DestinationMatch.isMatch(match5,base));
+        assertTrue(DestinationMatch.isMatch(match6,base));
+        assertTrue(DestinationMatch.isMatch(match7,base));
+        
+        assertFalse(DestinationMatch.isMatch(base, fail1));
+        assertFalse(DestinationMatch.isMatch(base, fail2));
+        assertFalse(DestinationMatch.isMatch(base, fail3));
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.TestCase;
+import org.apache.activeblaze.wire.PacketData;
+
+/**
+ * Test some basics in ChainedProcessor
+ * 
+ */
+public class ChainedProcessorTest extends TestCase {
+    public void testStart() throws Exception {
+        final AtomicBoolean test = new AtomicBoolean();
+        ChainedProcessor target = new ChainedProcessor() {
+            public boolean start() {
+                return test.getAndSet(true);
+            }
+        };
+        ChainedProcessor A = new ChainedProcessor();
+        ChainedProcessor B = new ChainedProcessor();
+        A.setNext(B);
+        A.setEnd(target);
+        A.start();
+        assertTrue(test.get());
+    }
+
+    public void testStop() throws Exception {
+        final AtomicBoolean test = new AtomicBoolean();
+        ChainedProcessor target = new ChainedProcessor() {
+            public boolean stop() {
+                return test.getAndSet(true);
+            }
+        };
+        ChainedProcessor A = new ChainedProcessor();
+        ChainedProcessor B = new ChainedProcessor();
+        A.setNext(B);
+        A.setEnd(target);
+        A.start();
+        A.stop();
+        assertTrue(test.get());
+    }
+
+    public void testDownStream() throws Exception {
+        final AtomicBoolean test = new AtomicBoolean();
+        ChainedProcessor target = new ChainedProcessor() {
+            public void downStream(Packet p) {
+                test.set(true);
+            }
+        };
+        ChainedProcessor A = new ChainedProcessor();
+        ChainedProcessor B = new ChainedProcessor();
+        ChainedProcessor C = new ChainedProcessor();
+        ChainedProcessor D = new ChainedProcessor();
+        A.setEnd(B);
+        A.setEnd(C);
+        A.setEnd(D);
+        A.setEnd(target);
+        A.start();
+        Packet p = new Packet(new PacketData());
+        D.downStream(p);
+        assertTrue(test.get());
+    }
+
+    public void testUpStream() throws Exception {
+        final AtomicBoolean test = new AtomicBoolean();
+        ChainedProcessor target = new ChainedProcessor() {
+            public void upStream(Packet p) {
+                test.set(true);
+            }
+        };
+        ChainedProcessor A = new ChainedProcessor();
+        ChainedProcessor B = new ChainedProcessor();
+        ChainedProcessor C = new ChainedProcessor();
+        ChainedProcessor D = new ChainedProcessor();
+        target.setEnd(A);
+        A.setEnd(B);
+        A.setEnd(C);
+        A.setEnd(D);
+        A.start();
+        Packet p = new Packet(new PacketData());
+        D.upStream(p);
+        assertTrue(test.get());
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,42 @@
+/**
+ * 
+ */
+package org.apache.activeblaze.impl.processor;
+
+import junit.framework.TestCase;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * @author rajdavies
+ * 
+ */
+public class CompressionProcessorTest extends TestCase {
+    public void testProcessor() throws Exception {
+        Packet packet = new Packet(new PacketData());
+        byte[] d1 = new byte[1024];
+        Buffer payload = new Buffer(d1);
+        packet.getPacketData().setPayload(payload);
+        TerminatedChainedProcessor test = new TerminatedChainedProcessor();
+        CompressionProcessor proc = new CompressionProcessor();
+        proc.setPrev(test);
+        proc.setNext(test);
+        assertFalse(CompressionProcessor.isCompressed(packet.getPacketData().getPayload()));
+        proc.downStream(packet);
+        assertFalse(CompressionProcessor.isCompressed(packet.getPacketData().getPayload()));
+        // bigger payload
+        byte[] d2 = new byte[proc.getCompressionLimit() * 2];
+        for (int i = 0; i < d2.length; i++) {
+            d2[i] = (byte) i;
+        }
+        payload = new Buffer(d2);
+        packet.getPacketData().setPayload(payload);
+        proc.downStream(packet.clone());
+        Packet result = test.getResult();
+        assertTrue(CompressionProcessor.isCompressed(result.getPacketData().getPayload()));
+        proc.upStream(result.clone());
+        result = test.getResult();
+        assertFalse(CompressionProcessor.isCompressed(result.getPacketData().getPayload()));
+        assertEquals(result.getPacketData().getPayload().length, packet.getPacketData().getPayload().length);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.TestCase;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * @author rajdavies
+ * 
+ */
+public class FragmentationProcessorTest extends TestCase {
+    public void testProcessor() throws Exception {
+        Packet packet = new Packet(new PacketData());
+        byte[] testData = new byte[1024 * 32];
+        for (int i = 0; i < testData.length; i++) {
+            testData[i] = (byte) i;
+        }
+        Buffer payload = new Buffer(testData);
+        packet.getPacketData().setPayload(payload);
+        TerminatedChainedProcessor test = new TerminatedChainedProcessor();
+        FragmentationProcessor proc = new FragmentationProcessor();
+        proc.setPrev(test);
+        proc.setNext(test);
+        proc.setMaxPacketSize(1024);
+        proc.downStream(packet);
+        List<Packet> list = new ArrayList<Packet>(test.getPacketList());
+        assertTrue(list.size() > 1);
+        test.reset();
+        assertNull(test.getResult());
+        for (Packet p : list) {
+            proc.upStream(p);
+        }
+        Packet resultPacket = test.getResult();
+        byte[] result = resultPacket.getPacketData().getPayload().toByteArray();
+        assertEquals(result.length, testData.length);
+        for (int i = 0; i < result.length; i++) {
+            assertEquals("Testing byte at: " + i, testData[i], result[i]);
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import junit.framework.TestCase;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+
+/**
+ *
+ *
+ */
+public class PacketAuditTest extends TestCase {
+   
+    /**
+     * Test method for duplicates
+     * @throws Exception 
+     */
+    public void testAudit() throws Exception {
+        PacketAudit audit = new PacketAudit();
+        audit.start();
+        for (long i =0; i< audit.getMaxAuditDepth();i++) {
+            PacketData data = new PacketData();
+            data.setProducerId(new Buffer("fred"));
+            data.setMessageSequence(i);
+            Packet packet = new Packet(data);
+            assertFalse(audit.isDuplicate(packet));
+        }
+        
+        for (long i =0; i< audit.getMaxAuditDepth();i++) {
+            PacketData data = new PacketData();
+            data.setProducerId(new Buffer("fred"));
+            data.setMessageSequence(i);
+            Packet packet = new Packet(data);
+            assertTrue("Testing " + i,audit.isDuplicate(packet));
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+
+
+/**
+ * Test Processor
+ *
+ */
+public class TerminatedChainedProcessor extends ChainedProcessor {
+
+    private Packet result = null;
+    private List<Packet> list = new ArrayList<Packet>();
+    public TerminatedChainedProcessor() {
+    }
+    
+    public void downStream(Packet packet){
+        this.result= packet;
+        this.list.add(packet);
+    }
+
+    public void upStream(Packet packet){
+        this.result=packet;
+    }
+    
+    public Packet getResult() {
+        return this.result;
+    }
+    
+    public List<Packet>getPacketList(){
+        return this.list;
+    }
+    
+    public void reset(){
+        this.result=null;
+        this.list.clear();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+import junit.framework.TestCase;
+
+/**
+ * Test Multicast Transport
+ * 
+ */
+public class MulticastTransportTest extends TestCase {
+    public void testTransport() throws Exception {
+        URI broadcastURI = new URI("mcast://224.2.2.2:9999");
+        InetSocketAddress to = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort());
+        MulticastTransport sender = new MulticastTransport();
+        sender.setLocalURI(broadcastURI);
+        sender.init();
+        sender.start();
+        TerminatedChainedProcessor test = new TerminatedChainedProcessor();
+        MulticastTransport receiver = new MulticastTransport();
+        receiver.setPrev(test);
+        receiver.setLocalURI(broadcastURI);
+        receiver.init();
+        receiver.start();
+        String payload = "test String";
+        Buffer duff = new Buffer("duff");
+        PacketData packetData = new PacketData();
+        packetData.setType(1);
+        packetData.setMessageId(new Buffer("foo"));
+        packetData.setProducerId(duff);
+        packetData.setFromAddress(duff);
+        packetData.setSessionId(1);
+        packetData.setMessageSequence(0);
+        packetData.setPayload(new Buffer(payload));
+        Packet packet = new Packet(packetData);
+        packet.setTo(to);
+        sender.downStream(packet);
+        Thread.sleep(500);
+        assertEquals(payload, test.getResult().getPacketData().getPayload().toStringUtf8());
+        receiver.shutDown();
+        sender.shutDown();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import junit.framework.TestCase;
+
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+
+
+/**
+ * @author rajdavies
+ *
+ */
+public class UdpTransportTest extends TestCase {
+    
+    public void testTransport() throws Exception {
+        URI receiverURI = new URI("udp://localhost:6966");
+        URI senderURI = new URI("udp://localhost:6766");
+                
+        
+        UdpTransport sender = new UdpTransport();
+        sender.setLocalURI(senderURI);
+        sender.start();
+        
+        TerminatedChainedProcessor test = new TerminatedChainedProcessor();
+        UdpTransport receiver = new UdpTransport();
+        receiver.setPrev(test);
+        receiver.setLocalURI(receiverURI);
+        receiver.start();
+        
+        
+            String payload = "test String";
+            Buffer duff = new Buffer("duff");
+            PacketData packetData = new PacketData();
+            packetData.setType(1);
+            packetData.setMessageId(new Buffer("foo"));
+            packetData.setProducerId(duff);
+            packetData.setFromAddress(duff);
+            packetData.setSessionId(1);
+            packetData.setMessageSequence(0);
+            packetData.setPayload(new Buffer(payload));
+            Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData);
+            
+           
+            sender.downStream(packet);
+       
+        Thread.sleep(500);
+        
+        assertEquals(payload,test.getResult().getPacketData().getPayload().toStringUtf8());
+        receiver.shutDown();
+        sender.shutDown();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.TestCase;
+
+/**
+ * Test cases for PropertyUtil
+ * 
+ */
+public class PropertyUtilTest extends TestCase {
+    /**
+     * Test method for {@link org.apache.activeblaze.util.PropertyUtil#parseParameters(java.net.URI)}.
+     * 
+     * @throws Exception
+     */
+    public void testParseParametersURI() throws Exception {
+        String query = "name=foo&type=blah";
+        URI uri = new URI("http://blah:60606?" + query);
+        Map<String, String> result = PropertyUtil.parseParameters(uri);
+        assertEquals(2, result.size());
+        assertEquals("foo", result.get("name"));
+        assertEquals("blah", result.get("type"));
+    }
+
+    /**
+     * Test method for {@link org.apache.activeblaze.util.PropertyUtil#parseParameters(java.lang.String)}.
+     * 
+     * @throws Exception
+     */
+    public void testParseParametersString() throws Exception {
+        String query = "name=foo&type=blah";
+        String uri = "somestring?" + query;
+        Map<String, String> result = PropertyUtil.parseParameters(uri);
+        assertEquals(2, result.size());
+        assertEquals("foo", result.get("name"));
+        assertEquals("blah", result.get("type"));
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activeblaze.util.PropertyUtil#addPropertiesToURI(java.lang.String, java.util.Map)}.
+     * @throws Exception 
+     */
+    public void testAddPropertiesToURI() throws Exception {
+        String query = "type=blah&name=foo";
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("name", "foo");
+        map.put("type", "blah");
+        String test = "somestring?"+query;
+        String result = PropertyUtil.addPropertiesToURI(test, map);
+        URI uri = new URI(result);
+        assertEquals(uri.getQuery(),query);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java
------------------------------------------------------------------------------
    svn:eol-style = native