You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/02/02 00:35:56 UTC

svn commit: r739885 [4/5] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/transport/ main/java/org/apache/activeblaze/jms/ main/java/org/apache/act...

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.message;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
+import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+import org.apache.activemq.protobuf.BufferOutputStream;
+
+/**
+ * An <CODE>ObjectMessage</CODE> object is used to send a message that contains a serializable object in the Java
+ * programming language ("Java object"). It inherits from the <CODE>Message</CODE> interface and adds a body
+ * containing a single reference to an object. Only <CODE>Serializable</CODE> Java objects can be used. <p/>
+ * <P>
+ * If a collection of Java objects must be sent, one of the <CODE>Collection</CODE> classes provided since JDK 1.2 can
+ * be used. <p/>
+ * <P>
+ * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only mode. If a client attempts to write to
+ * the message at this point, a <CODE>MessageNotWriteableException</CODE> is thrown. If <CODE>clearBody</CODE> is
+ * called, the message can now be both read from and written to.
+ * 
+ * @see javax.jms.Session#createObjectMessage()
+ * @see javax.jms.Session#createObjectMessage(Serializable)
+ * @see javax.jms.BytesMessage
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.StreamMessage
+ * @see javax.jms.TextMessage
+ */
+public class BlazeJmsObjectMessage extends BlazeJmsMessage implements ObjectMessage {
+    static final ClassLoader JMS_OBJECT_CLASS_LOADER = BlazeJmsObjectMessage.class.getClassLoader();
+    /** The body as a live object; <CODE>getObject()</CODE> fills it lazily from the content payload. */
+    protected transient Serializable object;
+
+    /**
+     * Creates a new <CODE>BlazeJmsObjectMessage</CODE> and populates it through
+     * <CODE>copy(BlazeJmsObjectMessage)</CODE>.
+     * 
+     * @return the newly created message
+     * @throws BlazeRuntimeException
+     *             wrapping any <CODE>BlazeException</CODE> raised while copying
+     */
+    @Override
+    public BlazeJmsObjectMessage clone() {
+        BlazeJmsObjectMessage copy = new BlazeJmsObjectMessage();
+        try {
+            copy(copy);
+        } catch (BlazeException e) {
+            throw new BlazeRuntimeException(e);
+        }
+        return copy;
+    }
+
+    /**
+     * Copies this message into <CODE>copy</CODE>. The body is first serialized with <CODE>storeContent()</CODE>,
+     * then the superclass copy runs, and finally the target's cached <CODE>object</CODE> reference is cleared so
+     * that the two messages do not share the same body instance.
+     * 
+     * @param copy
+     *            the message receiving this message's state
+     * @throws BlazeException
+     *             as declared by the superclass <CODE>copy</CODE>
+     */
+    protected void copy(BlazeJmsObjectMessage copy) throws BlazeException {
+        storeContent();
+        super.copy(copy);
+        copy.object = null;
+    }
+
+    /**
+     * Serializes <CODE>object</CODE> into the content payload when an object is set and the content has no payload
+     * yet; otherwise only the superclass <CODE>storeContent()</CODE> runs.
+     * 
+     * @throws RuntimeException
+     *             wrapping the <CODE>IOException</CODE> if serialization fails
+     */
+    @Override
+    public void storeContent() {
+        super.storeContent();
+        // NOTE(review): assumes getContent() is non-null once super.storeContent() has run - confirm against
+        // BlazeJmsMessage, since setObject() calls setContent(null) immediately before this method.
+        Buffer payload = getContent().getPayload();
+        if (payload == null && this.object != null) {
+            try {
+                BufferOutputStream os = new BufferOutputStream(128);
+                DataOutputStream dataOut = new DataOutputStream(os);
+                ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
+                objOut.writeObject(this.object);
+                objOut.flush();
+                // reset() marks a reset point in the stream after the object; it is kept so the serialized
+                // bytes stay exactly as before.
+                objOut.reset();
+                objOut.close();
+                payload = os.toBuffer();
+                getContent().setPayload(payload);
+            } catch (IOException ioe) {
+                throw new RuntimeException(ioe.getMessage(), ioe);
+            }
+        }
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
+     * <P>
+     * If this message body was read-only, calling this method leaves the message body in the same state as an empty
+     * body in a newly created message.
+     * 
+     * @throws JMSException
+     *             if the JMS provider fails to clear the message body due to some internal error.
+     */
+    @Override
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        this.object = null;
+    }
+
+    /**
+     * Sets the serializable object containing this message's data. It is important to note that an
+     * <CODE>ObjectMessage</CODE> contains a snapshot of the object at the time <CODE>setObject()</CODE> is called;
+     * subsequent modifications of the object will have no effect on the <CODE>ObjectMessage</CODE> body.
+     * <P>
+     * The current content is discarded with <CODE>setContent(null)</CODE> and <CODE>storeContent()</CODE> is then
+     * called immediately to serialize the new object.
+     * 
+     * @param newObject
+     *            the new body; may be <CODE>null</CODE>, in which case no payload is written
+     * @throws RuntimeException
+     *             if <CODE>storeContent()</CODE> fails to serialize the object
+     */
+    public void setObject(Serializable newObject) {
+        this.object = newObject;
+        setContent(null);
+        storeContent();
+    }
+
+    /**
+     * Gets the serializable object containing this message's data. The default value is null.
+     * <P>
+     * When no object is cached but the content carries a payload, the payload is deserialized and the result is
+     * cached in <CODE>object</CODE>.
+     * 
+     * @return the serializable object containing this message's data
+     * @throws JMSException
+     *             if the payload cannot be deserialized, including when a class of the stored object cannot be
+     *             found
+     */
+    public Serializable getObject() throws JMSException {
+        if (this.object == null && getContent() != null && getContent().getPayload() != null) {
+            try {
+                Buffer payload = getContent().getPayload();
+                InputStream is = new BufferInputStream(payload);
+                DataInputStream dataIn = new DataInputStream(is);
+                try {
+                    ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+                    this.object = (Serializable) objIn.readObject();
+                } catch (ClassNotFoundException ce) {
+                    // Keep the original exception as the cause so the missing class shows up in the stack trace.
+                    IOException ioe = new IOException(ce.getMessage());
+                    ioe.initCause(ce);
+                    throw ioe;
+                } finally {
+                    // Close the stream on the failure paths too, not only after a successful read.
+                    dataIn.close();
+                }
+            } catch (IOException e) {
+                throw BlazeJmsExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
+            }
+        }
+        return this.object;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,1109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.message;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.StreamMessage;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
+import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+
+/**
+ * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive types in the Java programming language.
+ * It is filled and read sequentially. It inherits from the <CODE>Message</CODE> interface and adds a stream message
+ * body. Its methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>. <p/>
+ * <P>
+ * The primitive types can be read or written explicitly using methods for each type. They may also be read or written
+ * generically as objects. For instance, a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
+ * <CODE>StreamMessage.writeObject(new
+ * Integer(6))</CODE>. Both forms are provided, because the explicit form is
+ * convenient for static programming, and the object form is needed when types are not known at compile time. <p/>
+ * <P>
+ * When the message is first created, and when <CODE>clearBody</CODE> is called, the body of the message is in
+ * write-only mode. After the first call to <CODE>reset</CODE> has been made, the message body is in read-only mode.
+ * After a message has been sent, the client that sent it can retain and modify it without affecting the message that
+ * has been sent. The same message object can be sent multiple times. When a message has been received, the provider has
+ * called <CODE>reset</CODE> so that the message body is in read-only mode for the client. <p/>
+ * <P>
+ * If <CODE>clearBody</CODE> is called on a message in read-only mode, the message body is cleared and the message
+ * body is in write-only mode. <p/>
+ * <P>
+ * If a client attempts to read a message in write-only mode, a <CODE>MessageNotReadableException</CODE> is thrown.
+ * <p/>
+ * <P>
+ * If a client attempts to write a message in read-only mode, a <CODE>MessageNotWriteableException</CODE> is thrown.
+ * <p/>
+ * <P>
+ * <CODE>StreamMessage</CODE> objects support the following conversion table. The marked cases must be supported. The
+ * unmarked cases must throw a <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive conversions may
+ * throw a runtime exception if the primitive's <CODE>valueOf()</CODE> method does not accept it as a valid
+ * <CODE>String</CODE> representation of the primitive. <p/>
+ * <P>
+ * A value written as the row type can be read as the column type. <p/>
+ * 
+ * <PRE>
+ * |        | boolean byte short char int long float double String byte[]
+ * |----------------------------------------------------------------------
+ * |boolean |    X                                            X
+ * |byte    |          X     X         X   X                  X
+ * |short   |                X         X   X                  X
+ * |char    |                     X                           X
+ * |int     |                          X   X                  X
+ * |long    |                              X                  X
+ * |float   |                                    X     X      X
+ * |double  |                                          X      X
+ * |String  |    X     X     X         X   X     X     X      X
+ * |byte[]  |                                                        X
+ * |----------------------------------------------------------------------
+ * 
+ * </PRE>
+ * 
+ * <p/>
+ * <P>
+ * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding
+ * <code>valueOf(String)</code> conversion method with a null value. Since <code>char</code> does not support a
+ * <code>String</code> conversion, attempting to read a null value as a <code>char</code> must throw a
+ * <code>NullPointerException</code>.
+ * 
+ * 
+ * @see javax.jms.Session#createStreamMessage()
+ * @see javax.jms.BytesMessage
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.ObjectMessage
+ * @see javax.jms.TextMessage
+ */
+public class BlazeJmsStreamMessage extends BlazeJmsMessage implements StreamMessage {
+    protected transient DataOutputStream dataOut;
+    protected transient ByteArrayOutputStream bytesOut;
+    protected transient DataInputStream dataIn;
+    protected transient int remainingBytes = -1;
+
+    /**
+     * Builds a fresh <CODE>BlazeJmsStreamMessage</CODE> and fills it through
+     * <CODE>copy(BlazeJmsStreamMessage)</CODE>.
+     * 
+     * @return the newly created duplicate
+     * @throws BlazeRuntimeException
+     *             wrapping any <CODE>BlazeException</CODE> raised while copying
+     */
+    public BlazeJmsStreamMessage clone() {
+        final BlazeJmsStreamMessage duplicate = new BlazeJmsStreamMessage();
+        try {
+            this.copy(duplicate);
+            return duplicate;
+        } catch (BlazeException failure) {
+            throw new BlazeRuntimeException(failure);
+        }
+    }
+
+    /**
+     * Copies this message into <CODE>copy</CODE>. Pending output is first pushed into the content with
+     * <CODE>storeContent()</CODE>, the superclass copy then runs, and the target's stream fields are left unset.
+     * 
+     * @param copy
+     *            the message receiving this message's state
+     * @throws BlazeException
+     *             as declared by the superclass <CODE>copy</CODE>
+     */
+    protected void copy(BlazeJmsStreamMessage copy) throws BlazeException {
+        this.storeContent();
+        super.copy(copy);
+        // Stream helpers belong to one instance only; the target starts without any.
+        copy.dataIn = null;
+        copy.bytesOut = null;
+        copy.dataOut = null;
+    }
+
+    /**
+     * Runs the superclass <CODE>storeContent()</CODE> and then, if an output stream is open and content exists,
+     * closes the stream, stores the written bytes as the content payload and drops both output stream fields.
+     * 
+     * @throws RuntimeException
+     *             wrapping the <CODE>IOException</CODE> if closing the stream fails
+     */
+    public void storeContent() {
+        super.storeContent();
+        if (this.dataOut == null || getContent() == null) {
+            // Nothing has been written, or there is no content to receive it.
+            return;
+        }
+        try {
+            this.dataOut.close();
+            final Buffer written = new Buffer(this.bytesOut.toByteArray());
+            getContent().setPayload(written);
+            this.bytesOut = null;
+            this.dataOut = null;
+        } catch (IOException closeFailure) {
+            throw new RuntimeException(closeFailure);
+        }
+    }
+
+    /**
+     * Runs the superclass <CODE>loadContent()</CODE> and then, if no input stream is open yet, opens
+     * <CODE>dataIn</CODE> over the content payload. When there is no content, or the content has no payload, an
+     * empty buffer is used so that reads report end of data instead of failing.
+     */
+    protected void loadContent() {
+        super.loadContent();
+        if (this.dataIn == null) {
+            BlazeData bd = getContent();
+            // Check bd for null before asking it for the payload; the previous code dereferenced it in the very
+            // branch that was entered when it was null.
+            Buffer data = (bd != null) ? bd.getPayload() : null;
+            if (data == null) {
+                data = new Buffer(new byte[0]);
+            }
+            BufferInputStream is = new BufferInputStream(data);
+            this.dataIn = new DataInputStream(is);
+        }
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
+     * <P>
+     * If this message body was read-only, calling this method leaves the message body in the same state as an empty
+     * body in a newly created message.
+     * 
+     * @throws JMSException
+     *             if the JMS provider fails to clear the message body due to some internal error.
+     */
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        // Forget every stream helper and reset the byte counter to its initial value of -1.
+        this.remainingBytes = -1;
+        this.bytesOut = null;
+        this.dataOut = null;
+        this.dataIn = null;
+    }
+
+    /**
+     * Reads a <code>boolean</code> from the stream message.
+     * 
+     * @return the <code>boolean</code> value read
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public boolean readBoolean() throws JMSException {
+        initializeReading();
+        try {
+            // Mark before consuming the type tag so an unconvertible value can be rewound.
+            this.dataIn.mark(10);
+            final int tag = this.dataIn.read();
+            if (tag == -1) {
+                throw new MessageEOFException("reached end of data");
+            } else if (tag == MarshallingSupport.BOOLEAN_TYPE) {
+                return this.dataIn.readBoolean();
+            } else if (tag == MarshallingSupport.STRING_TYPE) {
+                return Boolean.parseBoolean(this.dataIn.readUTF());
+            }
+            // Every remaining tag is rejected; rewind to the mark before reporting it.
+            this.dataIn.reset();
+            if (tag == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to boolean.");
+            }
+            throw new MessageFormatException(" not a boolean type");
+        } catch (EOFException eof) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+        } catch (IOException io) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+        }
+    }
+
+    /**
+     * Reads a <code>byte</code> value from the stream message.
+     * 
+     * @return the next byte from the stream message as a 8-bit <code>byte</code>
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public byte readByte() throws JMSException {
+        initializeReading();
+        try {
+            this.dataIn.mark(10);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Byte.valueOf(this.dataIn.readUTF()).byteValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to byte.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a byte type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw BlazeJmsExceptionSupport.create(ioe);
+            }
+            throw mfe;
+        } catch (EOFException e) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a 16-bit integer from the stream message.
+     * 
+     * @return a 16-bit integer from the stream message
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public short readShort() throws JMSException {
+        initializeReading();
+        try {
+            this.dataIn.mark(17);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Short.valueOf(this.dataIn.readUTF()).shortValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to short.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a short type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw BlazeJmsExceptionSupport.create(ioe);
+            }
+            throw mfe;
+        } catch (EOFException e) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a Unicode character value from the stream message.
+     * 
+     * @return a Unicode character from the stream message
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public char readChar() throws JMSException {
+        initializeReading();
+        try {
+            // Mark before consuming the type tag so an unconvertible value can be rewound.
+            this.dataIn.mark(17);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.CHAR_TYPE) {
+                return this.dataIn.readChar();
+            }
+            // No string conversion exists for char, so every other tag is rejected after rewinding to the mark.
+            this.dataIn.reset();
+            if (type == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to char.");
+            }
+            throw new MessageFormatException(" not a char type");
+        } catch (EOFException e) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(e);
+        }
+        // The former NumberFormatException handler was removed: nothing in the try block parses text, so it
+        // could never be reached.
+    }
+
+    /**
+     * Reads a 32-bit integer from the stream message.
+     * 
+     * @return a 32-bit integer value from the stream message, interpreted as an <code>int</code>
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public int readInt() throws JMSException {
+        initializeReading();
+        try {
+            this.dataIn.mark(33);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.INTEGER_TYPE) {
+                return this.dataIn.readInt();
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Integer.valueOf(this.dataIn.readUTF()).intValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to int.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not an int type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw BlazeJmsExceptionSupport.create(ioe);
+            }
+            throw mfe;
+        } catch (EOFException e) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a 64-bit integer from the stream message.
+     * 
+     * @return a 64-bit integer value from the stream message, interpreted as a <code>long</code>
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public long readLong() throws JMSException {
+        initializeReading();
+        try {
+            this.dataIn.mark(65);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.LONG_TYPE) {
+                return this.dataIn.readLong();
+            }
+            if (type == MarshallingSupport.INTEGER_TYPE) {
+                return this.dataIn.readInt();
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Long.valueOf(this.dataIn.readUTF()).longValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to long.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a long type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw BlazeJmsExceptionSupport.create(ioe);
+            }
+            throw mfe;
+        } catch (EOFException e) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a <code>float</code> from the stream message.
+     * 
+     * @return a <code>float</code> value from the stream message
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public float readFloat() throws JMSException {
+        initializeReading();
+        try {
+            this.dataIn.mark(33);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.FLOAT_TYPE) {
+                return this.dataIn.readFloat();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Float.valueOf(this.dataIn.readUTF()).floatValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to float.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a float type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw BlazeJmsExceptionSupport.create(ioe);
+            }
+            throw mfe;
+        } catch (EOFException e) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a <code>double</code> from the stream message.
+     * 
+     * @return a <code>double</code> value from the stream message
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public double readDouble() throws JMSException {
+        initializeReading();
+        try {
+            // Mark the start of the field so that a failed conversion can rewind to it.
+            this.dataIn.mark(65);
+            final int fieldType = this.dataIn.read();
+            if (fieldType == -1) {
+                throw new MessageEOFException("reached end of data");
+            } else if (fieldType == MarshallingSupport.DOUBLE_TYPE) {
+                return this.dataIn.readDouble();
+            } else if (fieldType == MarshallingSupport.FLOAT_TYPE) {
+                // A float is widened to double implicitly.
+                return this.dataIn.readFloat();
+            } else if (fieldType == MarshallingSupport.STRING_TYPE) {
+                return Double.parseDouble(this.dataIn.readUTF());
+            }
+            // Every other field type is unconvertible: rewind first, then report the failure.
+            this.dataIn.reset();
+            if (fieldType == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to double.");
+            }
+            throw new MessageFormatException(" not a double type");
+        } catch (NumberFormatException badNumber) {
+            // The string field did not parse as a double; rewind to the mark before rethrowing.
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw BlazeJmsExceptionSupport.create(resetFailure);
+            }
+            throw badNumber;
+        } catch (EOFException eof) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+        } catch (IOException io) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+        }
+    }
+
+    /**
+     * Reads a <CODE>String</CODE> from the stream message.
+     * 
+     * @return a Unicode string from the stream message
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     */
+    public String readString() throws JMSException {
+        initializeReading();
+        try {
+            // Mark the start of the field so that an unsupported type can rewind to it.
+            this.dataIn.mark(65);
+            final int fieldType = this.dataIn.read();
+            if (fieldType == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            final String text;
+            if (fieldType == MarshallingSupport.NULL) {
+                text = null;
+            } else if (fieldType == MarshallingSupport.BIG_STRING_TYPE) {
+                text = MarshallingSupport.readUTF8(dataIn);
+            } else if (fieldType == MarshallingSupport.STRING_TYPE) {
+                text = this.dataIn.readUTF();
+            } else if (fieldType == MarshallingSupport.LONG_TYPE) {
+                text = Long.toString(this.dataIn.readLong());
+            } else if (fieldType == MarshallingSupport.INTEGER_TYPE) {
+                text = Integer.toString(this.dataIn.readInt());
+            } else if (fieldType == MarshallingSupport.SHORT_TYPE) {
+                text = Short.toString(this.dataIn.readShort());
+            } else if (fieldType == MarshallingSupport.BYTE_TYPE) {
+                text = Byte.toString(this.dataIn.readByte());
+            } else if (fieldType == MarshallingSupport.FLOAT_TYPE) {
+                text = Float.toString(this.dataIn.readFloat());
+            } else if (fieldType == MarshallingSupport.DOUBLE_TYPE) {
+                text = Double.toString(this.dataIn.readDouble());
+            } else if (fieldType == MarshallingSupport.BOOLEAN_TYPE) {
+                text = Boolean.toString(this.dataIn.readBoolean());
+            } else if (fieldType == MarshallingSupport.CHAR_TYPE) {
+                text = Character.toString(this.dataIn.readChar());
+            } else {
+                // No string conversion exists for this field type (e.g. a byte array): rewind and fail.
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a String type");
+            }
+            return text;
+        } catch (NumberFormatException badNumber) {
+            // Rewind to the mark before propagating the failure.
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw BlazeJmsExceptionSupport.create(resetFailure);
+            }
+            throw badNumber;
+        } catch (EOFException eof) {
+            throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+        } catch (IOException io) {
+            throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+        }
+    }
+
+    /**
+     * Reads a byte array field from the stream message into the specified <CODE>byte[]</CODE> object (the read
+     * buffer). <p/>
+     * <P>
+     * To read the field value, <CODE>readBytes</CODE> should be successively called until it returns a value less
+     * than the length of the read buffer. The value of the bytes in the buffer following the last byte read is
+     * undefined. <p/>
+     * <P>
+     * If <CODE>readBytes</CODE> returns a value equal to the length of the buffer, a subsequent
+     * <CODE>readBytes</CODE> call must be made. If there are no more bytes to be read, this call returns -1. <p/>
+     * <P>
+     * If the byte array field value is null, <CODE>readBytes</CODE> returns -1. <p/>
+     * <P>
+     * If the byte array field value is empty, <CODE>readBytes</CODE> returns 0. <p/>
+     * <P>
+     * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> field value has been made, the full value
+     * of the field must be read before it is valid to read the next field. An attempt to read the next field before
+     * that has been done will throw a <CODE>MessageFormatException</CODE>. <p/>
+     * <P>
+     * To read the byte field value into a new <CODE>byte[]</CODE> object, use the <CODE>readObject</CODE> method.
+     * 
+     * @param value
+     *            the buffer into which the data is read
+     * @return the total number of bytes read into the buffer, or -1 if there is no more data because the end of the
+     *         byte field has been reached
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     * @see #readObject()
+     */
+    public int readBytes(byte[] value) throws JMSException {
+        initializeReading();
+        try {
+            if (value == null) {
+                throw new NullPointerException();
+            }
+            if (remainingBytes == -1) {
+                // Not inside a byte array field yet: the next field has to be a byte array or NULL.
+                this.dataIn.mark(value.length + 1);
+                int type = this.dataIn.read();
+                if (type == -1) {
+                    throw new MessageEOFException("reached end of data");
+                }
+                if (type == MarshallingSupport.NULL) {
+                    // A null field value is reported as -1; the NULL marker has been consumed.
+                    return -1;
+                }
+                if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
+                    // Rewind to the mark, as the other read methods do, so the field is not lost.
+                    this.dataIn.reset();
+                    throw new MessageFormatException("Not a byte array");
+                }
+                remainingBytes = this.dataIn.readInt();
+            } else if (remainingBytes == 0) {
+                // The previous call delivered the last bytes of the field; signal its end.
+                remainingBytes = -1;
+                return -1;
+            }
+            if (value.length <= remainingBytes) {
+                // small buffer: fill it completely, more calls are needed to finish the field
+                this.dataIn.readFully(value);
+                remainingBytes -= value.length;
+                return value.length;
+            }
+            // big buffer: the rest of the field fits. readFully is used instead of read so that a
+            // short read or a -1 at end of stream cannot be mistaken for the number of bytes delivered;
+            // a truncated stream surfaces as EOFException and an empty field yields 0.
+            int count = remainingBytes;
+            this.dataIn.readFully(value, 0, count);
+            remainingBytes = 0;
+            return count;
+        } catch (EOFException e) {
+            JMSException jmsEx = new MessageEOFException(e.getMessage());
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        } catch (IOException e) {
+            JMSException jmsEx = new MessageFormatException(e.getMessage());
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        }
+    }
+
+    /**
+     * Reads an object from the stream message. <p/>
+     * <P>
+     * This method can be used to return, in objectified format, an object in the Java programming language ("Java
+     * object") that has been written to the stream with the equivalent <CODE>writeObject</CODE> method call, or its
+     * equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
+     * <P>
+     * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>. <p/>
+     * <P>
+     * An attempt to call <CODE>readObject</CODE> to read a byte field value into a new <CODE>byte[]</CODE> object
+     * before the full value of the byte field has been read will throw a <CODE>MessageFormatException</CODE>.
+     * 
+     * @return a Java object from the stream message, in objectified format (for example, if the object was written as
+     *         an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned)
+     * @throws JMSException
+     *             if the JMS provider fails to read the message due to some internal error.
+     * @throws MessageEOFException
+     *             if unexpected end of message stream has been reached.
+     * @throws MessageFormatException
+     *             if this type conversion is invalid.
+     * @throws MessageNotReadableException
+     *             if the message is in write-only mode.
+     * @see #readBytes(byte[] value)
+     */
+    public Object readObject() throws JMSException {
+        initializeReading();
+        try {
+            // Mark the start of the field so that an unknown type can rewind to it.
+            this.dataIn.mark(65);
+            final int fieldType = this.dataIn.read();
+            if (fieldType == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            final Object result;
+            if (fieldType == MarshallingSupport.NULL) {
+                result = null;
+            } else if (fieldType == MarshallingSupport.BIG_STRING_TYPE) {
+                result = MarshallingSupport.readUTF8(dataIn);
+            } else if (fieldType == MarshallingSupport.STRING_TYPE) {
+                result = this.dataIn.readUTF();
+            } else if (fieldType == MarshallingSupport.LONG_TYPE) {
+                result = Long.valueOf(this.dataIn.readLong());
+            } else if (fieldType == MarshallingSupport.INTEGER_TYPE) {
+                result = Integer.valueOf(this.dataIn.readInt());
+            } else if (fieldType == MarshallingSupport.SHORT_TYPE) {
+                result = Short.valueOf(this.dataIn.readShort());
+            } else if (fieldType == MarshallingSupport.BYTE_TYPE) {
+                result = Byte.valueOf(this.dataIn.readByte());
+            } else if (fieldType == MarshallingSupport.FLOAT_TYPE) {
+                result = Float.valueOf(this.dataIn.readFloat());
+            } else if (fieldType == MarshallingSupport.DOUBLE_TYPE) {
+                result = Double.valueOf(this.dataIn.readDouble());
+            } else if (fieldType == MarshallingSupport.BOOLEAN_TYPE) {
+                result = Boolean.valueOf(this.dataIn.readBoolean());
+            } else if (fieldType == MarshallingSupport.CHAR_TYPE) {
+                result = Character.valueOf(this.dataIn.readChar());
+            } else if (fieldType == MarshallingSupport.BYTE_ARRAY_TYPE) {
+                // A byte array field is a length prefix followed by that many bytes.
+                final byte[] bytes = new byte[this.dataIn.readInt()];
+                this.dataIn.readFully(bytes);
+                result = bytes;
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException("unknown type");
+            }
+            return result;
+        } catch (NumberFormatException badNumber) {
+            // Rewind to the mark before propagating the failure.
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw BlazeJmsExceptionSupport.create(resetFailure);
+            }
+            throw badNumber;
+        } catch (EOFException eof) {
+            JMSException wrapped = new MessageEOFException(eof.getMessage());
+            wrapped.setLinkedException(eof);
+            throw wrapped;
+        } catch (IOException io) {
+            JMSException wrapped = new MessageFormatException(io.getMessage());
+            wrapped.setLinkedException(io);
+            throw wrapped;
+        }
+    }
+
+    /**
+     * Writes a <code>boolean</code> to the stream message. The value <code>true</code> is written as the value
+     * <code>(byte)1</code>; the value <code>false</code> is written as the value <code>(byte)0</code>.
+     * 
+     * @param value
+     *            the <code>boolean</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeBoolean(boolean value) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalBoolean(this.dataOut, value);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes a <code>byte</code> to the stream message.
+     * 
+     * @param value
+     *            the <code>byte</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeByte(byte value) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalByte(this.dataOut, value);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes a <code>short</code> to the stream message.
+     * 
+     * @param value
+     *            the <code>short</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeShort(short value) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalShort(this.dataOut, value);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes a <code>char</code> to the stream message.
+     * 
+     * @param value
+     *            the <code>char</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeChar(char value) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalChar(this.dataOut, value);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes an <code>int</code> to the stream message.
+     * 
+     * @param value
+     *            the <code>int</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeInt(int value) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalInt(this.dataOut, value);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes a <code>long</code> to the stream message.
+     * 
+     * @param value
+     *            the <code>long</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeLong(long value) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalLong(this.dataOut, value);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes a <code>float</code> to the stream message.
+     * 
+     * @param value
+     *            the <code>float</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeFloat(float value) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalFloat(this.dataOut, value);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes a <code>double</code> to the stream message.
+     * 
+     * @param value
+     *            the <code>double</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeDouble(double value) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalDouble(this.dataOut, value);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes a <code>String</code> to the stream message.
+     * 
+     * @param value
+     *            the <code>String</code> value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeString(String value) throws JMSException {
+        initializeWriting();
+        try {
+            if (value != null) {
+                MarshallingSupport.marshalString(this.dataOut, value);
+            } else {
+                // A null string is written as the NULL marker instead of string data.
+                MarshallingSupport.marshalNull(this.dataOut);
+            }
+        } catch (IOException e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes a byte array field to the stream message. <p/>
+     * <P>
+     * The byte array <code>value</code> is written to the message as a byte array field. Consecutively written byte
+     * array fields are treated as two distinct fields when the fields are read.
+     * 
+     * @param value
+     *            the byte array value to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeBytes(byte[] value) throws JMSException {
+        // Delegates to the ranged overload, covering the whole array.
+        final int wholeLength = value.length;
+        writeBytes(value, 0, wholeLength);
+    }
+
+    /**
+     * Writes a portion of a byte array as a byte array field to the stream message. <p/>
+     * <P>
+     * A portion of the byte array <code>value</code> is written to the message as a byte array field.
+     * Consecutively written byte array fields are treated as two distinct fields when the fields are read.
+     * 
+     * @param value
+     *            the byte array value to be written
+     * @param offset
+     *            the initial offset within the byte array
+     * @param length
+     *            the number of bytes to use
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+        // Make sure the output stream exists before marshalling into it.
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalByteArray(this.dataOut, value, offset, length);
+        } catch (IOException e) {
+            // Stream failures are converted by BlazeJmsExceptionSupport before being thrown.
+            throw BlazeJmsExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Writes an object to the stream message. <p/>
+     * <P>
+     * This method works only for the objectified primitive object types (<code>Integer</code>, <code>Double</code>,
+     * <code>Long</code>&nbsp;...), <code>String</code> objects, and byte arrays.
+     * 
+     * @param value
+     *            the Java object to be written
+     * @throws JMSException
+     *             if the JMS provider fails to write the message due to some internal error.
+     * @throws MessageFormatException
+     *             if the object is invalid.
+     * @throws MessageNotWriteableException
+     *             if the message is in read-only mode.
+     */
+    public void writeObject(Object value) throws JMSException {
+        initializeWriting();
+        if (value == null) {
+            // null is written as the NULL marker.
+            try {
+                MarshallingSupport.marshalNull(this.dataOut);
+            } catch (IOException e) {
+                throw BlazeJmsExceptionSupport.create(e);
+            }
+            return;
+        }
+        // Dispatch on the runtime type to the matching typed writer.
+        if (value instanceof String) {
+            writeString((String) value);
+            return;
+        }
+        if (value instanceof Character) {
+            writeChar(((Character) value).charValue());
+            return;
+        }
+        if (value instanceof Boolean) {
+            writeBoolean(((Boolean) value).booleanValue());
+            return;
+        }
+        if (value instanceof Byte) {
+            writeByte(((Byte) value).byteValue());
+            return;
+        }
+        if (value instanceof Short) {
+            writeShort(((Short) value).shortValue());
+            return;
+        }
+        if (value instanceof Integer) {
+            writeInt(((Integer) value).intValue());
+            return;
+        }
+        if (value instanceof Long) {
+            writeLong(((Long) value).longValue());
+            return;
+        }
+        if (value instanceof Float) {
+            writeFloat(((Float) value).floatValue());
+            return;
+        }
+        if (value instanceof Double) {
+            writeDouble(((Double) value).doubleValue());
+            return;
+        }
+        if (value instanceof byte[]) {
+            writeBytes((byte[]) value);
+            return;
+        }
+        throw new MessageFormatException("Unsupported Object type: " + value.getClass());
+    }
+
+    /**
+     * Puts the message body in read-only mode and repositions the stream of bytes to the beginning.
+     */
+    public void reset() {
+        // Store the content first, while the stream fields are still set.
+        storeContent();
+        // Drop all stream state; initializeReading() creates a new input stream on the next read.
+        this.dataOut = null;
+        this.bytesOut = null;
+        this.dataIn = null;
+        // -1 means that no byte array field is currently being read by readBytes.
+        this.remainingBytes = -1;
+    }
+
+    protected void initializeWriting() {
+        super.initializeWriting();
+        if (this.dataOut != null) {
+            // The output streams already exist.
+            return;
+        }
+        // First write: set up an in-memory byte stream wrapped for typed writes.
+        this.bytesOut = new ByteArrayOutputStream();
+        this.dataOut = new DataOutputStream(this.bytesOut);
+    }
+
+    protected void initializeReading() {
+        if (this.dataIn != null) {
+            // The input stream already exists.
+            return;
+        }
+        BlazeData content = getContent();
+        Buffer payload = (content != null) ? content.getPayload() : null;
+        if (payload == null) {
+            // No content or no payload: read from an empty buffer instead.
+            payload = new Buffer(new byte[0]);
+        }
+        this.dataIn = new DataInputStream(new BufferInputStream(payload));
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.message;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+import org.apache.activemq.protobuf.BufferOutputStream;
+
+/**
+ * A JMS {@link TextMessage} whose body is a single <code>String</code>. The text is held in the <code>text</code>
+ * field, marshalled into the content payload by {@link #storeContent()} and unmarshalled from that payload again by
+ * {@link #getText()} when the field is not set.
+ * 
+ * @version $Revision$
+ */
+public class BlazeJmsTextMessage extends BlazeJmsMessage implements TextMessage {
+    protected String text;
+
+    /**
+     * Creates a copy of this message.
+     * 
+     * @return a new message populated through {@link #copy(BlazeJmsTextMessage)}
+     * @throws BlazeRuntimeException
+     *             if copying fails with a <code>BlazeException</code>
+     */
+    public BlazeJmsTextMessage clone() {
+        BlazeJmsTextMessage copy = new BlazeJmsTextMessage();
+        try {
+            copy(copy);
+        } catch (BlazeException e) {
+            throw new BlazeRuntimeException(e);
+        }
+        return copy;
+    }
+
+    /**
+     * Stores the current text into the content and then copies this message's state, including the text, into
+     * <code>copy</code>.
+     * 
+     * @param copy
+     *            the message to populate
+     * @throws BlazeException
+     */
+    protected void copy(BlazeJmsTextMessage copy) throws BlazeException {
+        storeContent();
+        super.copy(copy);
+        copy.text = this.text;
+    }
+
+    /**
+     * Sets the text of this message and discards any previously stored content.
+     * 
+     * @param text
+     *            the new message text, may be <code>null</code>
+     */
+    public void setText(String text) {
+        this.text = text;
+        setContent(null);
+    }
+
+    /**
+     * Marshals the current text into the payload of this message's content.
+     * 
+     * @throws BlazeRuntimeException
+     *             if writing the text fails
+     */
+    public void storeContent() {
+        super.storeContent();
+        try {
+            // The text length (or 10 when there is no text) is only the initial buffer capacity.
+            BufferOutputStream os = new BufferOutputStream(this.text != null ? this.text.length() : 10);
+            DataOutputStream dataOut = new DataOutputStream(os);
+            MarshallingSupport.writeUTF8(dataOut, this.text);
+            getContent().setPayload(os.toBuffer());
+            dataOut.close();
+        } catch (IOException e) {
+            throw new BlazeRuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns the text of this message. When the text field is not set but content is present, the text is
+     * unmarshalled from the content payload (an empty buffer is used when there is no payload) and the content is
+     * cleared afterwards.
+     * 
+     * @return the message text, may be <code>null</code>
+     * @throws BlazeRuntimeException
+     *             if the payload cannot be read
+     */
+    public String getText() {
+        if (this.text == null && getContent() != null) {
+            Buffer data = getContent().getPayload();
+            if (data == null) {
+                data = new Buffer(new byte[0]);
+            }
+            try {
+                DataInputStream dataIn = new DataInputStream(new BufferInputStream(data));
+                try {
+                    // Read before closing: the stream must not be closed until the text has been read.
+                    this.text = MarshallingSupport.readUTF8(dataIn);
+                } finally {
+                    dataIn.close();
+                }
+                // Drop the content only once the text has been read successfully.
+                setContent(null);
+            } catch (IOException e) {
+                throw new BlazeRuntimeException(e);
+            }
+        }
+        return this.text;
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
+     * <P>
+     * If this message body was read-only, calling this method leaves the message body in the same state as an empty
+     * body in a newly created message.
+     * 
+     * @throws JMSException
+     *             if the JMS provider fails to clear the message body due to some internal error.
+     */
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        this.text = null;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.message;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Static helpers that marshal "primitive" values to and from data streams.
+ * <p>
+ * A primitive here is a boxed Java primitive, a {@link String}, a {@code byte[]}, or a {@link Map} or {@link List}
+ * of such values. Every value is written as a one-byte type tag (one of the {@code *_TYPE} constants, or
+ * {@link #NULL}) followed by its payload, so that {@link #unmarshalPrimitive(DataInputStream)} can dispatch on
+ * the tag.
+ * <p>
+ * The class also carries its own modified UTF-8 codec ({@link #writeUTF8(DataOutput, String)} and
+ * {@link #readUTF8(DataInput)}), which uses a four-byte length prefix instead of the two-byte prefix of
+ * {@link DataOutputStream#writeUTF(String)} and can therefore carry large strings.
+ * 
+ * @version $Revision$
+ */
+public final class MarshallingSupport {
+
+    // One-byte type tags written in front of every marshalled value.
+    public static final byte NULL = 0;
+    public static final byte BOOLEAN_TYPE = 1;
+    public static final byte BYTE_TYPE = 2;
+    public static final byte CHAR_TYPE = 3;
+    public static final byte SHORT_TYPE = 4;
+    public static final byte INTEGER_TYPE = 5;
+    public static final byte LONG_TYPE = 6;
+    public static final byte DOUBLE_TYPE = 7;
+    public static final byte FLOAT_TYPE = 8;
+    public static final byte STRING_TYPE = 9;
+    public static final byte BYTE_ARRAY_TYPE = 10;
+    public static final byte MAP_TYPE = 11;
+    public static final byte LIST_TYPE = 12;
+    public static final byte BIG_STRING_TYPE = 13;
+
+    /**
+     * Largest initial capacity handed to a collection when decoding. The element count comes straight from the
+     * stream, so it is only used as a sizing hint up to this bound; the collection still grows to the real size.
+     */
+    private static final int INITIAL_CAPACITY_LIMIT = 1024;
+
+    /** Not instantiable: the class only holds static helpers. */
+    private MarshallingSupport() {
+    }
+
+    /**
+     * Writes a map as an int entry count followed by, for each entry, the key (via {@code writeUTF}) and the
+     * type-tagged value. A {@code null} map is written as the count {@code -1}.
+     * 
+     * @param map the map to write, may be {@code null}
+     * @param out the stream to write to
+     * @throws IOException if writing fails or a value is not a supported primitive
+     */
+    public static void marshalPrimitiveMap(Map<String,Object> map, DataOutputStream out) throws IOException {
+        if (map == null) {
+            out.writeInt(-1);
+            return;
+        }
+        out.writeInt(map.size());
+        // Iterate the entries directly rather than looking every key up a second time.
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            out.writeUTF(entry.getKey());
+            marshalPrimitive(out, entry.getValue());
+        }
+    }
+
+    /**
+     * Reads a map written by {@link #marshalPrimitiveMap(Map, DataOutputStream)} without limiting its entry count.
+     * 
+     * @param in the stream to read from
+     * @return the decoded map, or {@code null} if the stored entry count is negative
+     * @throws IOException if reading fails or an unknown type tag is found
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in) throws IOException {
+        return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Reads a map written by {@link #marshalPrimitiveMap(Map, DataOutputStream)}.
+     * 
+     * @param in the stream to read from
+     * @param maxPropertySize the largest entry count that is accepted
+     * @return the decoded map, or {@code null} if the stored entry count is negative
+     * @throws IOException if reading fails, an unknown type tag is found, or the stored entry count is larger
+     *             than {@code maxPropertySize}
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in, int maxPropertySize) throws IOException {
+        int size = in.readInt();
+        if (size > maxPropertySize) {
+            throw new IOException("Primitive map is larger than the allowed size: " + size);
+        }
+        if (size < 0) {
+            return null;
+        }
+        // The count is read from the stream; cap the pre-sizing so a corrupt count cannot force a huge allocation.
+        Map<String, Object> rc = new HashMap<String, Object>(Math.min(size, INITIAL_CAPACITY_LIMIT));
+        for (int i = 0; i < size; i++) {
+            String name = in.readUTF();
+            rc.put(name, unmarshalPrimitive(in));
+        }
+        return rc;
+    }
+
+    /**
+     * Writes a list as an int element count followed by each type-tagged element.
+     * 
+     * @param list the list to write; must not be {@code null}
+     * @param out the stream to write to
+     * @throws IOException if writing fails or an element is not a supported primitive
+     */
+    public static void marshalPrimitiveList(List<?> list, DataOutputStream out) throws IOException {
+        out.writeInt(list.size());
+        for (Object element : list) {
+            marshalPrimitive(out, element);
+        }
+    }
+
+    /**
+     * Reads a list written by {@link #marshalPrimitiveList(List, DataOutputStream)}.
+     * 
+     * @param in the stream to read from
+     * @return the decoded list, never {@code null}
+     * @throws IOException if reading fails, an unknown type tag is found, or the stored element count is negative
+     */
+    public static List<Object> unmarshalPrimitiveList(DataInputStream in) throws IOException {
+        int size = in.readInt();
+        if (size < 0) {
+            // The writer never emits a negative count, so this can only be corrupt input.
+            throw new IOException("Primitive list has a negative size: " + size);
+        }
+        // The count is read from the stream; cap the pre-sizing so a corrupt count cannot force a huge allocation.
+        List<Object> answer = new ArrayList<Object>(Math.min(size, INITIAL_CAPACITY_LIMIT));
+        for (int i = 0; i < size; i++) {
+            answer.add(unmarshalPrimitive(in));
+        }
+        return answer;
+    }
+
+    /**
+     * Writes a single value preceded by its type tag.
+     * <p>
+     * Boxed primitives, strings and byte arrays are matched on their exact class; maps and lists are matched
+     * with {@code instanceof} and written recursively.
+     * 
+     * @param out the stream to write to
+     * @param value the value to write, may be {@code null}
+     * @throws IOException if writing fails or {@code value} is not one of the supported types
+     */
+    public static void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
+        if (value == null) {
+            marshalNull(out);
+            return;
+        }
+        Class<?> type = value.getClass();
+        if (type == Boolean.class) {
+            marshalBoolean(out, ((Boolean)value).booleanValue());
+        } else if (type == Byte.class) {
+            marshalByte(out, ((Byte)value).byteValue());
+        } else if (type == Character.class) {
+            marshalChar(out, ((Character)value).charValue());
+        } else if (type == Short.class) {
+            marshalShort(out, ((Short)value).shortValue());
+        } else if (type == Integer.class) {
+            marshalInt(out, ((Integer)value).intValue());
+        } else if (type == Long.class) {
+            marshalLong(out, ((Long)value).longValue());
+        } else if (type == Float.class) {
+            marshalFloat(out, ((Float)value).floatValue());
+        } else if (type == Double.class) {
+            marshalDouble(out, ((Double)value).doubleValue());
+        } else if (type == byte[].class) {
+            marshalByteArray(out, (byte[])value);
+        } else if (type == String.class) {
+            marshalString(out, (String)value);
+        } else if (value instanceof Map) {
+            out.writeByte(MAP_TYPE);
+            // The key and value types of a nested map cannot be verified here.
+            @SuppressWarnings("unchecked")
+            Map<String, Object> nested = (Map<String, Object>)value;
+            marshalPrimitiveMap(nested, out);
+        } else if (value instanceof List) {
+            out.writeByte(LIST_TYPE);
+            marshalPrimitiveList((List<?>)value, out);
+        } else {
+            throw new IOException("Object is not a primitive: " + value);
+        }
+    }
+
+    /**
+     * Reads one type tag and the value that follows it.
+     * 
+     * @param in the stream to read from
+     * @return the decoded value; {@code null} for the {@link #NULL} tag
+     * @throws IOException if reading fails, the type tag is unknown, or a byte array has a negative length
+     */
+    public static Object unmarshalPrimitive(DataInputStream in) throws IOException {
+        byte type = in.readByte();
+        switch (type) {
+        case BYTE_TYPE:
+            return Byte.valueOf(in.readByte());
+        case BOOLEAN_TYPE:
+            return Boolean.valueOf(in.readBoolean());
+        case CHAR_TYPE:
+            return Character.valueOf(in.readChar());
+        case SHORT_TYPE:
+            return Short.valueOf(in.readShort());
+        case INTEGER_TYPE:
+            return Integer.valueOf(in.readInt());
+        case LONG_TYPE:
+            return Long.valueOf(in.readLong());
+        case FLOAT_TYPE:
+            return Float.valueOf(in.readFloat());
+        case DOUBLE_TYPE:
+            return Double.valueOf(in.readDouble());
+        case BYTE_ARRAY_TYPE: {
+            int length = in.readInt();
+            if (length < 0) {
+                // Report corrupt input as an I/O problem instead of a NegativeArraySizeException.
+                throw new IOException("Byte array has a negative length: " + length);
+            }
+            byte[] bytes = new byte[length];
+            in.readFully(bytes);
+            return bytes;
+        }
+        case STRING_TYPE:
+            return in.readUTF();
+        case BIG_STRING_TYPE:
+            return readUTF8(in);
+        case MAP_TYPE:
+            return unmarshalPrimitiveMap(in);
+        case LIST_TYPE:
+            return unmarshalPrimitiveList(in);
+        case NULL:
+            return null;
+        default:
+            throw new IOException("Unknown primitive type: " + type);
+        }
+    }
+
+    /** Writes the {@link #NULL} tag; a null value has no payload. */
+    public static void marshalNull(DataOutputStream out) throws IOException {
+        out.writeByte(NULL);
+    }
+
+    /** Writes the {@link #BOOLEAN_TYPE} tag followed by the value. */
+    public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
+        out.writeByte(BOOLEAN_TYPE);
+        out.writeBoolean(value);
+    }
+
+    /** Writes the {@link #BYTE_TYPE} tag followed by the value. */
+    public static void marshalByte(DataOutputStream out, byte value) throws IOException {
+        out.writeByte(BYTE_TYPE);
+        out.writeByte(value);
+    }
+
+    /** Writes the {@link #CHAR_TYPE} tag followed by the value. */
+    public static void marshalChar(DataOutputStream out, char value) throws IOException {
+        out.writeByte(CHAR_TYPE);
+        out.writeChar(value);
+    }
+
+    /** Writes the {@link #SHORT_TYPE} tag followed by the value. */
+    public static void marshalShort(DataOutputStream out, short value) throws IOException {
+        out.writeByte(SHORT_TYPE);
+        out.writeShort(value);
+    }
+
+    /** Writes the {@link #INTEGER_TYPE} tag followed by the value. */
+    public static void marshalInt(DataOutputStream out, int value) throws IOException {
+        out.writeByte(INTEGER_TYPE);
+        out.writeInt(value);
+    }
+
+    /** Writes the {@link #LONG_TYPE} tag followed by the value. */
+    public static void marshalLong(DataOutputStream out, long value) throws IOException {
+        out.writeByte(LONG_TYPE);
+        out.writeLong(value);
+    }
+
+    /** Writes the {@link #FLOAT_TYPE} tag followed by the value. */
+    public static void marshalFloat(DataOutputStream out, float value) throws IOException {
+        out.writeByte(FLOAT_TYPE);
+        out.writeFloat(value);
+    }
+
+    /** Writes the {@link #DOUBLE_TYPE} tag followed by the value. */
+    public static void marshalDouble(DataOutputStream out, double value) throws IOException {
+        out.writeByte(DOUBLE_TYPE);
+        out.writeDouble(value);
+    }
+
+    /** Writes the whole array; see {@link #marshalByteArray(DataOutputStream, byte[], int, int)}. */
+    public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
+        marshalByteArray(out, value, 0, value.length);
+    }
+
+    /**
+     * Writes the {@link #BYTE_ARRAY_TYPE} tag, the int {@code length}, and then {@code length} bytes of
+     * {@code value} starting at {@code offset}.
+     */
+    public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
+        out.writeByte(BYTE_ARRAY_TYPE);
+        out.writeInt(length);
+        out.write(value, offset, length);
+    }
+
+    /**
+     * Writes a string, choosing the encoding by length: short strings use {@link #STRING_TYPE} and
+     * {@code writeUTF}, longer ones use {@link #BIG_STRING_TYPE} and {@link #writeUTF8(DataOutput, String)}.
+     * 
+     * @param out the stream to write to
+     * @param s the string to write; must not be {@code null}
+     * @throws IOException if writing fails
+     */
+    public static void marshalString(DataOutputStream out, String s) throws IOException {
+        // If it's too big, out.writeUTF may not be able to write it out.
+        if (s.length() < Short.MAX_VALUE / 4) {
+            out.writeByte(STRING_TYPE);
+            out.writeUTF(s);
+        } else {
+            out.writeByte(BIG_STRING_TYPE);
+            writeUTF8(out, s);
+        }
+    }
+
+    /**
+     * Writes a string as a four-byte big-endian byte count followed by the encoded characters. Characters
+     * {@code 0x0001}-{@code 0x007F} take one byte, characters above {@code 0x07FF} take three bytes, and all
+     * others (including {@code 0x0000}) take two bytes. A {@code null} string is written as the count {@code -1}.
+     * 
+     * @param dataOut the output to write to
+     * @param text the string to write, may be {@code null}
+     * @throws IOException if writing fails
+     */
+    public static void writeUTF8(DataOutput dataOut, String text) throws IOException {
+        if (text == null) {
+            dataOut.writeInt(-1);
+            return;
+        }
+        int strlen = text.length();
+        char[] chars = new char[strlen];
+        text.getChars(0, strlen, chars, 0);
+
+        // First pass: compute the encoded length so the buffer can be sized exactly.
+        int utflen = 0;
+        for (int i = 0; i < strlen; i++) {
+            int c = chars[i];
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                utflen++;
+            } else if (c > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+        }
+
+        // Unlike DataOutputStream.writeUTF, the length prefix is four bytes wide.
+        byte[] bytes = new byte[utflen + 4];
+        int count = 0;
+        bytes[count++] = (byte)((utflen >>> 24) & 0xFF);
+        bytes[count++] = (byte)((utflen >>> 16) & 0xFF);
+        bytes[count++] = (byte)((utflen >>> 8) & 0xFF);
+        bytes[count++] = (byte)((utflen >>> 0) & 0xFF);
+
+        // Second pass: encode the characters behind the prefix.
+        for (int i = 0; i < strlen; i++) {
+            int c = chars[i];
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                bytes[count++] = (byte)c;
+            } else if (c > 0x07FF) {
+                bytes[count++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
+                bytes[count++] = (byte)(0x80 | ((c >> 6) & 0x3F));
+                bytes[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+            } else {
+                bytes[count++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
+                bytes[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+            }
+        }
+        dataOut.write(bytes);
+    }
+
+    /**
+     * Reads a string written by {@link #writeUTF8(DataOutput, String)}.
+     * 
+     * @param dataIn the input to read from
+     * @return the decoded string, or {@code null} if the stored byte count is negative
+     * @throws UTFDataFormatException if the bytes are not a valid encoding
+     * @throws IOException if reading fails
+     */
+    public static String readUTF8(DataInput dataIn) throws IOException {
+        int utflen = dataIn.readInt(); // four-byte length prefix, see writeUTF8
+        if (utflen < 0) {
+            return null;
+        }
+        // A local buffer needs no synchronization, so StringBuilder is sufficient.
+        StringBuilder str = new StringBuilder(utflen);
+        byte[] bytes = new byte[utflen];
+        int char2;
+        int char3;
+        int count = 0;
+
+        dataIn.readFully(bytes, 0, utflen);
+
+        while (count < utflen) {
+            int c = bytes[count] & 0xff;
+            // The top four bits of the lead byte select the sequence length.
+            switch (c >> 4) {
+            case 0:
+            case 1:
+            case 2:
+            case 3:
+            case 4:
+            case 5:
+            case 6:
+            case 7:
+                /* 0xxxxxxx */
+                count++;
+                str.append((char)c);
+                break;
+            case 12:
+            case 13:
+                /* 110x xxxx 10xx xxxx */
+                count += 2;
+                if (count > utflen) {
+                    throw new UTFDataFormatException();
+                }
+                char2 = bytes[count - 1];
+                if ((char2 & 0xC0) != 0x80) {
+                    throw new UTFDataFormatException();
+                }
+                str.append((char)(((c & 0x1F) << 6) | (char2 & 0x3F)));
+                break;
+            case 14:
+                /* 1110 xxxx 10xx xxxx 10xx xxxx */
+                count += 3;
+                if (count > utflen) {
+                    throw new UTFDataFormatException();
+                }
+                char2 = bytes[count - 2];
+                char3 = bytes[count - 1];
+                if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+                    throw new UTFDataFormatException();
+                }
+                str.append((char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
+                break;
+            default:
+                /* 10xx xxxx, 1111 xxxx */
+                throw new UTFDataFormatException();
+            }
+        }
+        // The number of chars produced may be less than utflen
+        return str.toString();
+    }
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jndi;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import javax.naming.Context;
+import javax.naming.Name;
+import javax.naming.NamingException;
+import javax.naming.RefAddr;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+import javax.naming.spi.ObjectFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Converts objects implementing JNDIStorable into a property fields so they can be stored and regenerated from JNDI
+ */
+public class JNDIReferenceFactory implements ObjectFactory {
+    static Log log = LogFactory.getLog(JNDIReferenceFactory.class);
+
+    /**
+     * This will be called by a JNDIprovider when a Reference is retrieved from a JNDI store - and generates the orignal
+     * instance
+     * 
+     * @param object
+     *            the Reference object
+     * @param name
+     *            the JNDI name
+     * @param nameCtx
+     *            the context
+     * @param environment
+     *            the environment settings used by JNDI
+     * @return the instance built from the Reference object
+     * @throws Exception
+     *             if building the instance from Reference fails (usually class not found)
+     */
+    public Object getObjectInstance(Object object, Name name, Context nameCtx, Hashtable<?, ?> environment)
+            throws Exception {
+        Object result = null;
+        if (object instanceof Reference) {
+            Reference reference = (Reference) object;
+            if (log.isTraceEnabled()) {
+                log.trace("Getting instance of " + reference.getClassName());
+            }
+            Class<?> theClass = loadClass(this, reference.getClassName());
+            if (JNDIStorable.class.isAssignableFrom(theClass)) {
+                JNDIStorable store = (JNDIStorable) theClass.newInstance();
+                Map<String, String> properties = new HashMap<String, String>();
+                for (Enumeration<RefAddr> iter = reference.getAll(); iter.hasMoreElements();) {
+                    StringRefAddr addr = (StringRefAddr) iter.nextElement();
+                    properties.put(addr.getType(), (addr.getContent() == null) ? "" : addr.getContent().toString());
+                }
+                store.setProperties(properties);
+                result = store;
+            }
+        } else {
+            log.error("Object " + object + " is not a reference - cannot load");
+            throw new RuntimeException("Object " + object + " is not a reference");
+        }
+        return result;
+    }
+
+    /**
+     * Create a Reference instance from a JNDIStorable object
+     * 
+     * @param instanceClassName
+     * @param po
+     * @return Reference
+     * @throws NamingException
+     */
+    public static Reference createReference(String instanceClassName, JNDIStorable po) throws NamingException {
+        if (log.isTraceEnabled()) {
+            log.trace("Creating reference: " + instanceClassName + "," + po);
+        }
+        Reference result = new Reference(instanceClassName, JNDIReferenceFactory.class.getName(), null);
+        try {
+            Map<String, String> props = po.getProperties();
+            for (Map.Entry<String, String> entry : props.entrySet()) {
+                javax.naming.StringRefAddr addr = new javax.naming.StringRefAddr(entry.getKey(), entry.getValue());
+                result.add(addr);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            throw new NamingException(e.getMessage());
+        }
+        return result;
+    }
+
+    /**
+     * Retrieve the class loader for a named class
+     * 
+     * @param thisObj
+     * @param className
+     * @return the class
+     * @throws ClassNotFoundException
+     */
+    public static Class<?> loadClass(Object thisObj, String className) throws ClassNotFoundException {
+        // try local ClassLoader first.
+        ClassLoader loader = thisObj.getClass().getClassLoader();
+        Class<?> theClass;
+        if (loader != null) {
+            theClass = loader.loadClass(className);
+        } else {
+            // Will be null in jdk1.1.8
+            // use default classLoader
+            theClass = Class.forName(className);
+        }
+        return theClass;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java Sun Feb  1 23:35:54 2009
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jndi;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+
+/**
+ * Facilitates objects to be stored in JNDI as properties
+ */
+
+public abstract class JNDIStorable implements Referenceable, Externalizable {
+
+    private Map<String,String> properties;
+
+    /**
+     * Set the properties that will represent the instance in JNDI
+     * 
+     * @param props
+     */
+    protected abstract void buildFromProperties(Map<String,String> props);
+
+    /**
+     * Initialize the instance from properties stored in JNDI
+     * 
+     * @param props
+     */
+
+    protected abstract void populateProperties(Map<String,String> props);
+
+    /**
+     * set the properties for this instance as retrieved from JNDI
+     * 
+     * @param props
+     */
+
+    public synchronized void setProperties(Map<String,String> props) {
+        this.properties = props;
+        buildFromProperties(props);
+    }
+
+    /**
+     * Get the properties from this instance for storing in JNDI
+     * 
+     * @return the properties
+     */
+
+    public synchronized Map<String,String> getProperties() {
+        if (this.properties == null) {
+            this.properties = new HashMap<String,String>();
+        }
+        populateProperties(this.properties);
+        return this.properties;
+    }
+
+    /**
+     * Retrive a Reference for this instance to store in JNDI
+     * 
+     * @return the built Reference
+     * @throws NamingException if error on building Reference
+     */
+    public Reference getReference() throws NamingException {
+        return JNDIReferenceFactory.createReference(this.getClass().getName(), this);
+    }
+
+    /**
+     * @param in
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
+     */
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        Map<String,String> props = (Map<String,String>)in.readObject();
+        if (props != null) {
+            setProperties(props);
+        }
+
+    }
+
+    /**
+     * @param out
+     * @throws IOException
+     * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
+     */
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(getProperties());
+
+    }
+    
+    protected String getProperty(Map<String,String>map,String key, String defaultValue) {
+        String value = map.get(key);
+        if (value != null) {
+            return value;
+        }
+        return defaultValue;
+    }
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java Sun Feb  1 23:35:54 2009
@@ -21,12 +21,14 @@
 /**
  * A simple callback object 
  * @version $Revision: 1.2 $
+ * @param <T> 
  */
-public interface Callback {
+public interface Callback<T> {
 
     /**
      * Executes some piece of code 
+     * @param t 
      * @throws BlazeRuntimeException 
      */
-    void execute() throws BlazeRuntimeException;
+    void execute(T t) throws BlazeRuntimeException;
 }

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Sun Feb  1 23:35:54 2009
@@ -84,6 +84,15 @@
     optional bool temporary=3;
     }
     
+    message SubscriptionData {
+      optional bool durable =1;
+      optional int32 weight = 2;
+      optional string channelName =3;
+      optional string subscriberName =4;
+      optional string selector =5;
+      optional DestinationData destinationData =6;
+    }
+    
     message MemberData {
        //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
        //| option java_type_method = "MessageType";
@@ -98,11 +107,11 @@
        //if both weights are the same - the refined
        //weight can be used
        optional int64 refinedWeight = 8;
-       optional bool  destinationsChanged = 9;
+       optional bool  subscriptionsChanged = 9;
        optional bool  observer = 10;
        optional bool  lockedMaster = 11;
        repeated bytes groups = 12;
-       repeated DestinationData  destination = 13; 
+       repeated SubscriptionData  subscriptionData = 13; 
     }
     
     message StateKeyData {
@@ -242,6 +251,7 @@
      optional DestinationData destinationData = 11;  
      optional DestinationData replyToData = 12;  
      optional MapData mapData = 14;
+     optional bytes payload = 15;
       
     }
 

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Sun Feb  1 23:35:54 2009
@@ -39,7 +39,7 @@
         sender.start();
         receiver.start();
         final CountDownLatch latch = new CountDownLatch(count);
-        receiver.addBlazeTopicMessageListener(destination, new BlazeTopicListener() {
+        receiver.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
             public void onMessage(BlazeMessage message) {
                 message.size();
                 received.incrementAndGet();
@@ -69,7 +69,7 @@
             BlazeChannel channel = factory.createChannel();
             channel.start();
             channels.add(channel);
-            channel.addBlazeTopicMessageListener(destination, new BlazeTopicListener() {
+            channel.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
                 public void onMessage(BlazeMessage message) {
                     synchronized (count) {
                         if (count.incrementAndGet() == GROUP_SIZE) {