You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/03/13 10:04:24 UTC

svn commit: r753178 - in /activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze: ./ group/ impl/reliable/simple/ jms/ jms/message/

Author: rajdavies
Date: Fri Mar 13 09:04:23 2009
New Revision: 753178

URL: http://svn.apache.org/viewvc?rev=753178&view=rev
Log:
remove depreciated code

Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Fri Mar 13 09:04:23 2009
@@ -19,7 +19,6 @@
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import org.apache.activeblaze.impl.network.Network;
 import org.apache.activeblaze.impl.network.NetworkFactory;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -38,6 +37,7 @@
 import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.MessageBuffer;
+import org.apache.activemq.protobuf.UTF8Buffer;
 /**
  * <P>
  * A <CODE>BlazeChannel</CODE> handles all client communication, either unicast,
@@ -185,14 +185,7 @@
     }
 
     public synchronized void broadcast(Destination destination, BlazeMessage msg) throws Exception {
-        msg.setDestination(destination);
-        msg.storeContent();
-        BlazeDataBuffer blazeData = msg.getContent().freeze();
-        PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
-        packetData.setReliable(true);
-        packetData.setDestinationData(destination.getData());
-        packetData.setPayloadType(msg.getType());
-        Packet packet = new Packet(packetData.freeze());
+        Packet packet = buildPacket(destination, msg);
         this.broadcast.downStream(packet);
     }
 
@@ -229,19 +222,19 @@
     public void setConfiguration(BlazeConfiguration configuration) {
         this.configuration = configuration;
     }
-    
 
     /**
      * @return the blazeMessageProcessor
      */
-    public BlazeMessageProcessor getBlazeMessageProcessor(){
+    public BlazeMessageProcessor getBlazeMessageProcessor() {
         return this.blazeMessageProcessor;
     }
 
     /**
-     * @param blazeMessageProcessor the blazeMessageProcessor to set
+     * @param blazeMessageProcessor
+     *            the blazeMessageProcessor to set
      */
-    public void setBlazeMessageProcessor(BlazeMessageProcessor blazeMessageProcessor){
+    public void setBlazeMessageProcessor(BlazeMessageProcessor blazeMessageProcessor) {
         this.blazeMessageProcessor = blazeMessageProcessor;
     }
 
@@ -261,29 +254,28 @@
         BlazeMessage result = null;
         if (this.blazeMessageProcessor != null) {
             result = this.blazeMessageProcessor.processBlazeMessage(data);
-        }else {
-            
-        if (data != null) {
-            DestinationData destination = data.getDestinationData();
-            Buffer payload = data.getPayload();
-            BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
-            String fromId = null;
-            if (data.hasProducerId()) {
-                fromId = data.getProducerId().toStringUtf8();
-            }
-            result = createMessage(fromId);
-            result.setDestination(destination);
-            result.setFromId(fromId);
-            if (data.hasMessageId()) {
-                result.setMessageId(data.getMessageId().toStringUtf8());
-            }
-            if (data.hasCorrelationId()) {
-                result.setCorrelationId(data.getCorrelationId().toStringUtf8());
+        } else {
+            if (data != null) {
+                DestinationData destination = data.getDestinationData();
+                Buffer payload = data.getPayload();
+                BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
+                String fromId = null;
+                if (data.hasProducerId()) {
+                    fromId = data.getProducerId().toStringUtf8();
+                }
+                result = createMessage(fromId);
+                result.setDestination(destination);
+                result.setFromId(fromId);
+                if (data.hasMessageId()) {
+                    result.setMessageId(data.getMessageId().toStringUtf8());
+                }
+                if (data.hasCorrelationId()) {
+                    result.setCorrelationId(data.getCorrelationId().toStringUtf8());
+                }
+                result.setTimeStamp(blazeData.getTimestamp());
+                result.setType(data.getPayloadType());
+                result.setContent(blazeData);
             }
-            result.setTimeStamp(blazeData.getTimestamp());
-            result.setType(data.getPayloadType());
-            result.setContent(blazeData);
-        }
         }
         return result;
     }
@@ -306,4 +298,33 @@
             }
         }
     }
