You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/17 16:12:59 UTC

svn commit: r745113 [2/2] - in /activemq/activemq-blaze/trunk: ./ src/main/java/org/apache/activeblaze/ src/main/java/org/apache/activeblaze/cluster/ src/main/java/org/apache/activeblaze/group/ src/main/java/org/apache/activeblaze/impl/network/ src/mai...

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java Tue Feb 17 15:12:52 2009
@@ -21,12 +21,14 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.reliable.ReliableBuffer;
-import org.apache.activeblaze.wire.AckData;
 import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.NackData;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.AckData.AckDataBean;
+import org.apache.activeblaze.wire.NackData.NackDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -57,7 +59,7 @@
 
     void processInBound(Packet packet) throws Exception {
         PacketData packetData = packet.getPacketData();
-        MessageType type = MessageType.valueOf(packetData.getType());
+        MessageType type = packetData.getType();
         if (type == MessageType.CONTROL_DATA) {
             if (this.replayBuffer.isEmpty()) {
                 // send back a control message
@@ -107,19 +109,18 @@
                     }
                 } else if (!packet.isReplayed() && !this.replayBuffer.isEmpty()) {
                     // request the sequence
-                    MessageType nackType = MessageType.NACK_DATA;
-                    NackData nack = (NackData) nackType.createMessage();
+                    NackDataBean nack = new NackDataBean();
                     this.lock.lock();
                     try {
                         nack.setStartSequence(this.lastSequence + 1);
                         nack.setEndSequence(packet.getMessageSequence() - 1);
                         nack.setSessionId(packet.getPacketData().getSessionId());
                         nack.setId(this.ackSequence.incrementAndGet());
-                        PacketData pd = new PacketData();
+                        PacketDataBean pd = new PacketDataBean();
                         pd.setResponseRequired(false);
-                        pd.setPayload(nack.toFramedBuffer());
-                        pd.setType(nackType.getNumber());
-                        Packet nackPacket = new Packet(pd);
+                        pd.setPayload(nack.freeze().toUnframedBuffer());
+                        pd.setType(MessageType.NACK_DATA);
+                        Packet nackPacket = new Packet(pd.freeze());
                         nackPacket.setTo(this.peerAddress);
                         this.swp.sendDownStream(nackPacket);
                         LOG.debug(this + " Sending Nack: " + nack.getStartSequence() + " , " + nack.getEndSequence());
@@ -152,16 +153,15 @@
         this.lock.lock();
         try {
             this.bufferSize = 0;
-            MessageType type = MessageType.ACK_DATA;
-            AckData ack = (AckData) type.createMessage();
+            AckDataBean ack = new AckDataBean();
             ack.setStartSequence(this.firstSequence);
             ack.setEndSequence(this.lastSequence);
             ack.setId(this.ackSequence.incrementAndGet());
-            PacketData pd = new PacketData();
+            PacketDataBean pd = new PacketDataBean();
             pd.setResponseRequired(false);
-            pd.setPayload(ack.toFramedBuffer());
-            pd.setType(type.getNumber());
-            ackPacket = new Packet(pd);
+            pd.setPayload(ack.freeze().toUnframedBuffer());
+            pd.setType(MessageType.ACK_DATA);
+            ackPacket = new Packet(pd.freeze());
             ackPacket.setTo(this.peerAddress);
             this.lastAckTime = System.currentTimeMillis();
             this.firstSequence = this.lastSequence;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Tue Feb 17 15:12:52 2009
@@ -23,14 +23,16 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.activeblaze.BlazeNoRouteException;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.reliable.ReliableBuffer;
-import org.apache.activeblaze.wire.AckData;
-import org.apache.activeblaze.wire.ControlData;
 import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.NackData;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.AckData.AckDataBuffer;
+import org.apache.activeblaze.wire.ControlData.ControlDataBean;
+import org.apache.activeblaze.wire.NackData.NackDataBuffer;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 /**
@@ -66,8 +68,10 @@
      * 
      */
     void processOutbound(final Packet packet) throws BlazeNoRouteException {
-        packet.getPacketData().setSessionId(this.sessionId);
-        packet.getPacketData().setMessageSequence(this.sendSequence.incrementAndGet());
+        PacketDataBean bean = packet.getPacketData().copy();
+        bean.setSessionId(this.sessionId);
+        bean.setMessageSequence(this.sendSequence.incrementAndGet());
+        packet.setPacketData(bean.freeze());
         this.lock.lock();
         try {
             this.replayBuffer.addPacket(packet);
@@ -92,10 +96,9 @@
         Packet result = null;
         PacketData data = packet.getPacketData();
         if (data != null) {
-            MessageType type = MessageType.valueOf(data.getType());
+            MessageType type = data.getType();
             if (type == MessageType.ACK_DATA) {
-                AckData ackData = (AckData) type.createMessage();
-                ackData.mergeFramed(data.getPayload());
+                AckDataBuffer ackData = AckDataBuffer.parseUnframed(data.getPayload());
                 long start = ackData.getStartSequence();
                 long end = ackData.getEndSequence();
                 if (LOG.isDebugEnabled()) {
@@ -119,8 +122,7 @@
                 }
             } else if (type == MessageType.NACK_DATA) {
                 this.lastAckTime = System.currentTimeMillis();
-                NackData nackData = (NackData) type.createMessage();
-                nackData.mergeFramed(data.getPayload());
+                NackDataBuffer nackData = NackDataBuffer.parseUnframed(data.getPayload());
                 this.lastAckId = nackData.getId();
                 LOG.debug(this + " Got Nack = " + nackData);
                 // lookup any missed messages
@@ -151,14 +153,13 @@
             Packet ackPacket = null;
             this.lock.lock();
             try {
-                MessageType type = MessageType.CONTROL_DATA;
-                ControlData control = (ControlData) type.createMessage();
+                ControlDataBean control = new ControlDataBean();
                 control.setLastId(this.lastAckId);
-                PacketData pd = new PacketData();
+                PacketDataBean pd = new PacketDataBean();
                 pd.setResponseRequired(false);
-                pd.setPayload(control.toFramedBuffer());
-                pd.setType(type.getNumber());
-                ackPacket = new Packet(pd);
+                pd.setPayload(control.freeze().toUnframedBuffer());
+                pd.setType(MessageType.CONTROL_DATA);
+                ackPacket = new Packet(pd.freeze());
                 ackPacket.setTo(this.peerAddress);
                 LOG.debug(this + " Sent Control message " + control);
             } finally {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java Tue Feb 17 15:12:52 2009
@@ -21,6 +21,7 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
@@ -106,10 +107,20 @@
     }
 
     public void doStop() throws Exception {
-        super.doStop();
         if (this.statusTimer != null) {
-            this.statusTimer.cancel();
+            // Make sure we shutdown the timer before shutting down the down stream 
+            // processors to avoid the timer getting errors.
+            final CountDownLatch done = new CountDownLatch(1);
+            this.statusTimer.schedule(new TimerTask(){
+                @Override
+                public void run() {
+                    statusTimer.cancel();
+                    done.countDown();
+                }}, 0);
+            done.await();
+            this.statusTimer=null;
         }
+        super.doStop();
     }
 
     void sendDownStream(Packet packet) throws Exception {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Tue Feb 17 15:12:52 2009
@@ -258,7 +258,12 @@
         } catch (InterruptedException e1) {
             // we've stopped
         } catch (Exception e) {
-            LOG.error("Caught an exception processing a packet: " + packet, e);
+            String value="";
+            try {
+                value = packet.toString();
+            } catch (Throwable ignore) {
+            }
+            LOG.error("Caught an exception processing a packet: " + value, e);
             stopInternal();
         }
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Tue Feb 17 15:12:52 2009
@@ -22,9 +22,10 @@
 import java.net.MulticastSocket;
 import java.net.NetworkInterface;
 import java.net.SocketAddress;
+
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 
 /**
  * Multicast transport
@@ -72,7 +73,7 @@
             DatagramPacket dp = new DatagramPacket(receiveData, receiveData.length);
             this.socket.receive(dp);
             if (dp.getLength() > 0) {
-                PacketData data = PacketData.parseFramed(dp.getData());
+                PacketDataBuffer data = PacketDataBuffer.parseFramed(dp.getData());
                 SocketAddress address = dp.getSocketAddress();
                 Packet packet = new Packet(address, data);
                 if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Tue Feb 17 15:12:52 2009
@@ -25,15 +25,18 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
 import java.util.Map;
+
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeNoRouteException;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.util.IOUtils;
 import org.apache.activeblaze.util.LRUCache;
 import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.AckData;
 import org.apache.activeblaze.wire.MessageType;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.AckData.AckDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -47,7 +50,7 @@
 
     private ByteBuffer outBuffer;
 
-    private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(
+    private Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(
             1000);
 
     public void doInit() throws Exception {
@@ -96,12 +99,11 @@
             buffer.flip();
             while (buffer.remaining() > 0) {
                 InputStream stream = IOUtils.getByteBufferInputStream(buffer);
-                PacketData data = PacketData.parseFramed(stream);
+                PacketDataBuffer data = PacketDataBuffer.parseFramed(stream);
                 stream.close();
                 if (data.getResponse()) {
                     synchronized (this.messageRequests) {
-                        SendRequest request = this.messageRequests.remove(data
-                                .getCorrelationId());
+                        SendRequest<PacketDataBuffer> request = this.messageRequests.remove(data.getCorrelationId());
                         if (request != null) {
                             request.put(data.getMessageId(), data);
                         }
@@ -124,12 +126,11 @@
     public void downStream(Packet packet) throws Exception {
         ByteBuffer buffer = this.outBuffer;
         if (isStarted()) {
-            SendRequest request = null;
+            SendRequest<PacketDataBuffer> request = null;
             if (packet.isResponseRequired()) {
                 synchronized (this.messageRequests) {
-                    request = new SendRequest();
-                    this.messageRequests.put(packet.getPacketData()
-                            .getMessageId(), request);
+                    request = new SendRequest<PacketDataBuffer>();
+                    this.messageRequests.put(packet.getPacketData().getMessageId(), request);
                 }
             }
             synchronized (buffer) {
@@ -159,18 +160,17 @@
     }
 
     private Packet createAckPacket(PacketData data) {
-        MessageType type = MessageType.ACK_DATA;
-        AckData ackData = (AckData) type.createMessage();
+        AckDataBean ackData = new AckDataBean();
         ackData.setSessionId(data.getSessionId());
         ackData.setStartSequence(data.getMessageSequence());
         ackData.setEndSequence(data.getMessageSequence());
-        PacketData pd = new PacketData();
+        PacketDataBean pd = new PacketDataBean();
         pd.setResponseRequired(false);
         pd.setCorrelationId(data.getMessageId());
         pd.setResponse(true);
-        pd.setPayload(ackData.toFramedBuffer());
-        pd.setType(type.getNumber());
-        Packet packet = new Packet(pd);
+        pd.setPayload(ackData.freeze().toUnframedBuffer());
+        pd.setType(MessageType.ACK_DATA);
+        Packet packet = new Packet(pd.freeze());
         return packet;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java Tue Feb 17 15:12:52 2009
@@ -31,6 +31,7 @@
 import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
 import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
 import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.BufferInputStream;
 import org.apache.activemq.protobuf.BufferOutputStream;
@@ -116,7 +117,7 @@
             if (this.dataOut != null) {
                 this.dataOut.close();
                 Buffer bs = this.bytesOut.toBuffer();
-                getContent().setPayload(bs);
+                ((BlazeDataBean)getContent()).setPayload(bs);
                 this.bytesOut = null;
                 this.dataOut = null;
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java Tue Feb 17 15:12:52 2009
@@ -21,15 +21,17 @@
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
+
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.MessageFormatException;
 import javax.jms.MessageNotWriteableException;
+
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
-import org.apache.activeblaze.wire.BlazeData;
-import org.apache.activeblaze.wire.MapData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
+import org.apache.activeblaze.wire.MapData.MapDataBean;
+import org.apache.activeblaze.wire.MapData.MapDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.InvalidProtocolBufferException;
 
@@ -111,13 +113,12 @@
     public void storeContent() {
         super.storeContent();
         if (getContent() != null && !this.map.isEmpty()) {
-            MapData mapData = new MapData();
+            MapDataBean mapData = new MapDataBean();
             for (Map.Entry<String, Object> entry : this.map.entrySet()) {
                 marshallMap(mapData, entry.getKey().toString(), entry.getValue());
             }
-            Buffer payload = mapData.toFramedBuffer();
-            BlazeData data = getContent();
-            data.setPayload(payload);
+            Buffer payload = mapData.freeze().toUnframedBuffer();
+            ((BlazeDataBean)getContent()).setPayload(payload);
         }
     }
 
@@ -129,7 +130,7 @@
     protected void loadContent() {
         if (getContent() != null && this.map.isEmpty()) {
             try {
-                MapData mapData = MapData.parseFramed(getContent().getPayload());
+                MapDataBuffer mapData = MapDataBuffer.parseUnframed(getContent().getPayload());
                 this.map = unmarshall(mapData);
             } catch (InvalidProtocolBufferException e) {
                 throw new BlazeRuntimeException(e);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java Tue Feb 17 15:12:52 2009
@@ -17,18 +17,20 @@
 package org.apache.activeblaze.jms.message;
 
 import java.util.Enumeration;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageFormatException;
+
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeMessage;
 import org.apache.activeblaze.BlazeMessageFormatException;
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.jms.BlazeJmsDestination;
 import org.apache.activeblaze.util.Callback;
-import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
 
 /**
  * Implementation of a Jms Message
@@ -99,7 +101,7 @@
      * @see javax.jms.Message#clearBody()
      */
     public void clearBody() throws JMSException {
-        BlazeData data = getContent();
+        BlazeDataBean data = (BlazeDataBean) getContent();
         if (data != null) {
             data.clearPayload();
         }
@@ -109,7 +111,7 @@
      * @see javax.jms.Message#clearProperties()
      */
     public void clearProperties() {
-        BlazeData data = getContent();
+        BlazeDataBean data = (BlazeDataBean) getContent();
         if (data != null) {
             data.clearMapData();
         }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java Tue Feb 17 15:12:52 2009
@@ -22,13 +22,15 @@
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
+
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
-import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
 import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.BufferInputStream;
 import org.apache.activemq.protobuf.BufferOutputStream;
@@ -98,7 +100,8 @@
                 objOut.reset();
                 objOut.close();
                 payload = os.toBuffer();
-                getContent().setPayload(payload);
+                BlazeDataBean data = (BlazeDataBean) getContent();
+                data.setPayload(payload);
             } catch (IOException ioe) {
                 throw new RuntimeException(ioe.getMessage(), ioe);
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java Tue Feb 17 15:12:52 2009
@@ -32,6 +32,7 @@
 import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
 import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
 import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.BufferInputStream;
 
@@ -135,7 +136,8 @@
             try {
                 this.dataOut.close();
                 Buffer buffer = new Buffer(bytesOut.toByteArray());
-                getContent().setPayload(buffer);
+                BlazeDataBean data = (BlazeDataBean) getContent();
+                data.setPayload(buffer);
                 this.bytesOut = null;
                 this.dataOut = null;
             } catch (IOException ioe) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java Tue Feb 17 15:12:52 2009
@@ -24,6 +24,7 @@
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.jms.message.BlazeJmsMessage.JmsMessageType;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.BufferInputStream;
 import org.apache.activemq.protobuf.BufferOutputStream;
@@ -74,7 +75,8 @@
             BufferOutputStream os = new BufferOutputStream(this.text != null ? this.text.length() : 10);
             DataOutputStream dataOut = new DataOutputStream(os);
             MarshallingSupport.writeUTF8(dataOut, this.text);
-            getContent().setPayload(os.toBuffer());
+            BlazeDataBean data = (BlazeDataBean) getContent();
+            data.setPayload(os.toBuffer());
             dataOut.close();
         } catch (IOException e) {
             throw new BlazeRuntimeException(e);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java Tue Feb 17 15:12:52 2009
@@ -17,8 +17,8 @@
 package org.apache.activeblaze.util;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.Message;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -26,13 +26,13 @@
  * state on a request
  *
  */
-public class SendRequest {
+public class SendRequest<R> {
     private static final Log LOG = LogFactory.getLog(SendRequest.class);
     private final AtomicBoolean done = new AtomicBoolean();
-    private Message<?> response;
+    private R response;
     private RequestCallback callback;
 
-    public Object get(long timeout) {
+    public R get(long timeout) {
         synchronized (this.done) {
             if (this.done.get() == false && this.response == null) {
                 try {
@@ -45,7 +45,7 @@
         return this.response;
     }
 
-    public void put(Buffer id,Message<?> response) {
+    public void put(Buffer id, R response) {
         this.response = response;
         cancel();
         RequestCallback callback = this.callback;

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Feb 17 15:12:52 2009
@@ -19,8 +19,6 @@
 option java_multiple_files = true;
 option optimize_for = SPEED;
 
-
-
 // We make use of the wonky comment style bellow because the following options
 // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
 // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
@@ -33,226 +31,209 @@
   ACK_DATA = 3;
   NACK_DATA = 4;
   STATE_DATA = 5;
-  CONTROL_DATA =6;
+  CONTROL_DATA = 6;
+}
+
+message PacketData {   
+  optional bool responseRequired = 1;
+  optional bool reliable = 2;
+  optional bool response = 3;
+  optional bool replayed = 4;
+  optional MessageType type =5;  
+  optional bytes producerId = 6;
+  optional int32 sessionId = 7;
+  optional int64 messageSequence = 8;  
+  optional int32 numberOfParts= 9;
+  optional int32 partNumber= 10;
+  optional bytes payload= 11;
+  optional bytes messageId =12;
+  optional bytes correlationId = 13;
+}
+    
+message BlazeData {
+  optional bool persistent = 1;
+  optional int32 priority = 2;
+  optional int32 redeliveryCounter = 3;
+  optional int32 type =4;
+  optional int64 timestamp = 5;
+  optional int64 expiration = 6;
+  optional bytes messageId = 7;
+  optional bytes correlationId = 8;
+  optional bytes fromId =9;
+  optional bytes messageType = 10;
+  optional bytes payload = 11;
+  optional DestinationData destinationData = 12;  
+  optional DestinationData replyToData = 13;  
+  optional MapData mapData = 14;
+  optional bytes payload = 15;
+}
+    
+message AckData {
+  optional int64 id =1;
+  optional int64 startSequence =2;
+  optional int64 endSequence =3;
+  optional int64 sessionId = 4;
+}
+    
+message NackData {
+  optional int64 id =1;
+  optional int64 startSequence =2;
+  optional int64 endSequence =3;
+  optional int64 sessionId = 4;
+}
+    
+message ControlData {
+  optional int64 lastId =1; //last ack or nack id
+}
+    
+message DestinationData {
+  optional bytes name =1;
+  optional bool topic =2;
+  optional bool temporary=3;
+}
+    
+message SubscriptionData {
+  optional bool durable = 1;
+  optional bool noLocal = 2;
+  optional int32 weight = 3;
+  optional string channelName = 4;
+  optional string subscriberName = 5;
+  optional string selector = 6;
+  optional DestinationData destinationData = 7;
+}
+    
+message MemberData {
+  optional string id = 1;
+  optional string name = 2;
+  optional int64 startTime = 3;
+  optional int64 timeStamp = 4;
+  optional bytes inetAddress = 5;
+  optional int32 port = 6;
+  // a higher weight means this will be the master
+  optional int64 masterWeight = 7;
+  // if both weights are the same - the refined weight can be used
+  optional int64 refinedWeight = 8;
+  optional bool  subscriptionsChanged = 9;
+  optional bool  observer = 10;
+  optional bool  lockedMaster = 11;
+  repeated bytes groups = 12;
+  repeated SubscriptionData  subscriptionData = 13; 
+}
+    
+message StateKeyData {
+  optional MemberData member =1;
+  optional string key = 2;
+  optional bool locked = 3;
+  optional bool removeOnExit = 4;
+  optional bool releaseLockOnExit = 5;
+  optional int64 expiration = 6;
+  optional int64 lockExpiration = 7;
+}
+
+enum StateType {
+  INSERT = 1;
+  DELETE = 2;
+  SYNC = 3;
+}
+    
+message StateData {
+  optional StateKeyData keyData = 1;
+  optional bytes value =2;
+  optional bytes oldvalue =3;
+  optional bool mapUpdate = 4;
+  optional bool mapWrite = 5;
+  optional bool expired = 6;
+  optional bool lockExpired = 7;
+  optional bool lockUpdate = 8;
+  optional bool lockWrite = 9;
+  optional bool error = 10;
+  optional StateType stateType = 11;
+}
+
+enum ElectionType {
+  ELECTION = 0;
+  ANSWER = 1;
+  MASTER = 2;
+}
+  
+message ElectionMessage {
+  optional MemberData member = 1;
+  optional ElectionType electionType = 2;
 }
-    message PacketData {   
-      optional bool responseRequired = 1;
-      optional bool reliable = 2;
-      optional bool response = 3;
-      optional bool replayed = 4;
-      optional int32 type =5;  
-	    optional bytes producerId = 6;
-	    optional int32 sessionId = 7;
-      optional int64 messageSequence = 8;  
-      optional int32 numberOfParts= 9;
-      optional int32 partNumber= 10;
-      optional bytes payload= 11;
-      optional bytes messageId =12;
-      optional bytes correlationId = 13;
-	  
-    }
-    
-     message BlazeData {
-      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
-    //| option java_type_method = "MessageType";
-     optional bool persistent = 1;
-     optional int32 priority = 2;
-     optional int32 redeliveryCounter = 3;
-     optional int32 type =4;
-     optional int64 timestamp = 5;
-     optional int64 expiration = 6;
-     optional bytes messageId = 7;
-     optional bytes correlationId = 8;
-     optional bytes fromId =9;
-     optional bytes messageType = 10;
-     optional bytes payload = 11;
-     optional DestinationData destinationData = 12;  
-     optional DestinationData replyToData = 13;  
-     optional MapData mapData = 14;
-     optional bytes payload = 15;
-      
-    }
-    
-    message AckData {
-     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
-       //| option java_type_method = "MessageType";
-       optional int64 id =1;
-       optional int64 startSequence =2;
-       optional int64 endSequence =3;
-       optional int64 sessionId = 4;
-    }
-    
-    
-    
-    message NackData {
-     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
-       //| option java_type_method = "MessageType";
-       optional int64 id =1;
-       optional int64 startSequence =2;
-       optional int64 endSequence =3;
-       optional int64 sessionId = 4;
-    }
-    
-    message ControlData {
-     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
-       //| option java_type_method = "MessageType";
-       optional int64 lastId =1; //last ack or nack id
-    }
-    
-    message DestinationData {
-    optional bytes name =1;
-    optional bool topic =2;
-    optional bool temporary=3;
-    }
-    
-    message SubscriptionData {
-      optional bool durable = 1;
-      optional bool noLocal = 2;
-      optional int32 weight = 3;
-      optional string channelName = 4;
-      optional string subscriberName = 5;
-      optional string selector = 6;
-      optional DestinationData destinationData = 7;
-    }
-    
-    message MemberData {
-       //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
-       //| option java_type_method = "MessageType";
-       optional string id = 1;
-       optional string name = 2;
-       optional int64 startTime = 3;
-       optional int64 timeStamp = 4;
-       optional bytes inetAddress = 5;
-       optional int32 port = 6;
-       //a higher weight means this will be the master
-       optional int64 masterWeight = 7;
-       //if both weights are the same - the refined
-       //weight can be used
-       optional int64 refinedWeight = 8;
-       optional bool  subscriptionsChanged = 9;
-       optional bool  observer = 10;
-       optional bool  lockedMaster = 11;
-       repeated bytes groups = 12;
-       repeated SubscriptionData  subscriptionData = 13; 
-    }
-    
-    message StateKeyData {
-      optional MemberData member =1;
-      optional string key = 2;
-      optional bool locked = 3;
-      optional bool removeOnExit = 4;
-      optional bool releaseLockOnExit = 5;
-      optional int64 expiration = 6;
-      optional int64 lockExpiration = 7;
-    }
-    enum StateType {
-      INSERT = 1;
-      DELETE = 2;
-      SYNC = 3;
-    }
-    message StateData {
-     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
-       //| option java_type_method = "MessageType";
-       optional StateKeyData keyData = 1;
-       optional bytes value =2;
-       optional bytes oldvalue =3;
-       optional bool mapUpdate = 4;
-       optional bool mapWrite = 5;
-       optional bool expired = 6;
-       optional bool lockExpired = 7;
-       optional bool lockUpdate = 8;
-       optional bool lockWrite = 9;
-       optional bool error = 10;
-       optional StateType stateType = 11;
-       
-    }
-    
-    enum ElectionType {
-    ELECTION = 0;
-    ANSWER = 1;
-    MASTER = 2;
-  }
-    message ElectionMessage {
-    //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
-       //| option java_type_method = "MessageType";
-       optional MemberData member = 1;
-       optional ElectionType electionType = 2;
-    }
    
+///////////////////////////////////////////////////////////////////////
+// Properties / MapData
+///////////////////////////////////////////////////////////////////////
+
+message StringType {
+  optional string name = 1;
+  optional string value = 2;
+}   
+
+message BoolType {
+  optional string name = 1;
+  optional bool value = 2;
+}
+
+message ByteType {
+  optional string name = 1;
+  optional int32 value = 2;
+}
+
+message ShortType {
+  optional string name = 1;
+  optional int32 value = 2;
+}
+
+message IntType {
+  optional string name = 1;
+  optional int32 value = 2;
+}
+
+message LongType {
+  optional string name = 1;
+  optional int64 value = 2;
+}
+
+message FloatType {
+  optional string name = 1;
+  optional float value = 2;
+}
     
-    // Properties
-    message StringType {
-      optional string name = 1;
-      optional string value = 2;
-    }   
-    
-    message BoolType {
-      optional string name = 1;
-      optional bool value = 2;
-    }
-    
-    message ByteType {
-      optional string name = 1;
-      optional int32 value = 2;
-    }
-    
-    message ShortType {
-      optional string name = 1;
-      optional int32 value = 2;
-    }
-    
-    message IntType {
-      optional string name = 1;
-      optional int32 value = 2;
-    }
-    
-    message LongType {
-      optional string name = 1;
-      optional int64 value = 2;
-    }
-    
-    message FloatType {
-      optional string name = 1;
-      optional float value = 2;
-    }
-    
-    message DoubleType {
-      optional string name = 1;
-      optional double value = 2;
-    }
-
-    message CharType {
-	  optional string name = 1;
-	  optional string value = 2;
-	}
-	
-	message BytesType {
-	  optional string name = 1;
-	  optional bytes value = 2;
-	}
-	
-	message BufferType {
-	  optional string name = 1;
-	  optional bytes  value = 2;
-	}
+message DoubleType {
+  optional string name = 1;
+  optional double value = 2;
+}
+
+message CharType {
+  optional string name = 1;
+  optional string value = 2;
+}
 	
+message BytesType {
+  optional string name = 1;
+  optional bytes value = 2;
+}
 	
+message BufferType {
+  optional string name = 1;
+  optional bytes  value = 2;
+}
     
-    message MapData {
-	  optional string name=1[default = "DEFAULT"];
-      repeated StringType stringType = 2;
-      repeated IntType intType = 3;
-      repeated BoolType boolType = 4;
-      repeated LongType longType = 5;
-      repeated DoubleType doubleType = 6;
-      repeated FloatType floatType = 7;
-      repeated ShortType shortType = 8;
-      repeated ByteType byteType = 9;
- 	    repeated CharType charType = 10;
-	    repeated BytesType  bytesType = 11;
-      repeated MapData  mapType = 12;
-      repeated BufferType  bufferType = 13;
-      
-    }
+message MapData {
+  optional string name=1[default = "DEFAULT"];
+  repeated StringType stringType = 2;
+  repeated IntType intType = 3;
+  repeated BoolType boolType = 4;
+  repeated LongType longType = 5;
+  repeated DoubleType doubleType = 6;
+  repeated FloatType floatType = 7;
+  repeated ShortType shortType = 8;
+  repeated ByteType byteType = 9;
+  repeated CharType charType = 10;
+  repeated BytesType  bytesType = 11;
+  repeated MapData  mapType = 12;
+  repeated BufferType  bufferType = 13;
+}
     
        
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java Tue Feb 17 15:12:52 2009
@@ -355,7 +355,7 @@
         Object value = state1.put("foo", "blob");
         assertNull(value);
         value = state1.put("foo", "blah");
-        assertEquals(value, "blob");
+        assertEquals("blob", value);
     }
 
     public void testRemove() throws Exception {

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Tue Feb 17 15:12:52 2009
@@ -119,7 +119,7 @@
         reply.shutDown();
     }
 
-    public void testSendRequestString() throws Exception {
+    public void testSendRequestString() throws Exception {      
         String destination = "/test/foo";
         final int number = 10;
         final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Tue Feb 17 15:12:52 2009
@@ -17,8 +17,10 @@
 package org.apache.activeblaze.impl.processor;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import junit.framework.TestCase;
-import org.apache.activeblaze.wire.PacketData;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 
 /**
  * Test some basics in ChainedProcessor
@@ -72,7 +74,7 @@
         A.setEnd(D);
         A.setEnd(target);
         A.start();
-        Packet p = new Packet(new PacketData());
+        Packet p = new Packet(new PacketDataBean().freeze());
         D.downStream(p);
         assertTrue(test.get());
     }
@@ -93,7 +95,7 @@
         A.setEnd(C);
         A.setEnd(D);
         A.start();
-        Packet p = new Packet(new PacketData());
+        Packet p = new Packet(new PacketDataBean().freeze());
         D.upStream(p);
         assertTrue(test.get());
     }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java Tue Feb 17 15:12:52 2009
@@ -4,7 +4,8 @@
 package org.apache.activeblaze.impl.processor;
 
 import junit.framework.TestCase;
-import org.apache.activeblaze.wire.PacketData;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -13,10 +14,9 @@
  */
 public class CompressionProcessorTest extends TestCase {
     public void testProcessor() throws Exception {
-        Packet packet = new Packet(new PacketData());
         byte[] d1 = new byte[1024];
         Buffer payload = new Buffer(d1);
-        packet.getPacketData().setPayload(payload);
+        Packet packet = new Packet(new PacketDataBean().setPayload(payload).freeze());
         TerminatedChainedProcessor test = new TerminatedChainedProcessor();
         CompressionProcessor proc = new CompressionProcessor();
         proc.setPrev(test);
@@ -30,7 +30,7 @@
             d2[i] = (byte) i;
         }
         payload = new Buffer(d2);
-        packet.getPacketData().setPayload(payload);
+        packet = new Packet(new PacketDataBean().setPayload(payload).freeze());
         proc.downStream(packet.clone());
         Packet result = test.getResult();
         assertTrue(CompressionProcessor.isCompressed(result.getPacketData().getPayload()));

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java Tue Feb 17 15:12:52 2009
@@ -18,8 +18,10 @@
 
 import java.util.ArrayList;
 import java.util.List;
+
 import junit.framework.TestCase;
-import org.apache.activeblaze.wire.PacketData;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -28,13 +30,13 @@
  */
 public class FragmentationProcessorTest extends TestCase {
     public void testProcessor() throws Exception {
-        Packet packet = new Packet(new PacketData());
         byte[] testData = new byte[1024 * 32];
         for (int i = 0; i < testData.length; i++) {
             testData[i] = (byte) i;
         }
         Buffer payload = new Buffer(testData);
-        packet.getPacketData().setPayload(payload);
+        Packet packet = new Packet(new PacketDataBean().setPayload(payload).freeze());
+        
         TerminatedChainedProcessor test = new TerminatedChainedProcessor();
         FragmentationProcessor proc = new FragmentationProcessor();
         proc.setPrev(test);

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java Tue Feb 17 15:12:52 2009
@@ -17,7 +17,8 @@
 package org.apache.activeblaze.impl.processor;
 
 import junit.framework.TestCase;
-import org.apache.activeblaze.wire.PacketData;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 import org.apache.activemq.protobuf.Buffer;
 
 
@@ -35,18 +36,18 @@
         PacketAudit audit = new PacketAudit();
         audit.start();
         for (long i =0; i< audit.getMaxAuditDepth();i++) {
-            PacketData data = new PacketData();
+            PacketDataBean data = new PacketDataBean();
             data.setProducerId(new Buffer("fred"));
             data.setMessageSequence(i);
-            Packet packet = new Packet(data);
+            Packet packet = new Packet(data.freeze());
             assertFalse(audit.isDuplicate(packet));
         }
         
         for (long i =0; i< audit.getMaxAuditDepth();i++) {
-            PacketData data = new PacketData();
+            PacketDataBean data = new PacketDataBean();
             data.setProducerId(new Buffer("fred"));
             data.setMessageSequence(i);
-            Packet packet = new Packet(data);
+            Packet packet = new Packet(data.freeze());
             assertTrue("Testing " + i,audit.isDuplicate(packet));
         }
     }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java Tue Feb 17 15:12:52 2009
@@ -23,12 +23,14 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import junit.framework.TestCase;
+
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.transport.UdpTransport;
 import org.apache.activeblaze.util.IdGenerator;
-import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -93,7 +95,7 @@
         receiver.setLocalURI(this.receiverURI);
         this.consumer.start();
         for (int i = 0; i < number; i++) {
-            Packet packet = createPacket(this.to);
+            Packet packet = createPacket(this.to, false);
             this.producer.downStream(packet);
         }
         latch.await(10, TimeUnit.SECONDS);
@@ -129,8 +131,7 @@
         receiver.setLocalURI(this.receiverURI);
         this.consumer.start();
         for (int i = 0; i < number; i++) {
-            Packet packet = createPacket(this.to);
-            packet.getPacketData().setResponseRequired(true);
+            Packet packet = createPacket(this.to, true);
             this.producer.downStream(packet);
         }
         latch.await(10, TimeUnit.SECONDS);
@@ -146,12 +147,15 @@
         }
     }
 
-    protected Packet createPacket(SocketAddress to) throws Exception {
-        PacketData data = new PacketData();
+    protected Packet createPacket(SocketAddress to, boolean responseRequried) throws Exception {
+        PacketDataBean data = new PacketDataBean();
         data.setMessageId(new Buffer(this.idGenerator.generateId()));
         Buffer payload = new Buffer(new byte[1024]);
         data.setPayload(payload);
-        Packet packet = new Packet(data);
+        if( responseRequried ) {
+            data.setResponseRequired(true);
+        }
+        Packet packet = new Packet(data.freeze());
         packet.setTo(to);
         return packet;
     }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java Tue Feb 17 15:12:52 2009
@@ -18,11 +18,14 @@
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+
+import junit.framework.TestCase;
+
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
-import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 import org.apache.activemq.protobuf.Buffer;
-import junit.framework.TestCase;
 
 /**
  * Test Multicast Transport
@@ -44,14 +47,14 @@
         receiver.start();
         String payload = "test String";
         Buffer duff = new Buffer("duff");
-        PacketData packetData = new PacketData();
-        packetData.setType(1);
+        PacketDataBean packetData = new PacketDataBean();
+        packetData.setType(MessageType.MEMBER_DATA);
         packetData.setMessageId(new Buffer("foo"));
         packetData.setProducerId(duff);
         packetData.setSessionId(1);
         packetData.setMessageSequence(0);
         packetData.setPayload(new Buffer(payload));
-        Packet packet = new Packet(packetData);
+        Packet packet = new Packet(packetData.freeze());
         packet.setTo(to);
         sender.downStream(packet);
         Thread.sleep(500);

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java Tue Feb 17 15:12:52 2009
@@ -16,14 +16,14 @@
  */
 package org.apache.activeblaze.impl.transport;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.URI;
+
 import junit.framework.TestCase;
 
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor;
-import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
 import org.apache.activemq.protobuf.Buffer;
 
 
@@ -52,14 +52,14 @@
         
             String payload = "test String";
             Buffer duff = new Buffer("duff");
-            PacketData packetData = new PacketData();
-            packetData.setType(1);
+            PacketDataBean packetData = new PacketDataBean();
+            packetData.setType(MessageType.MEMBER_DATA);
             packetData.setMessageId(new Buffer("foo"));
             packetData.setProducerId(duff);
             packetData.setSessionId(1);
             packetData.setMessageSequence(0);
             packetData.setPayload(new Buffer(payload));
-            Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData);
+            Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData.freeze());
             
            
             sender.downStream(packet);