You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC
svn commit: r1504961 [11/11] - in /activemq/activemq-blaze/trunk: ./
src/main/java/org/apache/activeblaze/
src/main/java/org/apache/activeblaze/cluster/
src/main/java/org/apache/activeblaze/group/
src/main/java/org/apache/activeblaze/impl/destination/ ...
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java Fri Jul 19 18:44:21 2013
@@ -19,12 +19,12 @@ package org.apache.activeblaze.cluster;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+
import junit.framework.TestCase;
import org.apache.activeblaze.group.Member;
/**
* Tests for ClusterState
- *
*/
public class ClusterStateTest extends TestCase {
protected BlazeClusterGroupChannel channel1;
@@ -122,7 +122,7 @@ public class ClusterStateTest extends Te
}
}
- public void XtestExpireImplicitWriteLock() throws Exception {
+ public void testExpireImplicitWriteLock() throws Exception {
ClusterState state1 = this.channel1.getState();
final AtomicBoolean called = new AtomicBoolean();
this.channel1.start();
@@ -133,22 +133,28 @@ public class ClusterStateTest extends Te
this.channel2.start();
this.channel2.waitForElection(0);
state2.put("test", "foo");
+
try {
+
state1.put("test", "bah");
fail("Should have thrown an exception!");
} catch (ClusterUpdateException e) {
+
}
Thread.sleep(2000);
state1.put("test", "bah");
}
- public void XtestExpireImplicitLockOnExit() throws Exception {
+ public void testExpireImplicitLockOnExit() throws Exception {
+
ClusterState state1 = this.channel1.getState();
this.channel1.start();
ClusterState state2 = this.channel2.getState();
state2.setAlwaysLock(true);
- // state2.setLockTimeToLive(1000);
+ //state2.setLockTimeToLive(1000);
this.channel2.getConfiguration().setMinimumGroupSize(2);
+
+
this.channel2.start();
this.channel2.waitForElection(0);
validateCluster();
@@ -160,7 +166,7 @@ public class ClusterStateTest extends Te
}
this.channel2.shutDown();
this.channel1.getConfiguration().setMinimumGroupSize(1);
- Thread.sleep(1000);
+ Thread.sleep(2000);
state1.put("test", "bah");
}
@@ -232,7 +238,7 @@ public class ClusterStateTest extends Te
assertTrue(state1.isEmpty());
}
- public void XtestMapUpdatedOnStart() throws Exception {
+ public void testMapUpdatedOnStart() throws Exception {
ClusterState state1 = this.channel1.getState();
final AtomicBoolean called = new AtomicBoolean();
this.channel1.getConfiguration().setMinimumGroupSize(1);
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Fri Jul 19 18:44:21 2013
@@ -19,6 +19,7 @@ package org.apache.activeblaze.group;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.TestCase;
import org.apache.activeblaze.BlazeChannel;
import org.apache.activeblaze.BlazeMessage;
@@ -26,14 +27,12 @@ import org.apache.activeblaze.BlazeMessa
/**
* Test BlazeGroupChannel
- *
*/
public class BlazeGroupChannelTest extends TestCase {
/**
* Test method for
- * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}.
- *
- * @throws Exception
+ * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}
+ * .
*/
public void testSendMemberBlazeMessage() throws Exception {
final int number = 5;
@@ -75,9 +74,8 @@ public class BlazeGroupChannelTest exten
/**
* Test method for
- * {@link org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}.
- *
- * @throws Exception
+ * {@link org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}
+ * .
*/
public void testSendRequestMemberBlazeMessage() throws Exception {
final int number = 100;
@@ -119,7 +117,7 @@ public class BlazeGroupChannelTest exten
reply.shutDown();
}
- public void testSendRequestString() throws Exception {
+ public void testSendRequestString() throws Exception {
String destination = "/test/foo";
final int number = 10;
final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();
@@ -148,7 +146,7 @@ public class BlazeGroupChannelTest exten
reply.addToGroup("test");
request.start();
reply.start();
- Member result = request.getAndWaitForMemberByName("reply", 1000);
+ Member result = request.getAndWaitForMemberByName("reply", 10000);
assertNotNull(result);
for (int i = 0; i < requests.size(); i++) {
BlazeMessage requestMsg = requests.get(i);
@@ -162,9 +160,8 @@ public class BlazeGroupChannelTest exten
/**
* Test method for
- * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)}.
- *
- * @throws Exception
+ * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)}
+ * .
*/
public void testSendStringBlazeMessage() throws Exception {
final int number = 2;
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java Fri Jul 19 18:44:21 2013
@@ -16,26 +16,26 @@
*/
package org.apache.activeblaze.impl.destination;
-import org.apache.activeblaze.impl.destination.DestinationMatch;
-import org.apache.activemq.protobuf.Buffer;
import junit.framework.TestCase;
+import org.apache.activeblaze.wire.Buffer;
/**
- * @author rajdavies
+ *
*
*/
public class DestinationMatchTest extends TestCase {
-
+
/**
- * Test method for {@link org.apache.activeblaze.impl.destination.DestinationMatch#isMatch(byte[], byte[])}.
+ * Test method for DEstinationMatch
*/
public void testIsMatch() {
String base = "foo.fred.blah";
byte[] data = base.getBytes();
- byte[] test = new byte[data.length+4];
+ byte[] test = new byte[data.length + 4];
System.arraycopy(data, 0, test, 4, data.length);
- doMatchTest(new Buffer(test,4,data.length));
+ doMatchTest(new Buffer(test, 4, data.length));
}
+
private void doMatchTest(Buffer base) {
String match1 = "foo.fred.blah";
String match2 = "foo.*.blah";
@@ -44,11 +44,11 @@ public class DestinationMatchTest extend
String match5 = ">.*.*";
String match6 = "*.*.*";
String match7 = "foo.fred.blah.>";
-
+
String fail1 = "blah.fred.foo";
String fail2 = "*.fred.foo";
String fail3 = "foo.*.foo.>";
-
+
assertTrue(DestinationMatch.isMatch(base, match1));
assertTrue(DestinationMatch.isMatch(base, match2));
assertTrue(DestinationMatch.isMatch(base, match3));
@@ -56,15 +56,15 @@ public class DestinationMatchTest extend
assertTrue(DestinationMatch.isMatch(base, match5));
assertTrue(DestinationMatch.isMatch(base, match6));
assertTrue(DestinationMatch.isMatch(base, match7));
-
- assertTrue(DestinationMatch.isMatch(match1,base));
- assertTrue(DestinationMatch.isMatch(match2,base));
- assertTrue(DestinationMatch.isMatch(match3,base));
- assertTrue(DestinationMatch.isMatch(match4,base));
- assertTrue(DestinationMatch.isMatch(match5,base));
- assertTrue(DestinationMatch.isMatch(match6,base));
- assertTrue(DestinationMatch.isMatch(match7,base));
-
+
+ assertTrue(DestinationMatch.isMatch(match1, base));
+ assertTrue(DestinationMatch.isMatch(match2, base));
+ assertTrue(DestinationMatch.isMatch(match3, base));
+ assertTrue(DestinationMatch.isMatch(match4, base));
+ assertTrue(DestinationMatch.isMatch(match5, base));
+ assertTrue(DestinationMatch.isMatch(match6, base));
+ assertTrue(DestinationMatch.isMatch(match7, base));
+
assertFalse(DestinationMatch.isMatch(base, fail1));
assertFalse(DestinationMatch.isMatch(base, fail2));
assertFalse(DestinationMatch.isMatch(base, fail3));
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Fri Jul 19 18:44:21 2013
@@ -19,12 +19,10 @@ package org.apache.activeblaze.impl.proc
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.TestCase;
-
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.Packet;
/**
* Test some basics in ChainedProcessor
- *
*/
public class ChainedProcessorTest extends TestCase {
public void testStart() throws Exception {
@@ -74,7 +72,7 @@ public class ChainedProcessorTest extend
A.setEnd(D);
A.setEnd(target);
A.start();
- Packet p = new Packet(new PacketDataBean().freeze());
+ Packet p = new Packet();
D.downStream(p);
assertTrue(test.get());
}
@@ -95,7 +93,7 @@ public class ChainedProcessorTest extend
A.setEnd(C);
A.setEnd(D);
A.start();
- Packet p = new Packet(new PacketDataBean().freeze());
+ Packet p = new Packet();
D.upStream(p);
assertTrue(test.get());
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java Fri Jul 19 18:44:21 2013
@@ -17,38 +17,31 @@
package org.apache.activeblaze.impl.processor;
import junit.framework.TestCase;
-
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activemq.protobuf.Buffer;
-
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketAudit;
/**
*
*
*/
public class PacketAuditTest extends TestCase {
-
/**
* Test method for duplicates
- * @throws Exception
*/
public void testAudit() throws Exception {
PacketAudit audit = new PacketAudit();
audit.start();
- for (long i =0; i< audit.getMaxAuditDepth();i++) {
- PacketDataBean data = new PacketDataBean();
- data.setProducerId(new Buffer("fred"));
- data.setMessageSequence(i);
- Packet packet = new Packet(data.freeze());
+ for (long i = 0; i < audit.getMaxAuditDepth(); i++) {
+ Packet packet = new Packet();
+ packet.setProducerId("fred");
+ packet.setMessageSequence(i);
assertFalse(audit.isDuplicate(packet));
}
-
- for (long i =0; i< audit.getMaxAuditDepth();i++) {
- PacketDataBean data = new PacketDataBean();
- data.setProducerId(new Buffer("fred"));
- data.setMessageSequence(i);
- Packet packet = new Packet(data.freeze());
- assertTrue("Testing " + i,audit.isDuplicate(packet));
+ for (long i = 0; i < audit.getMaxAuditDepth(); i++) {
+ Packet packet = new Packet();
+ packet.setProducerId("fred");
+ packet.setMessageSequence(i);
+ assertTrue("Testing " + i, audit.isDuplicate(packet));
}
}
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java Fri Jul 19 18:44:21 2013
@@ -18,40 +18,40 @@ package org.apache.activeblaze.impl.proc
import java.util.ArrayList;
import java.util.List;
-import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
+
+import org.apache.activeblaze.wire.Packet;
/**
* Test Processor
- *
*/
public class TerminatedChainedProcessor extends DefaultChainedProcessor {
private Packet result = null;
private List<Packet> list = new ArrayList<Packet>();
+
public TerminatedChainedProcessor() {
}
-
- public void downStream(Packet packet){
- this.result= packet;
+
+ public void downStream(Packet packet) {
+ this.result = packet;
this.list.add(packet);
}
- public void upStream(Packet packet){
- this.result=packet;
+ public void upStream(Packet packet) {
+ this.result = packet;
}
-
+
public Packet getResult() {
return this.result;
}
-
- public List<Packet>getPacketList(){
+
+ public List<Packet> getPacketList() {
return this.list;
}
-
- public void reset(){
- this.result=null;
+
+ public void reset() {
+ this.result = null;
this.list.clear();
}
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java Fri Jul 19 18:44:21 2013
@@ -17,12 +17,12 @@
package org.apache.activeblaze.impl.reliable.swp;
import java.net.SocketAddress;
+
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.wire.Packet;
/**
* Changes downStream() to upStream and vice versa
- *
*/
public class MockNetworkProcessor extends DefaultChainedProcessor {
int count = 0;
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java Fri Jul 19 18:44:21 2013
@@ -17,19 +17,19 @@
package org.apache.activeblaze.impl.reliable.swp;
import java.net.InetSocketAddress;
-import org.apache.activeblaze.impl.processor.Packet;
+
+import org.apache.activeblaze.wire.Packet;
/**
* Mock network
- *
*/
public class MockNetworkSwitch {
private final MockNetworkProcessor peer1;
private final MockNetworkProcessor peer2;
MockNetworkSwitch() {
- this.peer1 = new MockNetworkProcessor(this, new InetSocketAddress("localhost",1));
- this.peer2 = new MockNetworkProcessor(this,new InetSocketAddress("localhost",2));
+ this.peer1 = new MockNetworkProcessor(this, new InetSocketAddress("localhost", 1));
+ this.peer2 = new MockNetworkProcessor(this, new InetSocketAddress("localhost", 2));
}
MockNetworkProcessor getPeer1() {
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java Fri Jul 19 18:44:21 2013
@@ -25,17 +25,14 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
-
+import org.apache.activeblaze.BlazeMessage;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.transport.UdpTransport;
import org.apache.activeblaze.util.IdGenerator;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activemq.protobuf.Buffer;
+import org.apache.activeblaze.wire.Packet;
/**
* Test the SwpProcessor
- *
*/
public class SwpProcessorTest extends TestCase {
private IdGenerator idGenerator = new IdGenerator();
@@ -46,7 +43,6 @@ public class SwpProcessorTest extends Te
SwpProcessor consumer;
/**
- * @throws java.lang.Exception
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
@@ -59,7 +55,6 @@ public class SwpProcessorTest extends Te
}
/**
- * @throws java.lang.Exception
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
@@ -110,7 +105,7 @@ public class SwpProcessorTest extends Te
seq++;
}
}
-
+
public void testResponseRequiredProcessor() throws Exception {
final List<Packet> results = new ArrayList<Packet>();
final int number = 100;
@@ -148,15 +143,15 @@ public class SwpProcessorTest extends Te
}
protected Packet createPacket(SocketAddress to, boolean responseRequried) throws Exception {
- PacketDataBean data = new PacketDataBean();
- data.setMessageId(new Buffer(this.idGenerator.generateId()));
- Buffer payload = new Buffer(new byte[1024]);
- data.setPayload(payload);
- if( responseRequried ) {
+ byte[] payload = new byte[1024];
+ BlazeMessage data = new BlazeMessage(payload);
+ data.setId(this.idGenerator.generateId());
+
+ if (responseRequried) {
data.setResponseRequired(true);
}
- Packet packet = new Packet(data.freeze());
- packet.setTo(to);
- return packet;
+
+ data.setTo(to);
+ return data;
}
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java Fri Jul 19 18:44:21 2013
@@ -20,16 +20,11 @@ import java.net.InetSocketAddress;
import java.net.URI;
import junit.framework.TestCase;
-
-import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.BlazeMessage;
import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activemq.protobuf.Buffer;
/**
* Test Multicast Transport
- *
*/
public class MulticastTransportTest extends TestCase {
public void testTransport() throws Exception {
@@ -46,19 +41,14 @@ public class MulticastTransportTest exte
receiver.init();
receiver.start();
String payload = "test String";
- Buffer duff = new Buffer("duff");
- PacketDataBean packetData = new PacketDataBean();
- packetData.setMessageType(MessageType.MEMBER_DATA);
- packetData.setMessageId(new Buffer("foo"));
- packetData.setProducerId(duff);
- packetData.setSessionId(1);
- packetData.setMessageSequence(0);
- packetData.setPayload(new Buffer(payload));
- Packet packet = new Packet(packetData.freeze());
- packet.setTo(to);
- sender.downStream(packet);
+ BlazeMessage msg = new BlazeMessage(payload);
+ msg.setId("foo");
+ msg.setTo(to);
+ sender.downStream(msg);
Thread.sleep(500);
- assertEquals(payload, test.getResult().getPacketData().getPayload().toStringUtf8());
+ assertNotNull(test.getResult());
+ BlazeMessage result = (BlazeMessage) test.getResult();
+ assertEquals(payload, result.getText());
receiver.shutDown();
sender.shutDown();
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java Fri Jul 19 18:44:21 2013
@@ -16,57 +16,48 @@
*/
package org.apache.activeblaze.impl.transport;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.URI;
import junit.framework.TestCase;
-
-import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.BlazeMessage;
import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activemq.protobuf.Buffer;
-
/**
* @author rajdavies
- *
*/
public class UdpTransportTest extends TestCase {
-
+
public void testTransport() throws Exception {
URI receiverURI = new URI("udp://localhost:6966");
URI senderURI = new URI("udp://localhost:6766");
-
-
+
+
UdpTransport sender = new UdpTransport();
sender.setLocalURI(senderURI);
sender.start();
-
+
TerminatedChainedProcessor test = new TerminatedChainedProcessor();
UdpTransport receiver = new UdpTransport();
receiver.setPrev(test);
receiver.setLocalURI(receiverURI);
receiver.start();
-
-
- String payload = "test String";
- Buffer duff = new Buffer("duff");
- PacketDataBean packetData = new PacketDataBean();
- packetData.setMessageType(MessageType.MEMBER_DATA);
- packetData.setMessageId(new Buffer("foo"));
- packetData.setProducerId(duff);
- packetData.setSessionId(1);
- packetData.setMessageSequence(0);
- packetData.setPayload(new Buffer(payload));
- Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData.freeze());
-
-
- sender.downStream(packet);
-
+
+
+ String payload = "test String";
+ BlazeMessage msg = new BlazeMessage(payload);
+ SocketAddress to = new InetSocketAddress(receiverURI.getHost(), receiverURI.getPort());
+ msg.setTo(to);
+ sender.downStream(msg);
+
+
Thread.sleep(500);
-
- assertEquals(payload,test.getResult().getPacketData().getPayload().toStringUtf8());
+
+ assertNotNull(test.getResult());
+ BlazeMessage result = (BlazeMessage) test.getResult();
+ assertEquals(payload, result.getText());
receiver.shutDown();
sender.shutDown();
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java Fri Jul 19 18:44:21 2013
@@ -42,7 +42,7 @@ public class PerfConsumer implements Mes
public PerfConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException {
connection = fac.createConnection();
- connection.setClientID(consumerName!=null ? consumerName:"Consumer");
+ connection.setClientID(consumerName != null ? consumerName : "Consumer");
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (dest instanceof Topic && consumerName != null && consumerName.length() > 0) {
consumer = s.createDurableSubscriber((Topic) dest, consumerName);
@@ -108,8 +108,7 @@ public class PerfConsumer implements Mes
}
/**
- * @param initialDelay
- * the initialDelay to set
+ * @param initialDelay the initialDelay to set
*/
public void setInitialDelay(long initialDelay) {
this.initialDelay = initialDelay;
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java Fri Jul 19 18:44:21 2013
@@ -51,7 +51,7 @@ public class PerfProducer implements Run
public void setDeliveryMode(int mode) throws JMSException {
producer.setDeliveryMode(mode);
}
-
+
public void setTimeToLive(int ttl) throws JMSException {
producer.setTimeToLive(ttl);
}
@@ -69,7 +69,7 @@ public class PerfProducer implements Run
rate.reset();
running = true;
connection.start();
- Thread t = new Thread(this);
+ Thread t = new Thread(this);
t.setName("Producer");
t.start();
}
@@ -79,7 +79,7 @@ public class PerfProducer implements Run
synchronized (this) {
running = false;
}
- stopped.await(1,TimeUnit.SECONDS);
+ stopped.await(1, TimeUnit.SECONDS);
connection.stop();
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java Fri Jul 19 18:44:21 2013
@@ -40,7 +40,7 @@ public class PerfRate {
public int getRate() {
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
- int result = (int)((count * 1000) / totalTime);
+ int result = (int) ((count * 1000) / totalTime);
return result;
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java Fri Jul 19 18:44:21 2013
@@ -21,9 +21,7 @@ import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
-
import junit.framework.TestCase;
-
import org.apache.activeblaze.jms.BlazeJmsConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,44 +32,44 @@ import org.apache.commons.logging.LogFac
public class SimpleTopicTest extends TestCase {
private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class);
-
-
+
+
protected PerfProducer[] producers;
protected PerfConsumer[] consumers;
protected String destinationName = getClass().getName();
protected int sampleCount = 20;
protected long sampleInternal = 10000;
- protected int numberOfDestinations=1;
- protected int numberOfConsumers = 2;
+ protected int numberOfDestinations = 1;
+ protected int numberOfConsumers = 1;
protected int numberofProducers = 1;
protected int totalNumberOfProducers;
protected int totalNumberOfConsumers;
protected int playloadSize = 1024;
protected byte[] array;
protected ConnectionFactory factory;
-
- /**
+
+ /**
* Sets up a test where the producer and consumer have their own connection.
- *
+ *
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
-
+
factory = createConnectionFactory();
Connection con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
+
+
LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers + " consumer(s) per " + numberOfDestinations + " Destination(s)");
-
- totalNumberOfConsumers=numberOfConsumers*numberOfDestinations;
- totalNumberOfProducers=numberofProducers*numberOfDestinations;
+
+ totalNumberOfConsumers = numberOfConsumers * numberOfDestinations;
+ totalNumberOfProducers = numberofProducers * numberOfDestinations;
producers = new PerfProducer[totalNumberOfProducers];
consumers = new PerfConsumer[totalNumberOfConsumers];
int consumerCount = 0;
int producerCount = 0;
- for (int k =0; k < numberOfDestinations;k++) {
- Destination destination = createDestination(session, destinationName+":"+k);
+ for (int k = 0; k < numberOfDestinations; k++) {
+ Destination destination = createDestination(session, destinationName + ":" + k);
LOG.info("Testing against destination: " + destination);
for (int i = 0; i < numberOfConsumers; i++) {
consumers[consumerCount] = createConsumer(factory, destination, consumerCount);
@@ -80,7 +78,7 @@ public class SimpleTopicTest extends Tes
for (int i = 0; i < numberofProducers; i++) {
array = new byte[playloadSize];
for (int j = i; j < array.length; j++) {
- array[j] = (byte)j;
+ array[j] = (byte) j;
}
producers[producerCount] = createProducer(factory, destination, i, array);
producerCount++;
@@ -98,7 +96,7 @@ public class SimpleTopicTest extends Tes
for (int i = 0; i < numberofProducers; i++) {
producers[i].shutDown();
}
-
+
}
protected Destination createDestination(Session s, String destinationName) throws JMSException {
@@ -107,10 +105,8 @@ public class SimpleTopicTest extends Tes
/**
* Factory method to create a new broker
- *
- * @throws Exception
*/
-
+
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
return new PerfProducer(fac, dest, payload);
@@ -119,15 +115,13 @@ public class SimpleTopicTest extends Tes
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
return new PerfConsumer(fac, dest);
}
-
-
-
+
protected ConnectionFactory createConnectionFactory() throws Exception {
int portNum = 61616;
String uri = "static://(";
int count = this.numberofProducers + this.numberOfConsumers;
- for (int i = 0; i < count;i++) {
+ for (int i = 0; i < count; i++) {
uri += "udp://localhost:" + (portNum++);
uri += ",";
}
@@ -163,31 +157,31 @@ public class SimpleTopicTest extends Tes
protected void dumpProducerRate() {
int totalRate = 0;
int totalCount = 0;
- String producerString="Producers:";
+ String producerString = "Producers:";
for (int i = 0; i < producers.length; i++) {
PerfRate rate = producers[i].getRate().cloneAndReset();
totalRate += rate.getRate();
totalCount += rate.getTotalCount();
- producerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];";
+ producerString += "[" + i + ":" + rate.getRate() + "," + rate.getTotalCount() + "];";
}
if (producers != null && producers.length > 0) {
int avgRate = totalRate / producers.length;
System.out.println("Avg producer rate = " + avgRate
+ " msg/sec | Total rate = " + totalRate + ", sent = "
+ totalCount);
- // System.out.println(producerString);
+ // System.out.println(producerString);
}
}
protected void dumpConsumerRate() {
int totalRate = 0;
int totalCount = 0;
- String consumerString="Consumers:";
+ String consumerString = "Consumers:";
for (int i = 0; i < consumers.length; i++) {
PerfRate rate = consumers[i].getRate().cloneAndReset();
totalRate += rate.getRate();
totalCount += rate.getTotalCount();
- consumerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];";
+ consumerString += "[" + i + ":" + rate.getRate() + "," + rate.getTotalCount() + "];";
}
if (consumers != null && consumers.length > 0) {
int avgRate = totalRate / consumers.length;
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java Fri Jul 19 18:44:21 2013
@@ -17,19 +17,17 @@
package org.apache.activeblaze.util;
import java.net.URI;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
+
import junit.framework.TestCase;
/**
* Test cases for PropertyUtil
- *
*/
public class PropertyUtilTest extends TestCase {
/**
* Test method for {@link org.apache.activeblaze.util.PropertyUtil#parseParameters(java.net.URI)}.
- *
- * @throws Exception
*/
public void testParseParametersURI() throws Exception {
String query = "name=foo&type=blah";
@@ -42,8 +40,6 @@ public class PropertyUtilTest extends Te
/**
* Test method for {@link org.apache.activeblaze.util.PropertyUtil#parseParameters(java.lang.String)}.
- *
- * @throws Exception
*/
public void testParseParametersString() throws Exception {
String query = "name=foo&type=blah";
@@ -57,16 +53,15 @@ public class PropertyUtilTest extends Te
/**
* Test method for
* {@link org.apache.activeblaze.util.PropertyUtil#addPropertiesToURI(java.lang.String, java.util.Map)}.
- * @throws Exception
*/
public void testAddPropertiesToURI() throws Exception {
String query = "type=blah&name=foo";
- Map<String, String> map = new HashMap<String, String>();
- map.put("name", "foo");
+ Map<String, String> map = new LinkedHashMap<String, String>();
map.put("type", "blah");
- String test = "somestring?"+query;
+ map.put("name", "foo");
+ String test = "somestring?" + query;
String result = PropertyUtil.addPropertiesToURI(test, map);
URI uri = new URI(result);
- assertEquals(uri.getQuery(),query);
+ assertEquals(uri.getQuery(), query);
}
}