You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/21 21:44:43 UTC
svn commit: r719706 [6/6] - in /activemq/activemq-blaze: ./ branches/ tags/
trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/
trunk/src/main/java/org/ trunk/src/main/java/org/apache/
trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.TestCase;
+
+/**
+ * Tests coordinator election for a group of {@link BlazeCoordinatedGroupChannel}s.
+ *
+ */
+public class BlazeCoordinatedGroupChannelTest extends TestCase {
+    /** How long to wait for the initial election to finish, in milliseconds. */
+    private static final int ELECTION_TIMEOUT = 5000;
+
+    /**
+     * Starts a group, checks that exactly one coordinator is elected, shuts
+     * that coordinator down and checks that exactly one coordinator is
+     * reported again after two heart beat intervals.
+     *
+     * @throws Exception
+     */
+    public void testGroup() throws Exception {
+        final int number = 3;
+        List<BlazeCoordinatedGroupChannel> channels = new ArrayList<BlazeCoordinatedGroupChannel>();
+        BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory();
+        try {
+            for (int i = 0; i < number; i++) {
+                BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i);
+                channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number);
+                channel.start();
+                channels.add(channel);
+            }
+            channels.get(number - 1).waitForElection(ELECTION_TIMEOUT);
+            BlazeCoordinatedGroupChannel coordinator = getSingleCoordinator(channels);
+            // kill the coordinator
+            coordinator.shutDown();
+            // NOTE(review): assumes two heart beats are enough for the survivors
+            // to notice the loss and re-elect - confirm against the election code
+            Thread.sleep(factory.getConfiguration().getHeartBeatInterval() * 2);
+            getSingleCoordinator(channels);
+        } finally {
+            // always release the channels, even when an assertion failed
+            shutDownAll(channels);
+        }
+    }
+
+    /**
+     * Starts a group in which one member has coordinator weight 10 and all
+     * others weight 0, and checks that the weighted member is the one and
+     * only coordinator.
+     *
+     * @throws Exception
+     */
+    public void testWeightedGroup() throws Exception {
+        final int number = 4;
+        List<BlazeCoordinatedGroupChannel> channels = new ArrayList<BlazeCoordinatedGroupChannel>();
+        BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory();
+        BlazeCoordinatedGroupChannel weightedCoordinator = null;
+        try {
+            for (int i = 0; i < number; i++) {
+                BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i);
+                channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number);
+                if (i == number / 2) {
+                    channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(10);
+                    weightedCoordinator = channel;
+                } else {
+                    channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(0);
+                }
+                channel.start();
+                channels.add(channel);
+            }
+            channels.get(number - 1).waitForElection(ELECTION_TIMEOUT);
+            BlazeCoordinatedGroupChannel coordinator = getSingleCoordinator(channels);
+            assertSame("the weighted member should have been elected", weightedCoordinator, coordinator);
+        } finally {
+            shutDownAll(channels);
+        }
+    }
+
+    /**
+     * Asserts that exactly one of the given channels reports itself as
+     * coordinator.
+     *
+     * @param channels the group members to inspect
+     * @return the single coordinator
+     * @throws Exception if a channel fails while being queried
+     */
+    private static BlazeCoordinatedGroupChannel getSingleCoordinator(
+            List<BlazeCoordinatedGroupChannel> channels) throws Exception {
+        int coordinatorNumber = 0;
+        BlazeCoordinatedGroupChannel coordinator = null;
+        for (BlazeCoordinatedGroupChannel channel : channels) {
+            if (channel.isCoordinator()) {
+                coordinatorNumber++;
+                coordinator = channel;
+            }
+        }
+        assertNotNull("no coordinator was elected", coordinator);
+        assertEquals("number of coordinators", 1, coordinatorNumber);
+        return coordinator;
+    }
+
+    /**
+     * Shuts down every channel, continuing past individual failures so that
+     * one bad channel does not leave the others running.
+     *
+     * @param channels the channels to shut down
+     * @throws Exception the first failure seen, rethrown after all channels
+     *             have been attempted
+     */
+    private static void shutDownAll(List<BlazeCoordinatedGroupChannel> channels) throws Exception {
+        Exception firstFailure = null;
+        for (BlazeCoordinatedGroupChannel channel : channels) {
+            try {
+                channel.shutDown();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.JMSException;
+import junit.framework.TestCase;
+import org.apache.activeblaze.BlazeChannel;
+import org.apache.activeblaze.BlazeMessage;
+
+/**
+ * Tests member-addressed sends, destination-addressed sends and
+ * request/reply exchanges over {@link BlazeGroupChannel}.
+ *
+ * @author rajdavies
+ *
+ */
+public class BlazeGroupChannelTest extends TestCase {
+    /**
+     * Test method for
+     * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}.
+     * Sends one message to a single member of a five member group and checks
+     * that exactly one inbox listener is invoked.
+     *
+     * @throws Exception
+     */
+    public void testSendMemberBlazeMessage() throws Exception {
+        final int number = 5;
+        final AtomicInteger count = new AtomicInteger();
+        List<BlazeGroupChannel> channels = new ArrayList<BlazeGroupChannel>();
+        BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
+        try {
+            for (int i = 0; i < number; i++) {
+                BlazeGroupChannel channel = factory.createGroupChannel("test" + i);
+                channel.start();
+                channels.add(channel);
+                channel.setInboxListener(countingListener(count));
+            }
+            // wait until the first channel has seen the last one to join
+            channels.get(0).getAndWaitForMemberByName(channels.get(number - 1).getName(), 2000);
+            BlazeMessage msg = new BlazeMessage();
+            msg.setString("test", "hello");
+            channels.get(0).send(channels.get(1).getLocalMember(), msg);
+            awaitFirstMessage(count, 5000);
+            // wait a while to check that only one got it
+            Thread.sleep(2000);
+            assertEquals(1, count.get());
+        } finally {
+            shutDownAll(channels);
+        }
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}.
+     * Sends 100 requests to a named member and expects a reply to each.
+     *
+     * @throws Exception
+     */
+    public void testSendRequestMemberBlazeMessage() throws Exception {
+        final int number = 100;
+        final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();
+        final List<BlazeMessage> replies = new ArrayList<BlazeMessage>();
+        for (int i = 0; i < number; i++) {
+            requests.add(new BlazeMessage("request" + i));
+            replies.add(new BlazeMessage("reply" + i));
+        }
+        BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
+        final BlazeGroupChannel request = factory.createGroupChannel("request");
+        final BlazeGroupChannel reply = factory.createGroupChannel("reply");
+        List<BlazeGroupChannel> channels = new ArrayList<BlazeGroupChannel>();
+        channels.add(request);
+        channels.add(reply);
+        try {
+            request.start();
+            reply.start();
+            reply.setInboxListener(replyingListener(reply, replies));
+            // wait for discovery instead of sleeping a fixed time and hoping the
+            // member is already known - same approach as testSendRequestString
+            Member target = request.getAndWaitForMemberByName("reply", 1000);
+            assertNotNull(target);
+            for (int i = 0; i < requests.size(); i++) {
+                BlazeMessage requestMsg = requests.get(i);
+                BlazeMessage replyMsg = request.sendRequest(target, requestMsg);
+                assertNotNull(replyMsg);
+            }
+            assertTrue(replies.isEmpty());
+        } finally {
+            shutDownAll(channels);
+        }
+    }
+
+    /**
+     * Sends 100 requests to a destination name (rather than to a member) and
+     * expects a reply to each within one second.
+     *
+     * @throws Exception
+     */
+    public void testSendRequestString() throws Exception {
+        String destination = "/test/foo";
+        final int number = 100;
+        final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();
+        final List<BlazeMessage> replies = new ArrayList<BlazeMessage>();
+        for (int i = 0; i < number; i++) {
+            requests.add(new BlazeMessage("request" + i));
+            replies.add(new BlazeMessage("reply" + i));
+        }
+        BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
+        final BlazeGroupChannel request = factory.createGroupChannel("request");
+        final BlazeGroupChannel reply = factory.createGroupChannel("reply");
+        List<BlazeGroupChannel> channels = new ArrayList<BlazeGroupChannel>();
+        channels.add(request);
+        channels.add(reply);
+        try {
+            reply.addBlazeQueueMessageListener(destination, replyingListener(reply, replies));
+            request.start();
+            reply.start();
+            Member result = request.getAndWaitForMemberByName("reply", 1000);
+            assertNotNull(result);
+            for (int i = 0; i < requests.size(); i++) {
+                BlazeMessage requestMsg = requests.get(i);
+                BlazeMessage replyMsg = request.sendRequest(destination, requestMsg, 1000);
+                assertNotNull(replyMsg);
+            }
+            assertTrue(replies.isEmpty());
+        } finally {
+            shutDownAll(channels);
+        }
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)}.
+     * Two channels listen on the same destination; one message is sent and
+     * exactly one listener invocation is expected.
+     *
+     * @throws Exception
+     */
+    public void testSendStringBlazeMessage() throws Exception {
+        final int number = 2;
+        String destination = "test.foo";
+        final AtomicInteger count = new AtomicInteger();
+        List<BlazeGroupChannel> channels = new ArrayList<BlazeGroupChannel>();
+        BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
+        try {
+            for (int i = 0; i < number; i++) {
+                BlazeGroupChannel channel = factory.createGroupChannel("test" + i);
+                channel.start();
+                channels.add(channel);
+                channel.addBlazeQueueMessageListener(destination, countingListener(count));
+            }
+            // NOTE(review): presumably gives the group time to form and the
+            // listeners time to be advertised - confirm what exactly is awaited
+            Thread.sleep(2000);
+            BlazeMessage msg = new BlazeMessage();
+            msg.setString("test", "hello");
+            channels.get(0).send(destination, msg);
+            awaitFirstMessage(count, 5000);
+            // wait a while to check that only one got it
+            Thread.sleep(2000);
+            assertEquals(1, count.get());
+        } finally {
+            shutDownAll(channels);
+        }
+    }
+
+    /**
+     * Creates a listener that increments the counter for every message and
+     * wakes up any thread waiting on the counter's monitor.
+     *
+     * @param count the counter to increment; also used as the monitor
+     * @return the listener
+     */
+    private static BlazeQueueListener countingListener(final AtomicInteger count) {
+        return new BlazeQueueListener() {
+            public void onMessage(BlazeMessage message) {
+                synchronized (count) {
+                    count.incrementAndGet();
+                    count.notifyAll();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates a listener that answers each incoming message with the next
+     * unused message from <code>replies</code>, removing it from the list.
+     * Messages arriving after the list is exhausted are ignored.
+     *
+     * @param reply the channel the replies are sent from
+     * @param replies the prepared replies, consumed front to back
+     * @return the listener
+     */
+    private static BlazeQueueListener replyingListener(final BlazeGroupChannel reply,
+            final List<BlazeMessage> replies) {
+        return new BlazeQueueListener() {
+            public void onMessage(BlazeMessage message) {
+                if (!replies.isEmpty()) {
+                    BlazeMessage replyMsg = replies.remove(0);
+                    try {
+                        Member to = reply.getMemberById(message.getFromId());
+                        reply.sendReply(to, replyMsg, message.getMessageId());
+                    } catch (Exception e) {
+                        // the requesting side fails its assertNotNull when no reply arrives
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Blocks until the counter is non-zero or the timeout has elapsed. The
+     * wait is repeated in a loop because {@link Object#wait(long)} may return
+     * without a notification.
+     *
+     * @param count the counter, also the monitor the listeners notify on
+     * @param timeoutMillis the maximum time to wait in milliseconds
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    private static void awaitFirstMessage(AtomicInteger count, long timeoutMillis) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + timeoutMillis;
+        synchronized (count) {
+            long remaining = timeoutMillis;
+            while (count.get() == 0 && remaining > 0) {
+                count.wait(remaining);
+                remaining = deadline - System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * Shuts down every channel, continuing past individual failures so that
+     * one bad channel does not leave the others running.
+     *
+     * @param channels the channels to shut down
+     * @throws Exception the first failure seen, rethrown after all channels
+     *             have been attempted
+     */
+    private static void shutDownAll(List<? extends BlazeChannel> channels) throws Exception {
+        Exception firstFailure = null;
+        for (BlazeChannel channel : channels) {
+            try {
+                channel.shutDown();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.destination;
+
+import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activemq.protobuf.Buffer;
+import junit.framework.TestCase;
+
+/**
+ * Tests wildcard matching of destination names by {@link DestinationMatch}.
+ *
+ * @author rajdavies
+ *
+ */
+public class DestinationMatchTest extends TestCase {
+
+    /**
+     * Test method for the {@link Buffer}/{@link String} overloads of
+     * <code>DestinationMatch.isMatch</code>. The destination is placed four
+     * bytes into a larger backing array so that matching is exercised on a
+     * Buffer with a non-zero offset.
+     *
+     * @throws Exception
+     */
+    public void testIsMatch() throws Exception {
+        String base = "foo.fred.blah";
+        // explicit charset: the no-argument getBytes() depends on the platform default
+        byte[] data = base.getBytes("UTF-8");
+        byte[] test = new byte[data.length + 4];
+        System.arraycopy(data, 0, test, 4, data.length);
+        doMatchTest(new Buffer(test, 4, data.length));
+    }
+
+    /**
+     * Checks the destination against patterns that must match (with the
+     * arguments in either order) and patterns that must not.
+     *
+     * @param base the destination <code>foo.fred.blah</code> as a Buffer
+     */
+    private void doMatchTest(Buffer base) {
+        String[] matches = {
+            "foo.fred.blah",
+            "foo.*.blah",
+            ">",
+            ">.*.blah",
+            ">.*.*",
+            "*.*.*",
+            "foo.fred.blah.>"
+        };
+        String[] failures = {
+            "blah.fred.foo",
+            "*.fred.foo",
+            "foo.*.foo.>"
+        };
+
+        for (String match : matches) {
+            assertTrue("destination should match " + match, DestinationMatch.isMatch(base, match));
+        }
+
+        for (String match : matches) {
+            assertTrue(match + " should match destination", DestinationMatch.isMatch(match, base));
+        }
+
+        for (String fail : failures) {
+            assertFalse("destination should not match " + fail, DestinationMatch.isMatch(base, fail));
+        }
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.TestCase;
+import org.apache.activeblaze.wire.PacketData;
+
+/**
+ * Basic checks that lifecycle calls and packets are passed along a chain of
+ * {@link ChainedProcessor}s.
+ *
+ */
+public class ChainedProcessorTest extends TestCase {
+    /**
+     * Calls start() on the head of a chain and expects the processor attached
+     * with setEnd() to have its start() invoked.
+     *
+     * @throws Exception
+     */
+    public void testStart() throws Exception {
+        final AtomicBoolean invoked = new AtomicBoolean();
+        ChainedProcessor probe = new ChainedProcessor() {
+            public boolean start() {
+                // records the call; returns the previous flag value
+                return invoked.getAndSet(true);
+            }
+        };
+        ChainedProcessor first = new ChainedProcessor();
+        ChainedProcessor second = new ChainedProcessor();
+        first.setNext(second);
+        // NOTE(review): presumably setEnd() appends at the tail of the chain - confirm
+        first.setEnd(probe);
+        first.start();
+        assertTrue(invoked.get());
+    }
+
+    /**
+     * Starts and then stops the head of a chain and expects the processor
+     * attached with setEnd() to have its stop() invoked.
+     *
+     * @throws Exception
+     */
+    public void testStop() throws Exception {
+        final AtomicBoolean invoked = new AtomicBoolean();
+        ChainedProcessor probe = new ChainedProcessor() {
+            public boolean stop() {
+                // records the call; returns the previous flag value
+                return invoked.getAndSet(true);
+            }
+        };
+        ChainedProcessor first = new ChainedProcessor();
+        ChainedProcessor second = new ChainedProcessor();
+        first.setNext(second);
+        first.setEnd(probe);
+        first.start();
+        first.stop();
+        assertTrue(invoked.get());
+    }
+
+    /**
+     * Sends a packet downstream from the fourth processor of a chain and
+     * expects it to reach the probe that was attached last.
+     *
+     * @throws Exception
+     */
+    public void testDownStream() throws Exception {
+        final AtomicBoolean invoked = new AtomicBoolean();
+        ChainedProcessor probe = new ChainedProcessor() {
+            public void downStream(Packet p) {
+                invoked.set(true);
+            }
+        };
+        ChainedProcessor first = new ChainedProcessor();
+        ChainedProcessor second = new ChainedProcessor();
+        ChainedProcessor third = new ChainedProcessor();
+        ChainedProcessor fourth = new ChainedProcessor();
+        // the chain is built solely through repeated setEnd() calls on the head
+        first.setEnd(second);
+        first.setEnd(third);
+        first.setEnd(fourth);
+        first.setEnd(probe);
+        first.start();
+        Packet packet = new Packet(new PacketData());
+        fourth.downStream(packet);
+        assertTrue(invoked.get());
+    }
+
+    /**
+     * Sends a packet upstream from the last processor of a chain whose head
+     * is the probe and expects it to reach the probe.
+     *
+     * @throws Exception
+     */
+    public void testUpStream() throws Exception {
+        final AtomicBoolean invoked = new AtomicBoolean();
+        ChainedProcessor probe = new ChainedProcessor() {
+            public void upStream(Packet p) {
+                invoked.set(true);
+            }
+        };
+        ChainedProcessor first = new ChainedProcessor();
+        ChainedProcessor second = new ChainedProcessor();
+        ChainedProcessor third = new ChainedProcessor();
+        ChainedProcessor fourth = new ChainedProcessor();
+        // here the probe comes first and the plain processors are attached behind it
+        probe.setEnd(first);
+        first.setEnd(second);
+        first.setEnd(third);
+        first.setEnd(fourth);
+        first.start();
+        Packet packet = new Packet(new PacketData());
+        fourth.upStream(packet);
+        assertTrue(invoked.get());
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,42 @@
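+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */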
+package org.apache.activeblaze.impl.processor;
+
+import junit.framework.TestCase;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Tests that {@link CompressionProcessor} leaves a small payload alone,
+ * compresses a large one on the way down and restores it on the way up.
+ *
+ * @author rajdavies
+ *
+ */
+public class CompressionProcessorTest extends TestCase {
+    /**
+     * Runs a 1024 byte payload and a payload of twice the compression limit
+     * through the processor.
+     *
+     * @throws Exception
+     */
+    public void testProcessor() throws Exception {
+        Packet packet = new Packet(new PacketData());
+        // NOTE(review): 1024 bytes is presumably below getCompressionLimit() - confirm
+        byte[] d1 = new byte[1024];
+        Buffer payload = new Buffer(d1);
+        packet.getPacketData().setPayload(payload);
+        // the same terminator captures packets leaving in either direction
+        TerminatedChainedProcessor test = new TerminatedChainedProcessor();
+        CompressionProcessor proc = new CompressionProcessor();
+        proc.setPrev(test);
+        proc.setNext(test);
+        assertFalse(CompressionProcessor.isCompressed(packet.getPacketData().getPayload()));
+        proc.downStream(packet);
+        assertFalse(CompressionProcessor.isCompressed(packet.getPacketData().getPayload()));
+        // bigger payload
+        byte[] d2 = new byte[proc.getCompressionLimit() * 2];
+        for (int i = 0; i < d2.length; i++) {
+            d2[i] = (byte) i;
+        }
+        payload = new Buffer(d2);
+        packet.getPacketData().setPayload(payload);
+        // send a clone so that 'packet' keeps the uncompressed payload for comparison
+        proc.downStream(packet.clone());
+        Packet result = test.getResult();
+        assertNotNull("no packet came out of downStream", result);
+        assertTrue(CompressionProcessor.isCompressed(result.getPacketData().getPayload()));
+        proc.upStream(result.clone());
+        result = test.getResult();
+        assertNotNull("no packet came out of upStream", result);
+        Buffer restored = result.getPacketData().getPayload();
+        assertFalse(CompressionProcessor.isCompressed(restored));
+        // expected value first, so that a failure message reads correctly
+        assertEquals(packet.getPacketData().getPayload().length, restored.length);
+        // equal length alone does not prove the round trip preserved the data
+        assertTrue("round trip should restore the original bytes",
+                java.util.Arrays.equals(d2, restored.toByteArray()));
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.TestCase;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Tests that {@link FragmentationProcessor} splits a large payload into
+ * several packets and reassembles them into the original payload.
+ *
+ * @author rajdavies
+ *
+ */
+public class FragmentationProcessorTest extends TestCase {
+    /**
+     * Fragments a 32 KiB payload with a maximum packet size of 1024, feeds
+     * the fragments back upstream and compares the result byte by byte.
+     *
+     * @throws Exception
+     */
+    public void testProcessor() throws Exception {
+        Packet packet = new Packet(new PacketData());
+        byte[] testData = new byte[1024 * 32];
+        for (int i = 0; i < testData.length; i++) {
+            testData[i] = (byte) i;
+        }
+        Buffer payload = new Buffer(testData);
+        packet.getPacketData().setPayload(payload);
+        // the same terminator captures packets leaving in either direction
+        TerminatedChainedProcessor test = new TerminatedChainedProcessor();
+        FragmentationProcessor proc = new FragmentationProcessor();
+        proc.setPrev(test);
+        proc.setNext(test);
+        proc.setMaxPacketSize(1024);
+        proc.downStream(packet);
+        // copy the fragments: reset() below clears the terminator's own list
+        List<Packet> list = new ArrayList<Packet>(test.getPacketList());
+        assertTrue(list.size() > 1);
+        test.reset();
+        assertNull(test.getResult());
+        for (Packet p : list) {
+            proc.upStream(p);
+        }
+        Packet resultPacket = test.getResult();
+        assertNotNull("no reassembled packet was passed upstream", resultPacket);
+        byte[] result = resultPacket.getPacketData().getPayload().toByteArray();
+        // expected value first, so that a failure message reads correctly
+        assertEquals(testData.length, result.length);
+        for (int i = 0; i < result.length; i++) {
+            assertEquals("Testing byte at: " + i, testData[i], result[i]);
+        }
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import junit.framework.TestCase;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+
+/**
+ * Tests duplicate detection by {@link PacketAudit}.
+ *
+ */
+public class PacketAuditTest extends TestCase {
+
+    /**
+     * Test method for duplicates: every sequence number up to the audit depth
+     * is new on the first pass and a duplicate on the second.
+     *
+     * @throws Exception
+     */
+    public void testAudit() throws Exception {
+        PacketAudit audit = new PacketAudit();
+        audit.start();
+
+        // first pass - nothing has been seen yet
+        for (long sequence = 0; sequence < audit.getMaxAuditDepth(); sequence++) {
+            assertFalse(audit.isDuplicate(newPacket(sequence)));
+        }
+
+        // second pass - the same producer and sequence numbers again
+        for (long sequence = 0; sequence < audit.getMaxAuditDepth(); sequence++) {
+            assertTrue("Testing " + sequence, audit.isDuplicate(newPacket(sequence)));
+        }
+    }
+
+    /**
+     * Builds a packet from producer "fred" with the given message sequence.
+     *
+     * @param sequence the message sequence number
+     * @return the new packet
+     */
+    private static Packet newPacket(long sequence) {
+        PacketData packetData = new PacketData();
+        packetData.setProducerId(new Buffer("fred"));
+        packetData.setMessageSequence(sequence);
+        return new Packet(packetData);
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+
+
+/**
+ * Test Processor that ends a chain: it does not forward packets but records
+ * them so that a test can inspect what reached it.
+ *
+ */
+public class TerminatedChainedProcessor extends ChainedProcessor {
+
+    /** Packets received by downStream() since the last reset(), in arrival order. */
+    private List<Packet> downStreamPackets = new ArrayList<Packet>();
+
+    /** The packet most recently received in either direction, or null. */
+    private Packet lastPacket;
+
+    /**
+     * Records the packet as the latest result and appends it to the packet list.
+     *
+     * @param packet the packet travelling downstream
+     */
+    public void downStream(Packet packet) {
+        this.downStreamPackets.add(packet);
+        this.lastPacket = packet;
+    }
+
+    /**
+     * Records the packet as the latest result only; the packet list is left
+     * untouched.
+     *
+     * @param packet the packet travelling upstream
+     */
+    public void upStream(Packet packet) {
+        this.lastPacket = packet;
+    }
+
+    /**
+     * @return the last packet received, or null if none since construction or reset()
+     */
+    public Packet getResult() {
+        return this.lastPacket;
+    }
+
+    /**
+     * @return the live internal list of downstream packets (not a copy; reset() clears it)
+     */
+    public List<Packet> getPacketList() {
+        return this.downStreamPackets;
+    }
+
+    /**
+     * Forgets the last result and empties the packet list.
+     */
+    public void reset() {
+        this.downStreamPackets.clear();
+        this.lastPacket = null;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+import junit.framework.TestCase;
+
+/**
+ * Test Multicast Transport
+ *
+ */
+public class MulticastTransportTest extends TestCase {
+ public void testTransport() throws Exception {
+ URI broadcastURI = new URI("mcast://224.2.2.2:9999");
+ InetSocketAddress to = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort());
+ MulticastTransport sender = new MulticastTransport();
+ sender.setLocalURI(broadcastURI);
+ sender.init();
+ sender.start();
+ TerminatedChainedProcessor test = new TerminatedChainedProcessor();
+ MulticastTransport receiver = new MulticastTransport();
+ receiver.setPrev(test);
+ receiver.setLocalURI(broadcastURI);
+ receiver.init();
+ receiver.start();
+ String payload = "test String";
+ Buffer duff = new Buffer("duff");
+ PacketData packetData = new PacketData();
+ packetData.setType(1);
+ packetData.setMessageId(new Buffer("foo"));
+ packetData.setProducerId(duff);
+ packetData.setFromAddress(duff);
+ packetData.setSessionId(1);
+ packetData.setMessageSequence(0);
+ packetData.setPayload(new Buffer(payload));
+ Packet packet = new Packet(packetData);
+ packet.setTo(to);
+ sender.downStream(packet);
+ Thread.sleep(500);
+ assertEquals(payload, test.getResult().getPacketData().getPayload().toStringUtf8());
+ receiver.shutDown();
+ sender.shutDown();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import junit.framework.TestCase;
+
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+
+
+/**
+ * @author rajdavies
+ *
+ */
+public class UdpTransportTest extends TestCase {
+
+ public void testTransport() throws Exception {
+ URI receiverURI = new URI("udp://localhost:6966");
+ URI senderURI = new URI("udp://localhost:6766");
+
+
+ UdpTransport sender = new UdpTransport();
+ sender.setLocalURI(senderURI);
+ sender.start();
+
+ TerminatedChainedProcessor test = new TerminatedChainedProcessor();
+ UdpTransport receiver = new UdpTransport();
+ receiver.setPrev(test);
+ receiver.setLocalURI(receiverURI);
+ receiver.start();
+
+
+ String payload = "test String";
+ Buffer duff = new Buffer("duff");
+ PacketData packetData = new PacketData();
+ packetData.setType(1);
+ packetData.setMessageId(new Buffer("foo"));
+ packetData.setProducerId(duff);
+ packetData.setFromAddress(duff);
+ packetData.setSessionId(1);
+ packetData.setMessageSequence(0);
+ packetData.setPayload(new Buffer(payload));
+ Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData);
+
+
+ sender.downStream(packet);
+
+ Thread.sleep(500);
+
+ assertEquals(payload,test.getResult().getPacketData().getPayload().toStringUtf8());
+ receiver.shutDown();
+ sender.shutDown();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.TestCase;
+
+/**
+ * Test cases for PropertyUtil
+ *
+ */
+public class PropertyUtilTest extends TestCase {
+ /**
+ * Test method for {@link org.apache.activeblaze.util.PropertyUtil#parseParameters(java.net.URI)}.
+ *
+ * @throws Exception
+ */
+ public void testParseParametersURI() throws Exception {
+ String query = "name=foo&type=blah";
+ URI uri = new URI("http://blah:60606?" + query);
+ Map<String, String> result = PropertyUtil.parseParameters(uri);
+ assertEquals(2, result.size());
+ assertEquals("foo", result.get("name"));
+ assertEquals("blah", result.get("type"));
+ }
+
+ /**
+ * Test method for {@link org.apache.activeblaze.util.PropertyUtil#parseParameters(java.lang.String)}.
+ *
+ * @throws Exception
+ */
+ public void testParseParametersString() throws Exception {
+ String query = "name=foo&type=blah";
+ String uri = "somestring?" + query;
+ Map<String, String> result = PropertyUtil.parseParameters(uri);
+ assertEquals(2, result.size());
+ assertEquals("foo", result.get("name"));
+ assertEquals("blah", result.get("type"));
+ }
+
+ /**
+ * Test method for
+ * {@link org.apache.activeblaze.util.PropertyUtil#addPropertiesToURI(java.lang.String, java.util.Map)}.
+ * @throws Exception
+ */
+ public void testAddPropertiesToURI() throws Exception {
+ String query = "type=blah&name=foo";
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("name", "foo");
+ map.put("type", "blah");
+ String test = "somestring?"+query;
+ String result = PropertyUtil.addPropertiesToURI(test, map);
+ URI uri = new URI(result);
+ assertEquals(uri.getQuery(),query);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java
------------------------------------------------------------------------------
svn:eol-style = native