+
+    protected final Packet buildPacket(Destination destination, BlazeMessage message) {
+        return buildPacket(destination, message, false,null);
+    }
+    
+    protected final Packet buildPacket(Destination destination, BlazeMessage message,String correlationId) {
+        return buildPacket(destination, message, false,correlationId);
+    }
+    
+    protected final Packet buildPacket(Destination destination, BlazeMessage message,boolean responseRequired) {
+        return buildPacket(destination, message, responseRequired,null);
+    }
+
+    protected final Packet buildPacket(Destination destination, BlazeMessage message, boolean responseRequired,String correlationId) {
+        message.setDestination(destination);
+        message.storeContent();
+        
+        BlazeDataBuffer blazeData = message.getContent().freeze();
+        PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
+        packetData.setReliable(true);
+        packetData.setResponseRequired(responseRequired);
+        if (correlationId != null && correlationId.length() > 0){
+            packetData.setCorrelationId(new UTF8Buffer(correlationId));
+        }
+        packetData.setDestinationData(destination.getData());
+        packetData.setPayloadType(message.getType());
+        Packet packet = new Packet(packetData.freeze());
+        return packet;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Fri Mar 13 09:04:23 2009
@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.activeblaze.util.IOUtils;
 import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activeblaze.wire.BoolType;
@@ -53,26 +52,33 @@
 import org.apache.activeblaze.wire.ShortType.ShortTypeBean;
 import org.apache.activeblaze.wire.StringType.StringTypeBean;
 import org.apache.activemq.protobuf.Buffer;
-
+import org.apache.activemq.protobuf.UTF8Buffer;
 /**
- * A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs. The names are <CODE>String</CODE>
- * objects, and the values are primitive data types in the Java programming language. The names must have a value that
- * is not null, and not an empty string. The entries can be accessed sequentially or randomly by name. The order of the
- * entries is undefined. <CODE>BlazeMessage</CODE> inherits from the <CODE>Message</CODE> interface and adds a
- * message body that contains a Map.
+ * A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs.
+ * The names are <CODE>String</CODE> objects, and the values are primitive data
+ * types in the Java programming language. The names must have a value that is
+ * not null, and not an empty string. The entries can be accessed sequentially
+ * or randomly by name. The order of the entries is undefined.
+ * <CODE>BlazeMessage</CODE> inherits from the <CODE>Message</CODE> interface
+ * and adds a message body that contains a Map.
  * <P>
- * The primitive types can be read or written explicitly using methods for each type. They may also be read or written
- * generically as objects. For instance, a call to <CODE>BlazeMessage.setInt("foo", 6)</CODE> is equivalent to
- * <CODE> BlazeMessage.setObject("foo", new Integer(6))</CODE>. Both forms are provided, because the explicit form is
- * convenient for static programming, and the object form is needed when types are not known at compile time.
+ * The primitive types can be read or written explicitly using methods for each
+ * type. They may also be read or written generically as objects. For instance,
+ * a call to <CODE>BlazeMessage.setInt("foo", 6)</CODE> is equivalent to
+ * <CODE> BlazeMessage.setObject("foo", new Integer(6))</CODE>. Both forms are
+ * provided, because the explicit form is convenient for static programming, and
+ * the object form is needed when types are not known at compile time.
  * <P>
  * <P>
- * <CODE>BlazeMessage</CODE> objects support the following conversion table. The marked cases must be supported. The
- * unmarked cases must throw a <CODE>JMSException</CODE>. The <CODE>String</CODE> -to-primitive conversions may
- * throw a runtime exception if the primitive's <CODE>valueOf()</CODE> method does not accept it as a valid
- * <CODE> String</CODE> representation of the primitive.
+ * <CODE>BlazeMessage</CODE> objects support the following conversion table. The
+ * marked cases must be supported. The unmarked cases must throw a
+ * <CODE>JMSException</CODE>. The <CODE>String</CODE> -to-primitive conversions
+ * may throw a runtime exception if the primitive's <CODE>valueOf()</CODE>
+ * method does not accept it as a valid <CODE> String</CODE> representation of
+ * the primitive.
  * <P>
- * A value written as the row type can be read as the column type. <p/>
+ * A value written as the row type can be read as the column type.
+ * <p/>
  * 
  * <PRE>
  * | | boolean byte short char int long float double String byte[] |----------------------------------------------------------------------
@@ -83,13 +89,14 @@
  * 
  * <p/>
  * <P>
- * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding
- * <code>valueOf(String)</code> conversion method with a null value. Since <code>char</code> does not support a
- * <code>String</code> conversion, attempting to read a null value as a <code>char</code> must throw a
- * <code>NullPointerException</code>.
+ * Attempting to read a null value as a primitive type must be treated as
+ * calling the primitive's corresponding <code>valueOf(String)</code> conversion
+ * method with a null value. Since <code>char</code> does not support a
+ * <code>String</code> conversion, attempting to read a null value as a
+ * <code>char</code> must throw a <code>NullPointerException</code>.
  * 
  */
-public class BlazeMessage implements Map<String, Object>{
+public class BlazeMessage implements Map<String, Object> {
     private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
     private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
     private static final String DEFAULT_OBJECT_PAYLOAD = "DEFAULT_OBJECT_PAYLOAD";
@@ -107,22 +114,24 @@
     private transient boolean persistent;
     private transient int type;
     private BlazeData content;
-    
+    private transient boolean loaded;
+
     /**
      * Default Constructor
      */
     public BlazeMessage() {
     }
-    
+
     /**
-     * Constructor - Utility to construct a message with a text <Code>String</Code> payload
+     * Constructor - Utility to construct a message with a text
+     * <Code>String</Code> payload
      * 
      * @param text
      */
     public BlazeMessage(String text) {
         setStringValue(DEFAULT_TEXT_PAYLOAD, text);
     }
-    
+
     /**
      * Constructor - Utility to construct a message with a byte[] array payload
      * 
@@ -131,7 +140,7 @@
     public BlazeMessage(byte[] data) {
         setBytesValue(DEFAULT_BYTES_PAYLOAD, data);
     }
-    
+
     /**
      * Constructor - Utility to construct a message with an object payload
      * 
@@ -140,269 +149,282 @@
     public BlazeMessage(Object data) {
         setObject(data);
     }
-    
+
     /**
      * Utility method for setting a default <Code>String</Code> payload
      * 
      * @param text
      */
-    public void setText(String text){
+    public void setText(String text) {
         setStringValue(DEFAULT_TEXT_PAYLOAD, text);
     }
-    
+
     /**
-     * Utility method used for when a BlazeMessage is only carrying a byte[] array
+     * Utility method used for when a BlazeMessage is only carrying a byte[]
+     * array
      * 
      * @return text the default text
      * @throws Exception
      */
-    public String getText() throws Exception{
+    public String getText() throws Exception {
         return getStringValue(DEFAULT_TEXT_PAYLOAD);
     }
-    
+
     /**
      * Utility method for setting a default <Code>byte[]</Code> payload
      * 
      * @param payload
      */
-    public void setBytes(byte[] payload){
+    public void setBytes(byte[] payload) {
         setBytesValue(DEFAULT_BYTES_PAYLOAD, payload);
     }
-    
+
     /**
      * Utility method used for when a BlazeMessage is only carrying an Object
      * 
      * @return text the default text
      * @throws Exception
      */
-    public Object getObject() throws Exception{
+    public Object getObject() throws Exception {
         Buffer buffer = getBufferValue(DEFAULT_OBJECT_PAYLOAD);
         return IOUtils.getObject(buffer);
     }
-    
+
     /**
      * Utility method for setting a default <Code>Object</Code> payload
      * 
      * @param payload
      */
-    public void setObject(Object payload){
+    public void setObject(Object payload) {
         try {
             put(DEFAULT_OBJECT_PAYLOAD, IOUtils.getBuffer(payload));
         } catch (Exception e) {
             throw new BlazeRuntimeException(e);
         }
     }
-    
+
     /**
      * Utility method used for when a BlazeMessage is only carrying a String
      * 
      * @return text the default text
      * @throws Exception
      */
-    public byte[] getBytes() throws Exception{
+    public byte[] getBytes() throws Exception {
         return getBytesValue(DEFAULT_BYTES_PAYLOAD);
     }
-    
+
     /**
      * @return the destination
      */
-    public Destination getDestination(){
+    public Destination getDestination() {
         initializeReading();
         return this.destination;
     }
-    
+
     /**
-     * @param destination the destination to set
+     * @param destination
+     *            the destination to set
      */
-    public void setDestination(Destination destination){
+    public void setDestination(Destination destination) {
         this.destination = destination;
     }
-    
+
     /**
      * @param destination
      */
-    public void setDestination(DestinationData destinationData){
+    public void setDestination(DestinationData destinationData) {
         if (destinationData != null) {
             this.destination = new Destination(destinationData);
         }
     }
-    
+
     /**
      * The id of the channel that sent the message
      * 
      * @return the fromId
      */
-    public String getFromId(){
+    public String getFromId() {
         initializeReading();
         return this.fromId;
     }
-    
+
     /**
-     * @param fromId the fromId to set
+     * @param fromId
+     *            the fromId to set
      */
-    public void setFromId(String fromId){
+    public void setFromId(String fromId) {
         this.fromId = fromId;
     }
-    
+
     /**
      * @return the messageId
      */
-    public String getMessageId(){
+    public String getMessageId() {
         initializeReading();
         return this.messageId;
     }
-    
+
     /**
-     * @param messageId the messageId to set
+     * @param messageId
+     *            the messageId to set
      */
-    public void setMessageId(String messageId){
+    public void setMessageId(String messageId) {
         this.messageId = messageId;
     }
-    
+
     /**
      * @return the correlationId
      */
-    public String getCorrelationId(){
+    public String getCorrelationId() {
         initializeReading();
         return this.correlationId;
     }
-    
+
     /**
-     * @param correlationId the correlationId to set
+     * @param correlationId
+     *            the correlationId to set
      */
-    public void setCorrelationId(String correlationId){
+    public void setCorrelationId(String correlationId) {
         this.correlationId = correlationId;
     }
-    
+
     /**
      * @return the timeStamp
      */
-    public long getTimeStamp(){
+    public long getTimeStamp() {
         initializeReading();
         return this.timeStamp;
     }
-    
+
     /**
-     * @param timeStamp the timeStamp to set
+     * @param timeStamp
+     *            the timeStamp to set
      */
-    public void setTimeStamp(long timeStamp){
+    public void setTimeStamp(long timeStamp) {
         this.timeStamp = timeStamp;
     }
-    
+
     /**
      * @return the replyTo
      */
-    public Destination getReplyTo(){
+    public Destination getReplyTo() {
         initializeReading();
         return this.replyTo;
     }
-    
+
     /**
-     * @param replyTo the replyTo to set
+     * @param replyTo
+     *            the replyTo to set
      */
-    public void setReplyTo(Destination replyTo){
+    public void setReplyTo(Destination replyTo) {
         this.replyTo = replyTo;
     }
-    
+
     /**
-     * @param replyTo the replyTo to set
+     * @param replyTo
+     *            the replyTo to set
      */
-    public void setReplyTo(DestinationData replyTo){
+    public void setReplyTo(DestinationData replyTo) {
         this.replyTo = new Destination(replyTo);
     }
-    
+
     /**
      * @return the expiration
      */
-    public long getExpiration(){
+    public long getExpiration() {
         initializeReading();
         return this.expiration;
     }
-    
+
     /**
-     * @param expiration the expiration to set
+     * @param expiration
+     *            the expiration to set
      */
-    public void setExpiration(long expiration){
+    public void setExpiration(long expiration) {
         this.expiration = expiration;
     }
-    
+
     /**
      * @return the redeliveryCounter
      */
-    public int getRedeliveryCounter(){
+    public int getRedeliveryCounter() {
         initializeReading();
         return this.redeliveryCounter;
     }
-    
+
     /**
-     * @param redeliveryCounter the redeliveryCounter to set
+     * @param redeliveryCounter
+     *            the redeliveryCounter to set
      */
-    public void setRedeliveryCounter(int redeliveryCounter){
+    public void setRedeliveryCounter(int redeliveryCounter) {
         this.redeliveryCounter = redeliveryCounter;
     }
-    
+
     /**
      * @return the priority
      */
-    public int getPriority(){
+    public int getPriority() {
         initializeReading();
         return this.priority;
     }
-    
+
     /**
-     * @param priority the priority to set
+     * @param priority
+     *            the priority to set
      */
-    public void setPriority(int priority){
+    public void setPriority(int priority) {
         this.priority = priority;
     }
-    
+
     /**
      * @return the persistent
      */
-    public boolean isPersistent(){
+    public boolean isPersistent() {
         initializeReading();
         return this.persistent;
     }
-    
+
     /**
-     * @param persistent the persistent to set
+     * @param persistent
+     *            the persistent to set
      */
-    public void setPersistent(boolean persistent){
+    public void setPersistent(boolean persistent) {
         this.persistent = persistent;
     }
-    
+
     /**
      * @return the type
      */
-    public String getMessageType(){
+    public String getMessageType() {
         initializeReading();
         return this.messageType;
     }
-    
+
     /**
-     * @param type the type to set
+     * @param type
+     *            the type to set
      */
-    public void setMessageType(String type){
+    public void setMessageType(String type) {
         this.messageType = type;
     }
-    
+
     /**
      * Get the type
      * 
      * @return the type
      */
-    public int getType(){    
+    public int getType() {
         return this.type;
     }
-    
-    public void setType(int type){       
+
+    public void setType(int type) {
         this.type = type;
     }
-    
+
     /**
      * @return a copy of this message
      */
-    public BlazeMessage clone(){
+    public BlazeMessage clone() {
         BlazeMessage copy = new BlazeMessage();
         try {
             copy(copy);
@@ -411,22 +433,24 @@
         }
         return copy;
     }
-    
+
     /**
      * clear the contents of this message
      */
-    public void clear(){
+    public void clear() {
         this.map.clear();
     }
-    
+
     /**
      * Returns the <CODE>boolean</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>boolean</CODE>
+     * @param name
+     *            the name of the <CODE>boolean</CODE>
      * @return the <CODE>boolean</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public boolean getBooleanValue(String name) throws BlazeMessageFormatException{
+    public boolean getBooleanValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -440,15 +464,17 @@
         }
         throw new BlazeMessageFormatException(" cannot read a boolean from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the <CODE>byte</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>byte</CODE>
+     * @param name
+     *            the name of the <CODE>byte</CODE>
      * @return the <CODE>byte</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public byte getByteValue(String name) throws BlazeMessageFormatException{
+    public byte getByteValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -462,15 +488,17 @@
         }
         throw new BlazeMessageFormatException(" cannot read a byte from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the <CODE>short</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>short</CODE>
+     * @param name
+     *            the name of the <CODE>short</CODE>
      * @return the <CODE>short</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public short getShortValue(String name) throws BlazeMessageFormatException{
+    public short getShortValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -487,15 +515,17 @@
         }
         throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the Unicode character value with the specified name.
      * 
-     * @param name the name of the Unicode character
+     * @param name
+     *            the name of the Unicode character
      * @return the Unicode character value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public char getCharValue(String name) throws BlazeMessageFormatException{
+    public char getCharValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -506,15 +536,17 @@
         }
         throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the <CODE>int</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>int</CODE>
+     * @param name
+     *            the name of the <CODE>int</CODE>
      * @return the <CODE>int</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public int getIntValue(String name) throws BlazeMessageFormatException{
+    public int getIntValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -534,15 +566,17 @@
         }
         throw new BlazeMessageFormatException(" cannot read an int from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the <CODE>long</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>long</CODE>
+     * @param name
+     *            the name of the <CODE>long</CODE>
      * @return the <CODE>long</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public long getLongValue(String name) throws BlazeMessageFormatException{
+    public long getLongValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -565,15 +599,17 @@
         }
         throw new BlazeMessageFormatException(" cannot read a long from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the <CODE>float</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>float</CODE>
+     * @param name
+     *            the name of the <CODE>float</CODE>
      * @return the <CODE>float</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public float getFloatValue(String name) throws BlazeMessageFormatException{
+    public float getFloatValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -587,15 +623,17 @@
         }
         throw new BlazeMessageFormatException(" cannot read a float from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the <CODE>double</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>double</CODE>
+     * @param name
+     *            the name of the <CODE>double</CODE>
      * @return the <CODE>double</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public double getDoubleValue(String name) throws BlazeMessageFormatException{
+    public double getDoubleValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -612,16 +650,18 @@
         }
         throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the <CODE>String</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>String</CODE>
-     * @return the <CODE>String</CODE> value with the specified name; if there is no item by this name, a null value
-     *         is returned
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @param name
+     *            the name of the <CODE>String</CODE>
+     * @return the <CODE>String</CODE> value with the specified name; if there
+     *         is no item by this name, a null value is returned
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public String getStringValue(String name) throws BlazeMessageFormatException{
+    public String getStringValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -632,15 +672,18 @@
         }
         return value.toString();
     }
-    
+
     /**
      * Returns the byte array value with the specified name.
      * 
-     * @param name the name of the byte array
-     * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @param name
+     *            the name of the byte array
+     * @return the byte array value with the specified name; if there is no item
+     *         by this name, a null value is returned.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public byte[] getBytesValue(String name) throws BlazeMessageFormatException{
+    public byte[] getBytesValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value instanceof byte[]) {
@@ -648,15 +691,18 @@
         }
         throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns a Buffer with the specified name.
      * 
-     * @param name the name of the byte array
-     * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @param name
+     *            the name of the byte array
+     * @return the byte array value with the specified name; if there is no item
+     *         by this name, a null value is returned.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public Buffer getBufferValue(String name) throws BlazeMessageFormatException{
+    public Buffer getBufferValue(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value instanceof Buffer) {
@@ -664,44 +710,50 @@
         }
         throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName());
     }
-    
+
     /**
      * Returns the value of the object with the specified name.
      * <P>
-     * This method can be used to return, in objectified format, an object in the Java programming language ("Java
-     * object") that had been stored in the Map with the equivalent <CODE>setObject</CODE> method call, or its
+     * This method can be used to return, in objectified format, an object in
+     * the Java programming language ("Java object") that had been stored in the
+     * Map with the equivalent <CODE>setObject</CODE> method call, or its
      * equivalent primitive <CODE>set <I>type </I></CODE> method.
      * <P>
-     * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
+     * Note that byte values are returned as <CODE>byte[]</CODE>, not
+     * <CODE>Byte[]</CODE>.
      * 
-     * @param name the name of the Java object
-     * @return a copy of the Java object value with the specified name, in objectified format (for example, if the
-     *         object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
-     *         this name, a null value is returned
+     * @param name
+     *            the name of the Java object
+     * @return a copy of the Java object value with the specified name, in
+     *         objectified format (for example, if the object was set as an
+     *         <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there
+     *         is no item by this name, a null value is returned
      */
-    public Object getObjectValue(String name){
+    public Object getObjectValue(String name) {
         initializeReading();
         return this.map.get(name);
     }
-    
+
     /**
-     * Returns an <CODE>Enumeration</CODE> of all the names in the <CODE>BlazeMessage</CODE> object.
+     * Returns an <CODE>Enumeration</CODE> of all the names in the
+     * <CODE>BlazeMessage</CODE> object.
      * 
      * @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
      */
-    public Enumeration<String> getNames(){
+    public Enumeration<String> getNames() {
         initializeReading();
         return Collections.enumeration(this.map.keySet());
     }
-    
+
     /**
      * put a key,value pair into the message
      * 
      * @param name
-     * @param value must be a supported primitive, or map of supported primitives
+     * @param value
+     *            must be a supported primitive, or map of supported primitives
      * @return the previous value associated with the key
      */
-    public Object put(String name,Object value){
+    public Object put(String name, Object value) {
         initializeWriting();
         if (name == null) {
             throw new IllegalArgumentException("The name of the property cannot be null.");
@@ -712,115 +764,137 @@
         checkValidObject(value);
         return this.map.put(name, value);
     }
-    
+
     /**
      * Sets a <CODE>boolean</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>boolean</CODE>
-     * @param value the <CODE>boolean</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>boolean</CODE>
+     * @param value
+     *            the <CODE>boolean</CODE> value to set in the Map
      */
-    public void setBooleanValue(String name,boolean value){
+    public void setBooleanValue(String name, boolean value) {
         initializeWriting();
         put(name, value ? Boolean.TRUE : Boolean.FALSE);
     }
-    
+
     /**
      * Sets a <CODE>byte</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>byte</CODE>
-     * @param value the <CODE>byte</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>byte</CODE>
+     * @param value
+     *            the <CODE>byte</CODE> value to set in the Map
      */
-    public void setByteValue(String name,byte value){
+    public void setByteValue(String name, byte value) {
         initializeWriting();
         put(name, Byte.valueOf(value));
     }
-    
+
     /**
      * Sets a <CODE>short</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>short</CODE>
-     * @param value the <CODE>short</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>short</CODE>
+     * @param value
+     *            the <CODE>short</CODE> value to set in the Map
      */
-    public void setShortValue(String name,short value){
+    public void setShortValue(String name, short value) {
         initializeWriting();
         put(name, Short.valueOf(value));
     }
-    
+
     /**
      * Sets a Unicode character value with the specified name into the Map.
      * 
-     * @param name the name of the Unicode character
-     * @param value the Unicode character value to set in the Map
+     * @param name
+     *            the name of the Unicode character
+     * @param value
+     *            the Unicode character value to set in the Map
      */
-    public void setCharValue(String name,char value){
+    public void setCharValue(String name, char value) {
         initializeWriting();
         put(name, Character.valueOf(value));
     }
-    
+
     /**
      * Sets an <CODE>int</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>int</CODE>
-     * @param value the <CODE>int</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>int</CODE>
+     * @param value
+     *            the <CODE>int</CODE> value to set in the Map
      */
-    public void setIntValue(String name,int value){
+    public void setIntValue(String name, int value) {
         initializeWriting();
         put(name, Integer.valueOf(value));
     }
-    
+
     /**
      * Sets a <CODE>long</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>long</CODE>
-     * @param value the <CODE>long</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>long</CODE>
+     * @param value
+     *            the <CODE>long</CODE> value to set in the Map
      */
-    public void setLongValue(String name,long value){
+    public void setLongValue(String name, long value) {
         initializeWriting();
         put(name, Long.valueOf(value));
     }
-    
+
     /**
      * Sets a <CODE>float</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>float</CODE>
-     * @param value the <CODE>float</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>float</CODE>
+     * @param value
+     *            the <CODE>float</CODE> value to set in the Map
      */
-    public void setFloatValue(String name,float value){
+    public void setFloatValue(String name, float value) {
         initializeWriting();
         put(name, new Float(value));
     }
-    
+
     /**
      * Sets a <CODE>double</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>double</CODE>
-     * @param value the <CODE>double</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>double</CODE>
+     * @param value
+     *            the <CODE>double</CODE> value to set in the Map
      */
-    public void setDoubleValue(String name,double value){
+    public void setDoubleValue(String name, double value) {
         initializeWriting();
         put(name, new Double(value));
     }
-    
+
     /**
      * Sets a <CODE>String</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>String</CODE>
-     * @param value the <CODE>String</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>String</CODE>
+     * @param value
+     *            the <CODE>String</CODE> value to set in the Map
      */
-    public void setStringValue(String name,String value){
+    public void setStringValue(String name, String value) {
         initializeWriting();
         put(name, value);
     }
-    
+
     /**
      * Sets a byte array value with the specified name into the Map.
      * 
-     * @param name the name of the byte array
-     * @param value the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
-     *            will not be altered by future modifications
-     * @throws NullPointerException if the name is null, or if the name is an empty string.
+     * @param name
+     *            the name of the byte array
+     * @param value
+     *            the byte array value to set in the Map; the array is copied so
+     *            that the value for <CODE>name </CODE> will not be altered by
+     *            future modifications
+     * @throws NullPointerException
+     *             if the name is null, or if the name is an empty string.
      */
-    public void setBytesValue(String name,byte[] value){
+    public void setBytesValue(String name, byte[] value) {
         initializeWriting();
         if (value != null) {
             put(name, value);
@@ -828,15 +902,18 @@
             this.map.remove(name);
         }
     }
-    
+
     /**
      * Sets a Buffer value with the specified name into the Map.
      * 
-     * @param name the name of the byte array
-     * @param value the Buffer value to set in the Map
-     * @throws NullPointerException if the name is null, or if the name is an empty string.
+     * @param name
+     *            the name of the byte array
+     * @param value
+     *            the Buffer value to set in the Map
+     * @throws NullPointerException
+     *             if the name is null, or if the name is an empty string.
      */
-    public void setBufferValue(String name,Buffer value){
+    public void setBufferValue(String name, Buffer value) {
         initializeWriting();
         if (value != null) {
             put(name, value);
@@ -844,22 +921,27 @@
             this.map.remove(name);
         }
     }
-    
+
     /**
-     * Sets a portion of the byte array value with the specified name into the Map.
+     * Sets a portion of the byte array value with the specified name into the
+     * Map.
      * 
-     * @param name the name of the byte array
-     * @param value the byte array value to set in the Map
-     * @param offset the initial offset within the byte array
-     * @param length the number of bytes to use
+     * @param name
+     *            the name of the byte array
+     * @param value
+     *            the byte array value to set in the Map
+     * @param offset
+     *            the initial offset within the byte array
+     * @param length
+     *            the number of bytes to use
      */
-    public void setBytesValue(String name,byte[] value,int offset,int length){
+    public void setBytesValue(String name, byte[] value, int offset, int length) {
         initializeWriting();
         byte[] data = new byte[length];
         System.arraycopy(value, offset, data, 0, length);
         put(name, data);
     }
-    
+
     /**
      * Find out if the message contains a key This isn't recursive
      * 
@@ -867,11 +949,11 @@
      * @return true if the message contains the key
      * 
      */
-    public boolean containsKey(Object key){
+    public boolean containsKey(Object key) {
         initializeReading();
         return this.map.containsKey(key.toString());
     }
-    
+
     /**
      * Find out if the message contains a value
      * 
@@ -879,60 +961,61 @@
      * @return true if the value exists
      * 
      */
-    public boolean containsValue(Object value){
+    public boolean containsValue(Object value) {
         initializeReading();
         return this.map.containsValue(value);
     }
-    
+
     /**
      * @return a set of Map.Entry values
      * 
      */
-    public Set<java.util.Map.Entry<String, Object>> entrySet(){
+    public Set<java.util.Map.Entry<String, Object>> entrySet() {
         initializeReading();
         return this.map.entrySet();
     }
-    
+
     /**
      * Retrieve the object associated with the key
      * 
      * @param key
      * @return the object
      */
-    public Object get(Object key){
+    public Object get(Object key) {
         initializeReading();
         return getObjectValue(key.toString());
     }
-    
+
     /**
      * @return true if the message is empty
      * 
      */
-    public boolean isEmpty(){
+    public boolean isEmpty() {
         initializeReading();
         return this.map.isEmpty();
     }
-    
+
     /**
      * @return a Set of all the keys
      */
-    public Set<String> keySet(){
+    public Set<String> keySet() {
         initializeReading();
         return this.map.keySet();
     }
-    
+
     /**
      * Add all entries in a Map to the message
      * 
-     * @param t the map
+     * @param t
+     *            the map
      * 
      */
-    public void putAll(Map<? extends String, ? extends Object> t){
+    public void putAll(Map<? extends String, ? extends Object> t) {
         for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
             put(entry.getKey(), entry.getValue());
         }
     }
-    
+
     /**
      * Remove a key/value pair from the message
      * 
@@ -940,46 +1023,46 @@
      * @return the value removed or null
      * 
      */
-    public Object remove(Object key){
+    public Object remove(Object key) {
         setContent(null);
         return this.map.remove(key.toString());
     }
-    
+
     /**
      * @return the number of entries in the message
      */
-    public int size(){
+    public int size() {
         initializeReading();
         return this.map.size();
     }
-    
+
     /**
      * @return a Collection of the values in the message
      */
-    public Collection<Object> values(){
+    public Collection<Object> values() {
         initializeReading();
         return this.map.values();
     }
-    
+
     /**
      * check if a named value exists in the message
      * 
      * @param name
      * @return true if value exits
      */
-    public boolean valueExists(String name){
+    public boolean valueExists(String name) {
         return this.map.containsKey(name);
     }
-    
-    protected void initializeReading(){
+
+    protected void initializeReading() {
         loadContent();
     }
-    
-    protected void initializeWriting(){
+
+    protected void initializeWriting() {
         setContent(null);
     }
-    
-    protected void checkValidObject(Object value) throws IllegalArgumentException{
+
+    protected void checkValidObject(Object value) throws IllegalArgumentException {
         boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short
                 || value instanceof Integer || value instanceof Long;
         valid = valid || value instanceof Float || value instanceof Double || value instanceof Character
@@ -995,37 +1078,37 @@
             throw new IllegalArgumentException("Not a valid message value: " + value);
         }
     }
-    
+
     /**
      * @return pretty print
      * @see java.lang.Object#toString()
      */
-    public String toString(){
+    public String toString() {
         return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
     }
-    
-    protected void copy(BlazeMessage copy) throws BlazeException{
+
+    protected void copy(BlazeMessage copy) throws BlazeException {
         storeContent();
         copy.content = this.content;
     }
-    
+
     /**
      * @return the content data
      */
-    public BlazeData getContent(){
+    public BlazeData getContent() {
         return this.content;
     }
-    
+
     /**
      * Set the content data
      * 
      * @param content
      */
-    public void setContent(BlazeData content){
+    public void setContent(BlazeData content) {
         this.content = content;
     }
-    
-    protected void marshallMap(MapDataBean mapData,String name,Object value) throws BlazeRuntimeException{
+
+    protected void marshallMap(MapDataBean mapData, String name, Object value) throws BlazeRuntimeException {
         if (value != null) {
             if (value.getClass() == Boolean.class) {
                 BoolTypeBean type = new BoolTypeBean();
@@ -1094,8 +1177,8 @@
             }
         }
     }
-    
-    protected Map<String, Object> unmarshall(MapData mapData){
+
+    protected Map<String, Object> unmarshall(MapData mapData) {
         Map<String, Object> result = new ConcurrentHashMap<String, Object>();
         if (mapData.hasBoolType()) {
             for (BoolType type : mapData.getBoolTypeList()) {
@@ -1160,71 +1243,85 @@
         }
         return result;
     }
-    
+
     /**
      * Store content into a BlazeData object for serialization
      */
-    public void storeContent(){
+    public void storeContent() {
         if (getContent() == null) {
             BlazeDataBean bd = new BlazeDataBean();
-            MapDataBean mapData = new MapDataBean();
-            for (Map.Entry<String, Object> entry : this.map.entrySet()) {
-                marshallMap(mapData, entry.getKey().toString(), entry.getValue());
+            if (!this.map.isEmpty()) {
+                MapDataBean mapData = new MapDataBean();
+                for (Map.Entry<String, Object> entry : this.map.entrySet()) {
+                    marshallMap(mapData, entry.getKey().toString(), entry.getValue());
+                }
+                bd.setMapData(mapData);
             }
-            bd.setMapData(mapData);
             if (this.replyTo != null) {
                 bd.setReplyToData(this.replyTo.getData());
             }
             if (this.messageId != null) {
-                bd.setMessageId(new Buffer(this.messageId));
+                bd.setMessageId(new UTF8Buffer(this.messageId));
             }
             if (this.correlationId != null) {
-                bd.setCorrelationId(new Buffer(this.correlationId));
+                bd.setCorrelationId(new UTF8Buffer(this.correlationId));
             }
             if (this.fromId != null) {
-                bd.setFromId(new Buffer(this.fromId));
+                bd.setFromId(new UTF8Buffer(this.fromId));
             }
             if (this.messageType != null) {
-                bd.setMessageType(new Buffer(this.messageType));
+                bd.setMessageType(new UTF8Buffer(this.messageType));
+            }
+            if (this.timeStamp > 0) {
+                bd.setTimestamp(this.timeStamp);
+            }
+            if (this.expiration > 0) {
+                bd.setExpiration(this.expiration);
+            }
+            if (this.redeliveryCounter > 0) {
+                bd.setRedeliveryCounter(this.redeliveryCounter);
+            }
+            if (this.priority > 0) {
+                bd.setPriority(this.priority);
+            }
+            if (this.persistent) {
+                bd.setPersistent(this.persistent);
             }
-            bd.setTimestamp(this.timeStamp);
-            bd.setExpiration(this.expiration);
-            bd.setRedeliveryCounter(this.redeliveryCounter);
-            bd.setPriority(this.priority);
-            bd.setPersistent(this.persistent);
             this.content = bd;
         }
     }
-    
+
     /**
      * Builds the message body from data
      * 
      */
-    protected void loadContent() throws BlazeRuntimeException{
-        BlazeData data = getContent();
-        if (data != null && this.map.isEmpty()) {
-            this.map = unmarshall(data.getMapData());
-            if (data.hasReplyToData()) {
-                this.replyTo = new Destination(data.getReplyToData());
-            }
-            if (data.hasFromId()) {
-                this.fromId = data.getFromId().toStringUtf8();
-            }
-            if (data.hasMessageId()) {
-                this.messageId = data.getMessageId().toStringUtf8();
-            }
-            if (data.hasCorrelationId()) {
-                this.correlationId = data.getCorrelationId().toStringUtf8();
-            }
-            if (data.hasMessageType()) {
-                this.messageType = data.getMessageType().toStringUtf8();
+    protected void loadContent() throws BlazeRuntimeException {
+        if (!this.loaded) {
+            this.loaded = true;
+            BlazeData data = getContent();
+            if (data != null && this.map.isEmpty()) {
+                this.map = unmarshall(data.getMapData());
+                if (data.hasReplyToData()) {
+                    this.replyTo = new Destination(data.getReplyToData());
+                }
+                if (data.hasFromId()) {
+                    this.fromId = data.getFromId().toStringUtf8();
+                }
+                if (data.hasMessageId()) {
+                    this.messageId = data.getMessageId().toStringUtf8();
+                }
+                if (data.hasCorrelationId()) {
+                    this.correlationId = data.getCorrelationId().toStringUtf8();
+                }
+                if (data.hasMessageType()) {
+                    this.messageType = data.getMessageType().toStringUtf8();
+                }
+                this.timeStamp = data.getTimestamp();
+                this.expiration = data.getExpiration();
+                this.redeliveryCounter = data.getRedeliveryCounter();
+                this.priority = data.getPriority();
+                this.persistent = data.getPersistent();
             }
-            this.timeStamp = data.getTimestamp();
-            this.expiration = data.getExpiration();
-            this.redeliveryCounter = data.getRedeliveryCounter();
-            this.priority = data.getPriority();
-            this.persistent = data.getPersistent();
         }
     }
-    
 }
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java Fri Mar 13 09:04:23 2009
@@ -28,7 +28,8 @@
      * Process a PacketData of that is a BlazeMessage type
      * @param data
      * @return the built BlazeMessage
+     * @throws Exception 
      */
-    BlazeMessage processBlazeMessage(PacketData data);
+    BlazeMessage processBlazeMessage(PacketData data) throws Exception;
 
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Fri Mar 13 09:04:23 2009
@@ -459,16 +459,11 @@
         if (member != null) {
             SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
             Destination dest = new Destination(destinationName, false);
-            message.setDestination(dest);
-            message.storeContent();
-            BlazeDataBuffer blazeData = message.getContent().freeze();
-            PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
-            packetData.setDestinationData(dest.getData());
-            packetData.setPayloadType(message.getType());
+            Packet packet = buildPacket(dest, message);
             synchronized (this.messageRequests) {
-                this.messageRequests.put(packetData.getMessageId(), request);
+                this.messageRequests.put(packet.getPacketData().getMessageId(), request);
             }
-            Packet packet = new Packet(packetData.freeze());
+           
             packet.setTo((member).getAddress());
             this.unicast.downStream(packet);
             PacketDataBuffer response = request.get(timeout);
@@ -486,32 +481,15 @@
      *      org.apache.activeblaze.BlazeMessage, java.lang.String)
      */
     public void sendReply(Member to,BlazeMessage response,String correlationId) throws Exception{
-        response.storeContent();
-        Destination dest = response.getDestination();
-        BlazeDataBuffer blazeData = response.getContent().freeze();
-        PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, blazeData);
-        data.setCorrelationId(new Buffer(correlationId));
-        if (dest != null) {
-            data.setDestinationData(dest.getData());
-        }
-        data.setPayloadType(response.getType());
-        data.setReliable(true);
-        Packet packet = new Packet(data.freeze());
+        Destination dest = new Destination(to.getInBoxDestination(), false);
+        Packet packet = buildPacket(dest,response,correlationId);
         packet.setTo(((MemberImpl) to).getAddress());
         this.unicast.downStream(packet);
     }
     
     protected void send(MemberImpl member,Buffer destinationName,BlazeMessage message) throws Exception{
         Destination dest = new Destination(destinationName, false);
-        message.setDestination(dest);
-        message.storeContent();
-        
-        PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, message.getContent().freeze());
-        data.setReliable(true);
-        data.setResponseRequired(true);
-        data.setDestinationData(dest.getData());
-        data.setPayloadType(message.getType());
-        Packet packet = new Packet(data.freeze());
+        Packet packet = buildPacket(dest, message,true);
         packet.setTo(member.getAddress());
         this.unicast.downStream(packet);
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Fri Mar 13 09:04:23 2009
@@ -17,21 +17,63 @@
 package org.apache.activeblaze.impl.reliable.simple;
 
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-
+import org.apache.activeblaze.impl.processor.Packet;
 /**
  * Very basic (none) reliability
- *
+ * 
  */
-public class SimpleReliableProcessor extends DefaultChainedProcessor{
-    
-   private SimpleFlow simpleFlow;
-   
-   /**
+public class SimpleReliableProcessor extends DefaultChainedProcessor {
+    int maxWindowSize = 64 * 1024;
+    int windowSize = 0;
+    int pauseTime = 0;
+
+    /**
      * Constructor
      */
     public SimpleReliableProcessor() {
-       this.simpleFlow=new SimpleFlow();
-       //setEnd(this.simpleFlow);
-   }
-   
+    }
+
+    /**
+     * @param p
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.processor.DefaultChainedProcessor#downStream(org.apache.activeblaze.impl.processor.Packet)
+     */
+    public void downStream(Packet p) throws Exception {
+        this.windowSize += p.getPacketData().serializedSizeFramed();
+        if (this.windowSize >= this.maxWindowSize) {
+            Thread.sleep(this.pauseTime);
+            this.windowSize = 0;
+        }
+        super.downStream(p);
+    }
+
+    /**
+     * @return the maxWindowSize
+     */
+    public int getMaxWindowSize() {
+        return this.maxWindowSize;
+    }
+
+    /**
+     * @param maxWindowSize
+     *            the maxWindowSize to set
+     */
+    public void setMaxWindowSize(int maxWindowSize) {
+        this.maxWindowSize = maxWindowSize;
+    }
+
+    /**
+     * @return the pauseTime
+     */
+    public int getPauseTime() {
+        return this.pauseTime;
+    }
+
+    /**
+     * @param pauseTime
+     *            the pauseTime to set
+     */
+    public void setPauseTime(int pauseTime) {
+        this.pauseTime = pauseTime;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Fri Mar 13 09:04:23 2009
@@ -18,7 +18,6 @@
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -34,20 +33,24 @@
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
-
 import org.apache.activeblaze.BlazeMessageListener;
 import org.apache.activeblaze.BlazeMessageProcessor;
 import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.group.BlazeGroupChannel;
 import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation;
 import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.wire.DestinationData;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
 /**
  * Implementation of a JMS Connection
  * 
  */
 public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection,
-        org.apache.activeblaze.ExceptionListener,BlazeMessageProcessor{
+        org.apache.activeblaze.ExceptionListener, BlazeMessageProcessor {
     protected final BlazeGroupChannel channel;
     protected final IdGenerator tempDestinationGenerator = new IdGenerator("");
     private String clientId;
@@ -60,7 +63,7 @@
     protected BlazeJmsConnection(BlazeGroupChannel channel) {
         this.channel = channel;
         this.channel.setExceptionListener(this);
-        this.clientId = channel.getName(); 
+        this.clientId = channel.getName();
         this.channel.setBlazeMessageProcessor(this);
     }
 
@@ -333,9 +336,9 @@
     protected void removeMesssageDispatcher(BlazeMessageListener consumer, Subscription s) throws JMSException {
         try {
             if (s.isTopic()) {
-                this.channel.removeBlazeTopicMessageListener(s,consumer);
+                this.channel.removeBlazeTopicMessageListener(s, consumer);
             } else {
-                this.channel.removeBlazeQueueMessageListener(s,consumer);
+                this.channel.removeBlazeQueueMessageListener(s, consumer);
             }
         } catch (Exception e) {
             throw BlazeJmsExceptionSupport.create(e);
@@ -362,28 +365,34 @@
     }
 
     /**
-     * @param data 
+     * @param data
      * @return a BlazeMessage
+     * @throws Exception
      * 
      */
-    public BlazeJmsMessage processBlazeMessage(PacketData data){
+    public BlazeJmsMessage processBlazeMessage(PacketData data) throws Exception {
         BlazeJmsMessage result = null;
-        /*
-        int type = message.getType();
-        if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) {
-            result = new BlazeJmsBytesMessage();
-        } else if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) {
-            result = new BlazeJmsMapMessage();
-        } else if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) {
-            result = new BlazeJmsObjectMessage();
-        } else if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) {
-            result = new BlazeJmsStreamMessage();
-        } else if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) {
-            result = new BlazeJmsTextMessage();
-        } else {
-            result = new BlazeJmsMessage();
+        if (data != null) {
+            DestinationData destination = data.getDestinationData();
+            Buffer payload = data.getPayload();
+            BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
+            String fromId = null;
+            if (data.hasProducerId()) {
+                fromId = data.getProducerId().toStringUtf8();
+            }
+            result = BlazeJmsMessageTransformation.createMessage(data.getPayloadType());
+            result.setDestination(destination);
+            result.setFromId(fromId);
+            if (data.hasMessageId()) {
+                result.setMessageId(data.getMessageId().toStringUtf8());
+            }
+            if (data.hasCorrelationId()) {
+                result.setCorrelationId(data.getCorrelationId().toStringUtf8());
+            }
+            result.setTimeStamp(blazeData.getTimestamp());
+            result.setType(data.getPayloadType());
+            result.setContent(blazeData);
         }
-        */
         return result;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java Fri Mar 13 09:04:23 2009
@@ -17,7 +17,6 @@
 package org.apache.activeblaze.jms.message;
 
 import java.util.Enumeration;
-
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -27,34 +26,33 @@
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
-
 import org.apache.activeblaze.BlazeMessage;
 import org.apache.activeblaze.jms.BlazeJmsDestination;
-
 /**
- * A helper class for converting normal JMS interfaces into ActiveMQ specific ones.
+ * A helper class for converting normal JMS interfaces into ActiveMQ specific
+ * ones.
  * 
  * @version $Revision: 1.1 $
  */
-public final class BlazeJmsMessageTransformation{
+public final class BlazeJmsMessageTransformation {
     private BlazeJmsMessageTransformation() {
     }
-    
+
     /**
      * @param dest
      * @return a BlazeJmsDestination
      * @throws JMSException
      */
-    private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException{
+    private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException {
         return BlazeJmsDestination.transform(dest);
     }
-    
+
     /**
      * @param message
      * @return a BlazeJmsMessage
      * @throws JMSException
      */
-    public static BlazeJmsMessage transformMessage(BlazeMessage message) throws JMSException{
+    public static BlazeJmsMessage transformMessage(BlazeMessage message) throws JMSException {
         BlazeJmsMessage result = null;
         if (message instanceof BlazeJmsMessage) {
             result = (BlazeJmsMessage) message;
@@ -77,13 +75,13 @@
         }
         return result;
     }
-    
+
     /**
      * @param message
      * @return a BlazeJmsDestination
      * @throws JMSException
      */
-    public static BlazeJmsMessage transformMessage(Message message) throws JMSException{
+    public static BlazeJmsMessage transformMessage(Message message) throws JMSException {
         if (message instanceof BlazeJmsMessage) {
             return (BlazeJmsMessage) message;
         }
@@ -143,15 +141,18 @@
         copyProperties(message, transformedMessage);
         return transformedMessage;
     }
-    
+
     /**
-     * Copies the standard JMS and user defined properties from the givem message to the specified message
+     * Copies the standard JMS and user defined properties from the givem
+     * message to the specified message
      * 
-     * @param fromMessage the message to take the properties from
-     * @param toMessage the message to add the properties to
+     * @param fromMessage
+     *            the message to take the properties from
+     * @param toMessage
+     *            the message to add the properties to
      * @throws JMSException
      */
-    public static void copyProperties(Message fromMessage,Message toMessage) throws JMSException{
+    public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
         toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
         toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
         toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
@@ -169,4 +170,27 @@
             toMessage.setObjectProperty(name, obj);
         }
     }
+
+    /**
+     * @param type
+     * @return a BlazeJmsMessage
+     */
+    public static BlazeJmsMessage createMessage(int type) {
+        if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) {
+            return new BlazeJmsBytesMessage();
+        }
+        if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) {
+            return new BlazeJmsMapMessage();
+        }
+        if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) {
+            return new BlazeJmsObjectMessage();
+        }
+        if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) {
+            return new BlazeJmsStreamMessage();
+        }
+        if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) {
+            return new BlazeJmsTextMessage();
+        }
+        return new BlazeJmsMessage();
+    }
 }