You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/17 16:12:59 UTC
svn commit: r745113 [2/2] - in /activemq/activemq-blaze/trunk: ./
src/main/java/org/apache/activeblaze/
src/main/java/org/apache/activeblaze/cluster/
src/main/java/org/apache/activeblaze/group/
src/main/java/org/apache/activeblaze/impl/network/ src/mai...
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java Tue Feb 17 15:12:52 2009
@@ -21,12 +21,14 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.reliable.ReliableBuffer;
-import org.apache.activeblaze.wire.AckData;
import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.NackData;
import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.AckData.AckDataBean;
+import org.apache.activeblaze.wire.NackData.NackDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,7 +59,7 @@
void processInBound(Packet packet) throws Exception {
PacketData packetData = packet.getPacketData();
- MessageType type = MessageType.valueOf(packetData.getType());
+ MessageType type = packetData.getType();
if (type == MessageType.CONTROL_DATA) {
if (this.replayBuffer.isEmpty()) {
// send back a control message
@@ -107,19 +109,18 @@
}
} else if (!packet.isReplayed() && !this.replayBuffer.isEmpty()) {
// request the sequence
- MessageType nackType = MessageType.NACK_DATA;
- NackData nack = (NackData) nackType.createMessage();
+ NackDataBean nack = new NackDataBean();
this.lock.lock();
try {
nack.setStartSequence(this.lastSequence + 1);
nack.setEndSequence(packet.getMessageSequence() - 1);
nack.setSessionId(packet.getPacketData().getSessionId());
nack.setId(this.ackSequence.incrementAndGet());
- PacketData pd = new PacketData();
+ PacketDataBean pd = new PacketDataBean();
pd.setResponseRequired(false);
- pd.setPayload(nack.toFramedBuffer());
- pd.setType(nackType.getNumber());
- Packet nackPacket = new Packet(pd);
+ pd.setPayload(nack.freeze().toUnframedBuffer());
+ pd.setType(MessageType.NACK_DATA);
+ Packet nackPacket = new Packet(pd.freeze());
nackPacket.setTo(this.peerAddress);
this.swp.sendDownStream(nackPacket);
LOG.debug(this + " Sending Nack: " + nack.getStartSequence() + " , " + nack.getEndSequence());
@@ -152,16 +153,15 @@
this.lock.lock();
try {
this.bufferSize = 0;
- MessageType type = MessageType.ACK_DATA;
- AckData ack = (AckData) type.createMessage();
+ AckDataBean ack = new AckDataBean();
ack.setStartSequence(this.firstSequence);
ack.setEndSequence(this.lastSequence);
ack.setId(this.ackSequence.incrementAndGet());
- PacketData pd = new PacketData();
+ PacketDataBean pd = new PacketDataBean();
pd.setResponseRequired(false);
- pd.setPayload(ack.toFramedBuffer());
- pd.setType(type.getNumber());
- ackPacket = new Packet(pd);
+ pd.setPayload(ack.freeze().toUnframedBuffer());
+ pd.setType(MessageType.ACK_DATA);
+ ackPacket = new Packet(pd.freeze());
ackPacket.setTo(this.peerAddress);
this.lastAckTime = System.currentTimeMillis();
this.firstSequence = this.lastSequence;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Tue Feb 17 15:12:52 2009
@@ -23,14 +23,16 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.activeblaze.BlazeNoRouteException;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.reliable.ReliableBuffer;
-import org.apache.activeblaze.wire.AckData;
-import org.apache.activeblaze.wire.ControlData;
import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.NackData;
import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.AckData.AckDataBuffer;
+import org.apache.activeblaze.wire.ControlData.ControlDataBean;
+import org.apache.activeblaze.wire.NackData.NackDataBuffer;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
@@ -66,8 +68,10 @@
*
*/
void processOutbound(final Packet packet) throws BlazeNoRouteException {
- packet.getPacketData().setSessionId(this.sessionId);
- packet.getPacketData().setMessageSequence(this.sendSequence.incrementAndGet());
+ PacketDataBean bean = packet.getPacketData().copy();
+ bean.setSessionId(this.sessionId);
+ bean.setMessageSequence(this.sendSequence.incrementAndGet());
+ packet.setPacketData(bean.freeze());
this.lock.lock();
try {
this.replayBuffer.addPacket(packet);
@@ -92,10 +96,9 @@
Packet result = null;
PacketData data = packet.getPacketData();
if (data != null) {
- MessageType type = MessageType.valueOf(data.getType());
+ MessageType type = data.getType();
if (type == MessageType.ACK_DATA) {
- AckData ackData = (AckData) type.createMessage();
- ackData.mergeFramed(data.getPayload());
+ AckDataBuffer ackData = AckDataBuffer.parseUnframed(data.getPayload());
long start = ackData.getStartSequence();
long end = ackData.getEndSequence();
if (LOG.isDebugEnabled()) {
@@ -119,8 +122,7 @@
}
} else if (type == MessageType.NACK_DATA) {
this.lastAckTime = System.currentTimeMillis();
- NackData nackData = (NackData) type.createMessage();
- nackData.mergeFramed(data.getPayload());
+ NackDataBuffer nackData = NackDataBuffer.parseUnframed(data.getPayload());
this.lastAckId = nackData.getId();
LOG.debug(this + " Got Nack = " + nackData);
// lookup any missed messages
@@ -151,14 +153,13 @@
Packet ackPacket = null;
this.lock.lock();
try {
- MessageType type = MessageType.CONTROL_DATA;
- ControlData control = (ControlData) type.createMessage();
+ ControlDataBean control = new ControlDataBean();
control.setLastId(this.lastAckId);
- PacketData pd = new PacketData();
+ PacketDataBean pd = new PacketDataBean();
pd.setResponseRequired(false);
- pd.setPayload(control.toFramedBuffer());
- pd.setType(type.getNumber());
- ackPacket = new Packet(pd);
+ pd.setPayload(control.freeze().toUnframedBuffer());
+ pd.setType(MessageType.CONTROL_DATA);
+ ackPacket = new Packet(pd.freeze());
ackPacket.setTo(this.peerAddress);
LOG.debug(this + " Sent Control message " + control);
} finally {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java Tue Feb 17 15:12:52 2009
@@ -21,6 +21,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.Packet;
@@ -106,10 +107,20 @@
}
public void doStop() throws Exception {
- super.doStop();
if (this.statusTimer != null) {
- this.statusTimer.cancel();
+ // Make sure we shutdown the timer before shutting down the down stream
+ // processors to avoid the timer getting errors.
+ final CountDownLatch done = new CountDownLatch(1);
+ this.statusTimer.schedule(new TimerTask(){
+ @Override
+ public void run() {
+ statusTimer.cancel();
+ done.countDown();
+ }}, 0);
+ done.await();
+ this.statusTimer=null;
}
+ super.doStop();
}
void sendDownStream(Packet packet) throws Exception {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Tue Feb 17 15:12:52 2009
@@ -258,7 +258,12 @@
} catch (InterruptedException e1) {
// we've stopped
} catch (Exception e) {
- LOG.error("Caught an exception processing a packet: " + packet, e);
+ String value="";
+ try {
+ value = packet.toString();
+ } catch (Throwable ignore) {
+ }
+ LOG.error("Caught an exception processing a packet: " + value, e);
stopInternal();
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Tue Feb 17 15:12:52 2009
@@ -22,9 +22,10 @@
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
+
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
/**
* Multicast transport
@@ -72,7 +73,7 @@
DatagramPacket dp = new DatagramPacket(receiveData, receiveData.length);
this.socket.receive(dp);
if (dp.getLength() > 0) {
- PacketData data = PacketData.parseFramed(dp.getData());
+ PacketDataBuffer data = PacketDataBuffer.parseFramed(dp.getData());
SocketAddress address = dp.getSocketAddress();
Packet packet = new Packet(address, data);
if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Tue Feb 17 15:12:52 2009
@@ -25,15 +25,18 @@
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Map;
+
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.BlazeNoRouteException;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.util.IOUtils;
import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.AckData;
import org.apache.activeblaze.wire.MessageType;
import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.AckData.AckDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
import org.apache.activemq.protobuf.Buffer;
/**
@@ -47,7 +50,7 @@
private ByteBuffer outBuffer;
- private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(
+ private Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(
1000);
public void doInit() throws Exception {
@@ -96,12 +99,11 @@
buffer.flip();
while (buffer.remaining() > 0) {
InputStream stream = IOUtils.getByteBufferInputStream(buffer);
- PacketData data = PacketData.parseFramed(stream);
+ PacketDataBuffer data = PacketDataBuffer.parseFramed(stream);
stream.close();
if (data.getResponse()) {
synchronized (this.messageRequests) {
- SendRequest request = this.messageRequests.remove(data
- .getCorrelationId());
+ SendRequest<PacketDataBuffer> request = this.messageRequests.remove(data.getCorrelationId());
if (request != null) {
request.put(data.getMessageId(), data);
}
@@ -124,12 +126,11 @@
public void downStream(Packet packet) throws Exception {
ByteBuffer buffer = this.outBuffer;
if (isStarted()) {
- SendRequest request = null;
+ SendRequest<PacketDataBuffer> request = null;
if (packet.isResponseRequired()) {
synchronized (this.messageRequests) {
- request = new SendRequest();
- this.messageRequests.put(packet.getPacketData()
- .getMessageId(), request);
+ request = new SendRequest<PacketDataBuffer>();
+ this.messageRequests.put(packet.getPacketData().getMessageId(), request);
}
}
synchronized (buffer) {
@@ -159,18 +160,17 @@
}
private Packet createAckPacket(PacketData data) {
- MessageType type = MessageType.ACK_DATA;
- AckData ackData = (AckData) type.createMessage();
+ AckDataBean ackData = new AckDataBean();
ackData.setSessionId(data.getSessionId());
ackData.setStartSequence(data.getMessageSequence());
ackData.setEndSequence(data.getMessageSequence());
- PacketData pd = new PacketData();
+ PacketDataBean pd = new PacketDataBean();
pd.setResponseRequired(false);
pd.setCorrelationId(data.getMessageId());
pd.setResponse(true);
- pd.setPayload(ackData.toFramedBuffer());
- pd.setType(type.getNumber());
- Packet packet = new Packet(pd);
+ pd.setPayload(ackData.freeze().toUnframedBuffer());
+ pd.setType(MessageType.ACK_DATA);
+ Packet packet = new Packet(pd.freeze());
return packet;
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java Tue Feb 17 15:12:52 2009
@@ -31,6 +31,7 @@
import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.BufferInputStream;
import org.apache.activemq.protobuf.BufferOutputStream;
@@ -116,7 +117,7 @@
if (this.dataOut != null) {
this.dataOut.close();
Buffer bs = this.bytesOut.toBuffer();
- getContent().setPayload(bs);
+ ((BlazeDataBean)getContent()).setPayload(bs);
this.bytesOut = null;
this.dataOut = null;
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java Tue Feb 17 15:12:52 2009
@@ -21,15 +21,17 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
+
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
+
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
-import org.apache.activeblaze.wire.BlazeData;
-import org.apache.activeblaze.wire.MapData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
+import org.apache.activeblaze.wire.MapData.MapDataBean;
+import org.apache.activeblaze.wire.MapData.MapDataBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.InvalidProtocolBufferException;
@@ -111,13 +113,12 @@
public void storeContent() {
super.storeContent();
if (getContent() != null && !this.map.isEmpty()) {
- MapData mapData = new MapData();
+ MapDataBean mapData = new MapDataBean();
for (Map.Entry<String, Object> entry : this.map.entrySet()) {
marshallMap(mapData, entry.getKey().toString(), entry.getValue());
}
- Buffer payload = mapData.toFramedBuffer();
- BlazeData data = getContent();
- data.setPayload(payload);
+ Buffer payload = mapData.freeze().toUnframedBuffer();
+ ((BlazeDataBean)getContent()).setPayload(payload);
}
}
@@ -129,7 +130,7 @@
protected void loadContent() {
if (getContent() != null && this.map.isEmpty()) {
try {
- MapData mapData = MapData.parseFramed(getContent().getPayload());
+ MapDataBuffer mapData = MapDataBuffer.parseUnframed(getContent().getPayload());
this.map = unmarshall(mapData);
} catch (InvalidProtocolBufferException e) {
throw new BlazeRuntimeException(e);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java Tue Feb 17 15:12:52 2009
@@ -17,18 +17,20 @@
package org.apache.activeblaze.jms.message;
import java.util.Enumeration;
+
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageFormatException;
+
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.BlazeMessage;
import org.apache.activeblaze.BlazeMessageFormatException;
import org.apache.activeblaze.BlazeRuntimeException;
import org.apache.activeblaze.jms.BlazeJmsDestination;
import org.apache.activeblaze.util.Callback;
-import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
/**
* Implementation of a Jms Message
@@ -99,7 +101,7 @@
* @see javax.jms.Message#clearBody()
*/
public void clearBody() throws JMSException {
- BlazeData data = getContent();
+ BlazeDataBean data = (BlazeDataBean) getContent();
if (data != null) {
data.clearPayload();
}
@@ -109,7 +111,7 @@
* @see javax.jms.Message#clearProperties()
*/
public void clearProperties() {
- BlazeData data = getContent();
+ BlazeDataBean data = (BlazeDataBean) getContent();
if (data != null) {
data.clearMapData();
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java Tue Feb 17 15:12:52 2009
@@ -22,13 +22,15 @@
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
+
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.BlazeRuntimeException;
import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
-import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.BufferInputStream;
import org.apache.activemq.protobuf.BufferOutputStream;
@@ -98,7 +100,8 @@
objOut.reset();
objOut.close();
payload = os.toBuffer();
- getContent().setPayload(payload);
+ BlazeDataBean data = (BlazeDataBean) getContent();
+ data.setPayload(payload);
} catch (IOException ioe) {
throw new RuntimeException(ioe.getMessage(), ioe);
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java Tue Feb 17 15:12:52 2009
@@ -32,6 +32,7 @@
import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.BufferInputStream;
@@ -135,7 +136,8 @@
try {
this.dataOut.close();
Buffer buffer = new Buffer(bytesOut.toByteArray());
- getContent().setPayload(buffer);
+ BlazeDataBean data = (BlazeDataBean) getContent();
+ data.setPayload(buffer);
this.bytesOut = null;
this.dataOut = null;
} catch (IOException ioe) {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java Tue Feb 17 15:12:52 2009
@@ -24,6 +24,7 @@
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.BlazeRuntimeException;
import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.BufferInputStream;
import org.apache.activemq.protobuf.BufferOutputStream;
@@ -74,7 +75,8 @@
BufferOutputStream os = new BufferOutputStream(this.text != null ? this.text.length() : 10);
DataOutputStream dataOut = new DataOutputStream(os);
MarshallingSupport.writeUTF8(dataOut, this.text);
- getContent().setPayload(os.toBuffer());
+ BlazeDataBean data = (BlazeDataBean) getContent();
+ data.setPayload(os.toBuffer());
dataOut.close();
} catch (IOException e) {
throw new BlazeRuntimeException(e);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java Tue Feb 17 15:12:52 2009
@@ -17,8 +17,8 @@
package org.apache.activeblaze.util;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -26,13 +26,13 @@
* state on a request
*
*/
-public class SendRequest {
+public class SendRequest<R> {
private static final Log LOG = LogFactory.getLog(SendRequest.class);
private final AtomicBoolean done = new AtomicBoolean();
- private Message<?> response;
+ private R response;
private RequestCallback callback;
- public Object get(long timeout) {
+ public R get(long timeout) {
synchronized (this.done) {
if (this.done.get() == false && this.response == null) {
try {
@@ -45,7 +45,7 @@
return this.response;
}
- public void put(Buffer id,Message<?> response) {
+ public void put(Buffer id, R response) {
this.response = response;
cancel();
RequestCallback callback = this.callback;
Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Feb 17 15:12:52 2009
@@ -19,8 +19,6 @@
option java_multiple_files = true;
option optimize_for = SPEED;
-
-
// We make use of the wonky comment style bellow because the following options
// are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
// In the ActiveMQ proto compiler, comments terminate with the pipe character: |
@@ -33,226 +31,209 @@
ACK_DATA = 3;
NACK_DATA = 4;
STATE_DATA = 5;
- CONTROL_DATA =6;
+ CONTROL_DATA = 6;
+}
+
+message PacketData {
+ optional bool responseRequired = 1;
+ optional bool reliable = 2;
+ optional bool response = 3;
+ optional bool replayed = 4;
+ optional MessageType type =5;
+ optional bytes producerId = 6;
+ optional int32 sessionId = 7;
+ optional int64 messageSequence = 8;
+ optional int32 numberOfParts= 9;
+ optional int32 partNumber= 10;
+ optional bytes payload= 11;
+ optional bytes messageId =12;
+ optional bytes correlationId = 13;
+}
+
+message BlazeData {
+ optional bool persistent = 1;
+ optional int32 priority = 2;
+ optional int32 redeliveryCounter = 3;
+ optional int32 type =4;
+ optional int64 timestamp = 5;
+ optional int64 expiration = 6;
+ optional bytes messageId = 7;
+ optional bytes correlationId = 8;
+ optional bytes fromId =9;
+ optional bytes messageType = 10;
+ optional bytes payload = 11;
+ optional DestinationData destinationData = 12;
+ optional DestinationData replyToData = 13;
+ optional MapData mapData = 14;
+ optional bytes payload = 15;
+}
+
+message AckData {
+ optional int64 id =1;
+ optional int64 startSequence =2;
+ optional int64 endSequence =3;
+ optional int64 sessionId = 4;
+}
+
+message NackData {
+ optional int64 id =1;
+ optional int64 startSequence =2;
+ optional int64 endSequence =3;
+ optional int64 sessionId = 4;
+}
+
+message ControlData {
+ optional int64 lastId =1; //last ack or nack id
+}
+
+message DestinationData {
+ optional bytes name =1;
+ optional bool topic =2;
+ optional bool temporary=3;
+}
+
+message SubscriptionData {
+ optional bool durable = 1;
+ optional bool noLocal = 2;
+ optional int32 weight = 3;
+ optional string channelName = 4;
+ optional string subscriberName = 5;
+ optional string selector = 6;
+ optional DestinationData destinationData = 7;
+}
+
+message MemberData {
+ optional string id = 1;
+ optional string name = 2;
+ optional int64 startTime = 3;
+ optional int64 timeStamp = 4;
+ optional bytes inetAddress = 5;
+ optional int32 port = 6;
+ // a higher weight means this will be the master
+ optional int64 masterWeight = 7;
+ // if both weights are the same - the refined weight can be used
+ optional int64 refinedWeight = 8;
+ optional bool subscriptionsChanged = 9;
+ optional bool observer = 10;
+ optional bool lockedMaster = 11;
+ repeated bytes groups = 12;
+ repeated SubscriptionData subscriptionData = 13;
+}
+
+message StateKeyData {
+ optional MemberData member =1;
+ optional string key = 2;
+ optional bool locked = 3;
+ optional bool removeOnExit = 4;
+ optional bool releaseLockOnExit = 5;
+ optional int64 expiration = 6;
+ optional int64 lockExpiration = 7;
+}
+
+enum StateType {
+ INSERT = 1;
+ DELETE = 2;
+ SYNC = 3;
+}
+
+message StateData {
+ optional StateKeyData keyData = 1;
+ optional bytes value =2;
+ optional bytes oldvalue =3;
+ optional bool mapUpdate = 4;
+ optional bool mapWrite = 5;
+ optional bool expired = 6;
+ optional bool lockExpired = 7;
+ optional bool lockUpdate = 8;
+ optional bool lockWrite = 9;
+ optional bool error = 10;
+ optional StateType stateType = 11;
+}
+
+enum ElectionType {
+ ELECTION = 0;
+ ANSWER = 1;
+ MASTER = 2;
+}
+
+message ElectionMessage {
+ optional MemberData member = 1;
+ optional ElectionType electionType = 2;
}
- message PacketData {
- optional bool responseRequired = 1;
- optional bool reliable = 2;
- optional bool response = 3;
- optional bool replayed = 4;
- optional int32 type =5;
- optional bytes producerId = 6;
- optional int32 sessionId = 7;
- optional int64 messageSequence = 8;
- optional int32 numberOfParts= 9;
- optional int32 partNumber= 10;
- optional bytes payload= 11;
- optional bytes messageId =12;
- optional bytes correlationId = 13;
-
- }
-
- message BlazeData {
- //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
- //| option java_type_method = "MessageType";
- optional bool persistent = 1;
- optional int32 priority = 2;
- optional int32 redeliveryCounter = 3;
- optional int32 type =4;
- optional int64 timestamp = 5;
- optional int64 expiration = 6;
- optional bytes messageId = 7;
- optional bytes correlationId = 8;
- optional bytes fromId =9;
- optional bytes messageType = 10;
- optional bytes payload = 11;
- optional DestinationData destinationData = 12;
- optional DestinationData replyToData = 13;
- optional MapData mapData = 14;
- optional bytes payload = 15;
-
- }
-
- message AckData {
- //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
- //| option java_type_method = "MessageType";
- optional int64 id =1;
- optional int64 startSequence =2;
- optional int64 endSequence =3;
- optional int64 sessionId = 4;
- }
-
-
-
- message NackData {
- //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
- //| option java_type_method = "MessageType";
- optional int64 id =1;
- optional int64 startSequence =2;
- optional int64 endSequence =3;
- optional int64 sessionId = 4;
- }
-
- message ControlData {
- //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
- //| option java_type_method = "MessageType";
- optional int64 lastId =1; //last ack or nack id
- }
-
- message DestinationData {
- optional bytes name =1;
- optional bool topic =2;
- optional bool temporary=3;
- }
-
- message SubscriptionData {
- optional bool durable = 1;
- optional bool noLocal = 2;
- optional int32 weight = 3;
- optional string channelName = 4;
- optional string subscriberName = 5;
- optional string selector = 6;
- optional DestinationData destinationData = 7;
- }
-
- message MemberData {
- //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
- //| option java_type_method = "MessageType";
- optional string id = 1;
- optional string name = 2;
- optional int64 startTime = 3;
- optional int64 timeStamp = 4;
- optional bytes inetAddress = 5;
- optional int32 port = 6;
- //a higher weight means this will be the master
- optional int64 masterWeight = 7;
- //if both weights are the same - the refined
- //weight can be used
- optional int64 refinedWeight = 8;
- optional bool subscriptionsChanged = 9;
- optional bool observer = 10;
- optional bool lockedMaster = 11;
- repeated bytes groups = 12;
- repeated SubscriptionData subscriptionData = 13;
- }
-
- message StateKeyData {
- optional MemberData member =1;
- optional string key = 2;
- optional bool locked = 3;
- optional bool removeOnExit = 4;
- optional bool releaseLockOnExit = 5;
- optional int64 expiration = 6;
- optional int64 lockExpiration = 7;
- }
- enum StateType {
- INSERT = 1;
- DELETE = 2;
- SYNC = 3;
- }
- message StateData {
- //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
- //| option java_type_method = "MessageType";
- optional StateKeyData keyData = 1;
- optional bytes value =2;
- optional bytes oldvalue =3;
- optional bool mapUpdate = 4;
- optional bool mapWrite = 5;
- optional bool expired = 6;
- optional bool lockExpired = 7;
- optional bool lockUpdate = 8;
- optional bool lockWrite = 9;
- optional bool error = 10;
- optional StateType stateType = 11;
-
- }
-
- enum ElectionType {
- ELECTION = 0;
- ANSWER = 1;
- MASTER = 2;
- }
- message ElectionMessage {
- //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
- //| option java_type_method = "MessageType";
- optional MemberData member = 1;
- optional ElectionType electionType = 2;
- }
+///////////////////////////////////////////////////////////////////////
+// Properties / MapData
+///////////////////////////////////////////////////////////////////////
+
+message StringType {
+ optional string name = 1;
+ optional string value = 2;
+}
+
+message BoolType {
+ optional string name = 1;
+ optional bool value = 2;
+}
+
+message ByteType {
+ optional string name = 1;
+ optional int32 value = 2;
+}
+
+message ShortType {
+ optional string name = 1;
+ optional int32 value = 2;
+}
+
+message IntType {
+ optional string name = 1;
+ optional int32 value = 2;
+}
+
+message LongType {
+ optional string name = 1;
+ optional int64 value = 2;
+}
+
+message FloatType {
+ optional string name = 1;
+ optional float value = 2;
+}
- // Properties
- message StringType {
- optional string name = 1;
- optional string value = 2;
- }
-
- message BoolType {
- optional string name = 1;
- optional bool value = 2;
- }
-
- message ByteType {
- optional string name = 1;
- optional int32 value = 2;
- }
-
- message ShortType {
- optional string name = 1;
- optional int32 value = 2;
- }
-
- message IntType {
- optional string name = 1;
- optional int32 value = 2;
- }
-
- message LongType {
- optional string name = 1;
- optional int64 value = 2;
- }
-
- message FloatType {
- optional string name = 1;
- optional float value = 2;
- }
-
- message DoubleType {
- optional string name = 1;
- optional double value = 2;
- }
-
- message CharType {
- optional string name = 1;
- optional string value = 2;
- }
-
- message BytesType {
- optional string name = 1;
- optional bytes value = 2;
- }
-
- message BufferType {
- optional string name = 1;
- optional bytes value = 2;
- }
+message DoubleType {
+ optional string name = 1;
+ optional double value = 2;
+}
+
+message CharType {
+ optional string name = 1;
+ optional string value = 2;
+}
+message BytesType {
+ optional string name = 1;
+ optional bytes value = 2;
+}
+message BufferType {
+ optional string name = 1;
+ optional bytes value = 2;
+}
- message MapData {
- optional string name=1[default = "DEFAULT"];
- repeated StringType stringType = 2;
- repeated IntType intType = 3;
- repeated BoolType boolType = 4;
- repeated LongType longType = 5;
- repeated DoubleType doubleType = 6;
- repeated FloatType floatType = 7;
- repeated ShortType shortType = 8;
- repeated ByteType byteType = 9;
- repeated CharType charType = 10;
- repeated BytesType bytesType = 11;
- repeated MapData mapType = 12;
- repeated BufferType bufferType = 13;
-
- }
+message MapData {
+ optional string name=1[default = "DEFAULT"];
+ repeated StringType stringType = 2;
+ repeated IntType intType = 3;
+ repeated BoolType boolType = 4;
+ repeated LongType longType = 5;
+ repeated DoubleType doubleType = 6;
+ repeated FloatType floatType = 7;
+ repeated ShortType shortType = 8;
+ repeated ByteType byteType = 9;
+ repeated CharType charType = 10;
+ repeated BytesType bytesType = 11;
+ repeated MapData mapType = 12;
+ repeated BufferType bufferType = 13;
+}
\ No newline at end of file
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java Tue Feb 17 15:12:52 2009
@@ -355,7 +355,7 @@
Object value = state1.put("foo", "blob");
assertNull(value);
value = state1.put("foo", "blah");
- assertEquals(value, "blob");
+ assertEquals("blob", value);
}
public void testRemove() throws Exception {
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Tue Feb 17 15:12:52 2009
@@ -119,7 +119,7 @@
reply.shutDown();
}
- public void testSendRequestString() throws Exception {
+ public void testSendRequestString() throws Exception {
String destination = "/test/foo";
final int number = 10;
final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Tue Feb 17 15:12:52 2009
@@ -17,8 +17,10 @@
package org.apache.activeblaze.impl.processor;
import java.util.concurrent.atomic.AtomicBoolean;
+
import junit.framework.TestCase;
-import org.apache.activeblaze.wire.PacketData;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
/**
* Test some basics in ChainedProcessor
@@ -72,7 +74,7 @@
A.setEnd(D);
A.setEnd(target);
A.start();
- Packet p = new Packet(new PacketData());
+ Packet p = new Packet(new PacketDataBean().freeze());
D.downStream(p);
assertTrue(test.get());
}
@@ -93,7 +95,7 @@
A.setEnd(C);
A.setEnd(D);
A.start();
- Packet p = new Packet(new PacketData());
+ Packet p = new Packet(new PacketDataBean().freeze());
D.upStream(p);
assertTrue(test.get());
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java Tue Feb 17 15:12:52 2009
@@ -4,7 +4,8 @@
package org.apache.activeblaze.impl.processor;
import junit.framework.TestCase;
-import org.apache.activeblaze.wire.PacketData;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
import org.apache.activemq.protobuf.Buffer;
/**
@@ -13,10 +14,9 @@
*/
public class CompressionProcessorTest extends TestCase {
public void testProcessor() throws Exception {
- Packet packet = new Packet(new PacketData());
byte[] d1 = new byte[1024];
Buffer payload = new Buffer(d1);
- packet.getPacketData().setPayload(payload);
+ Packet packet = new Packet(new PacketDataBean().setPayload(payload).freeze());
TerminatedChainedProcessor test = new TerminatedChainedProcessor();
CompressionProcessor proc = new CompressionProcessor();
proc.setPrev(test);
@@ -30,7 +30,7 @@
d2[i] = (byte) i;
}
payload = new Buffer(d2);
- packet.getPacketData().setPayload(payload);
+ packet = new Packet(new PacketDataBean().setPayload(payload).freeze());
proc.downStream(packet.clone());
Packet result = test.getResult();
assertTrue(CompressionProcessor.isCompressed(result.getPacketData().getPayload()));
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java Tue Feb 17 15:12:52 2009
@@ -18,8 +18,10 @@
import java.util.ArrayList;
import java.util.List;
+
import junit.framework.TestCase;
-import org.apache.activeblaze.wire.PacketData;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
import org.apache.activemq.protobuf.Buffer;
/**
@@ -28,13 +30,13 @@
*/
public class FragmentationProcessorTest extends TestCase {
public void testProcessor() throws Exception {
- Packet packet = new Packet(new PacketData());
byte[] testData = new byte[1024 * 32];
for (int i = 0; i < testData.length; i++) {
testData[i] = (byte) i;
}
Buffer payload = new Buffer(testData);
- packet.getPacketData().setPayload(payload);
+ Packet packet = new Packet(new PacketDataBean().setPayload(payload).freeze());
+
TerminatedChainedProcessor test = new TerminatedChainedProcessor();
FragmentationProcessor proc = new FragmentationProcessor();
proc.setPrev(test);
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java Tue Feb 17 15:12:52 2009
@@ -17,7 +17,8 @@
package org.apache.activeblaze.impl.processor;
import junit.framework.TestCase;
-import org.apache.activeblaze.wire.PacketData;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
import org.apache.activemq.protobuf.Buffer;
@@ -35,18 +36,18 @@
PacketAudit audit = new PacketAudit();
audit.start();
for (long i =0; i< audit.getMaxAuditDepth();i++) {
- PacketData data = new PacketData();
+ PacketDataBean data = new PacketDataBean();
data.setProducerId(new Buffer("fred"));
data.setMessageSequence(i);
- Packet packet = new Packet(data);
+ Packet packet = new Packet(data.freeze());
assertFalse(audit.isDuplicate(packet));
}
for (long i =0; i< audit.getMaxAuditDepth();i++) {
- PacketData data = new PacketData();
+ PacketDataBean data = new PacketDataBean();
data.setProducerId(new Buffer("fred"));
data.setMessageSequence(i);
- Packet packet = new Packet(data);
+ Packet packet = new Packet(data.freeze());
assertTrue("Testing " + i,audit.isDuplicate(packet));
}
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java Tue Feb 17 15:12:52 2009
@@ -23,12 +23,14 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import junit.framework.TestCase;
+
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.transport.UdpTransport;
import org.apache.activeblaze.util.IdGenerator;
-import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
import org.apache.activemq.protobuf.Buffer;
/**
@@ -93,7 +95,7 @@
receiver.setLocalURI(this.receiverURI);
this.consumer.start();
for (int i = 0; i < number; i++) {
- Packet packet = createPacket(this.to);
+ Packet packet = createPacket(this.to, false);
this.producer.downStream(packet);
}
latch.await(10, TimeUnit.SECONDS);
@@ -129,8 +131,7 @@
receiver.setLocalURI(this.receiverURI);
this.consumer.start();
for (int i = 0; i < number; i++) {
- Packet packet = createPacket(this.to);
- packet.getPacketData().setResponseRequired(true);
+ Packet packet = createPacket(this.to, true);
this.producer.downStream(packet);
}
latch.await(10, TimeUnit.SECONDS);
@@ -146,12 +147,15 @@
}
}
- protected Packet createPacket(SocketAddress to) throws Exception {
- PacketData data = new PacketData();
+ protected Packet createPacket(SocketAddress to, boolean responseRequried) throws Exception {
+ PacketDataBean data = new PacketDataBean();
data.setMessageId(new Buffer(this.idGenerator.generateId()));
Buffer payload = new Buffer(new byte[1024]);
data.setPayload(payload);
- Packet packet = new Packet(data);
+ if( responseRequried ) {
+ data.setResponseRequired(true);
+ }
+ Packet packet = new Packet(data.freeze());
packet.setTo(to);
return packet;
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java Tue Feb 17 15:12:52 2009
@@ -18,11 +18,14 @@
import java.net.InetSocketAddress;
import java.net.URI;
+
+import junit.framework.TestCase;
+
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
-import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
import org.apache.activemq.protobuf.Buffer;
-import junit.framework.TestCase;
/**
* Test Multicast Transport
@@ -44,14 +47,14 @@
receiver.start();
String payload = "test String";
Buffer duff = new Buffer("duff");
- PacketData packetData = new PacketData();
- packetData.setType(1);
+ PacketDataBean packetData = new PacketDataBean();
+ packetData.setType(MessageType.MEMBER_DATA);
packetData.setMessageId(new Buffer("foo"));
packetData.setProducerId(duff);
packetData.setSessionId(1);
packetData.setMessageSequence(0);
packetData.setPayload(new Buffer(payload));
- Packet packet = new Packet(packetData);
+ Packet packet = new Packet(packetData.freeze());
packet.setTo(to);
sender.downStream(packet);
Thread.sleep(500);
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java Tue Feb 17 15:12:52 2009
@@ -16,14 +16,14 @@
*/
package org.apache.activeblaze.impl.transport;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.URI;
+
import junit.framework.TestCase;
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
-import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
import org.apache.activemq.protobuf.Buffer;
@@ -52,14 +52,14 @@
String payload = "test String";
Buffer duff = new Buffer("duff");
- PacketData packetData = new PacketData();
- packetData.setType(1);
+ PacketDataBean packetData = new PacketDataBean();
+ packetData.setType(MessageType.MEMBER_DATA);
packetData.setMessageId(new Buffer("foo"));
packetData.setProducerId(duff);
packetData.setSessionId(1);
packetData.setMessageSequence(0);
packetData.setPayload(new Buffer(payload));
- Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData);
+ Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData.freeze());
sender.downStream(packet);