You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 20:37:54 UTC

svn commit: r780773 [4/31] - in /activemq/sandbox/activemq-flow: activemq-client/ activemq-client/src/main/java/org/ activemq-client/src/main/java/org/apache/ activemq-client/src/main/java/org/apache/activemq/ activemq-client/src/main/java/org/apache/a...

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,1152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.command;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
+
+/**
+ * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
+ * types in the Java programming language. It is filled and read sequentially.
+ * It inherits from the <CODE>Message</CODE> interface and adds a stream
+ * message body. Its methods are based largely on those found in
+ * <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>. <p/>
+ * <P>
+ * The primitive types can be read or written explicitly using methods for each
+ * type. They may also be read or written generically as objects. For instance,
+ * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
+ * <CODE>StreamMessage.writeObject(new
+ * Integer(6))</CODE>. Both forms are
+ * provided, because the explicit form is convenient for static programming, and
+ * the object form is needed when types are not known at compile time. <p/>
+ * <P>
+ * When the message is first created, and when <CODE>clearBody</CODE> is
+ * called, the body of the message is in write-only mode. After the first call
+ * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
+ * After a message has been sent, the client that sent it can retain and modify
+ * it without affecting the message that has been sent. The same message object
+ * can be sent multiple times. When a message has been received, the provider
+ * has called <CODE>reset</CODE> so that the message body is in read-only mode
+ * for the client. <p/>
+ * <P>
+ * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
+ * message body is cleared and the message body is in write-only mode. <p/>
+ * <P>
+ * If a client attempts to read a message in write-only mode, a
+ * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
+ * <P>
+ * If a client attempts to write a message in read-only mode, a
+ * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
+ * <P>
+ * <CODE>StreamMessage</CODE> objects support the following conversion table.
+ * The marked cases must be supported. The unmarked cases must throw a
+ * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
+ * conversions may throw a runtime exception if the primitive's
+ * <CODE>valueOf()</CODE> method does not accept it as a valid
+ * <CODE>String</CODE> representation of the primitive. <p/>
+ * <P>
+ * A value written as the row type can be read as the column type. <p/>
+ * 
+ * <PRE>
+ *  | | boolean byte short char int long float double String byte[]
+ * |----------------------------------------------------------------------
+ * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
+ * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
+ * X |----------------------------------------------------------------------
+ * 
+ * </PRE>
+ * 
+ * <p/>
+ * <P>
+ * Attempting to read a null value as a primitive type must be treated as
+ * calling the primitive's corresponding <code>valueOf(String)</code>
+ * conversion method with a null value. Since <code>char</code> does not
+ * support a <code>String</code> conversion, attempting to read a null value
+ * as a <code>char</code> must throw a <code>NullPointerException</code>.
+ * 
+ * @openwire:marshaller code="27"
+ * @see javax.jms.Session#createStreamMessage()
+ * @see javax.jms.BytesMessage
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.ObjectMessage
+ * @see javax.jms.TextMessage
+ */
+public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
+
+    protected transient DataOutputStream dataOut;
+    protected transient ByteArrayOutputStream bytesOut;
+    protected transient DataInputStream dataIn;
+    protected transient int remainingBytes = -1;
+
+    public Message copy() {
+        ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(ActiveMQStreamMessage copy) {
+        storeContent();
+        super.copy(copy);
+        copy.dataOut = null;
+        copy.bytesOut = null;
+        copy.dataIn = null;
+    }
+
+    public void onSend() throws JMSException {
+        super.onSend();
+        storeContent();
+    }
+
+    private void storeContent() {
+        if (dataOut != null) {
+            try {
+                dataOut.close();
+                setContent(bytesOut.toByteSequence());
+                bytesOut = null;
+                dataOut = null;
+            } catch (IOException ioe) {
+                throw new RuntimeException(ioe);
+            }
+        }
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getJMSXMimeType() {
+        return "jms/stream-message";
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its
+     * header values or property entries. <p/>
+     * <P>
+     * If this message body was read-only, calling this method leaves the
+     * message body in the same state as an empty body in a newly created
+     * message.
+     * 
+     * @throws JMSException if the JMS provider fails to clear the message body
+     *                 due to some internal error.
+     */
+
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        this.dataOut = null;
+        this.dataIn = null;
+        this.bytesOut = null;
+        this.remainingBytes = -1;
+    }
+
+    /**
+     * Reads a <code>boolean</code> from the stream message.
+     * 
+     * @return the <code>boolean</code> value read
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public boolean readBoolean() throws JMSException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(10);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.BOOLEAN_TYPE) {
+                return this.dataIn.readBoolean();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to boolean.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a boolean type");
+            }
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a <code>byte</code> value from the stream message.
+     * 
+     * @return the next byte from the stream message as a 8-bit
+     *         <code>byte</code>
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public byte readByte() throws JMSException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(10);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Byte.valueOf(this.dataIn.readUTF()).byteValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to byte.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a byte type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a 16-bit integer from the stream message.
+     * 
+     * @return a 16-bit integer from the stream message
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public short readShort() throws JMSException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(17);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Short.valueOf(this.dataIn.readUTF()).shortValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to short.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a short type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+
+    }
+
+    /**
+     * Reads a Unicode character value from the stream message.
+     * 
+     * @return a Unicode character from the stream message
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public char readChar() throws JMSException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(17);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.CHAR_TYPE) {
+                return this.dataIn.readChar();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to char.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a char type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a 32-bit integer from the stream message.
+     * 
+     * @return a 32-bit integer value from the stream message, interpreted as an
+     *         <code>int</code>
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public int readInt() throws JMSException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(33);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.INTEGER_TYPE) {
+                return this.dataIn.readInt();
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Integer.valueOf(this.dataIn.readUTF()).intValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to int.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not an int type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a 64-bit integer from the stream message.
+     * 
+     * @return a 64-bit integer value from the stream message, interpreted as a
+     *         <code>long</code>
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public long readLong() throws JMSException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(65);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.LONG_TYPE) {
+                return this.dataIn.readLong();
+            }
+            if (type == MarshallingSupport.INTEGER_TYPE) {
+                return this.dataIn.readInt();
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Long.valueOf(this.dataIn.readUTF()).longValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to long.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a long type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a <code>float</code> from the stream message.
+     * 
+     * @return a <code>float</code> value from the stream message
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public float readFloat() throws JMSException {
+        initializeReading();
+        try {
+            this.dataIn.mark(33);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.FLOAT_TYPE) {
+                return this.dataIn.readFloat();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Float.valueOf(this.dataIn.readUTF()).floatValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to float.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a float type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a <code>double</code> from the stream message.
+     * 
+     * @return a <code>double</code> value from the stream message
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public double readDouble() throws JMSException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(65);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.DOUBLE_TYPE) {
+                return this.dataIn.readDouble();
+            }
+            if (type == MarshallingSupport.FLOAT_TYPE) {
+                return this.dataIn.readFloat();
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return Double.valueOf(this.dataIn.readUTF()).doubleValue();
+            }
+            if (type == MarshallingSupport.NULL) {
+                this.dataIn.reset();
+                throw new NullPointerException("Cannot convert NULL value to double.");
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a double type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a <CODE>String</CODE> from the stream message.
+     * 
+     * @return a Unicode string from the stream message
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     */
+
+    public String readString() throws JMSException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(65);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.NULL) {
+                return null;
+            }
+            if (type == MarshallingSupport.BIG_STRING_TYPE) {
+                return MarshallingSupport.readUTF8(dataIn);
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return this.dataIn.readUTF();
+            }
+            if (type == MarshallingSupport.LONG_TYPE) {
+                return new Long(this.dataIn.readLong()).toString();
+            }
+            if (type == MarshallingSupport.INTEGER_TYPE) {
+                return new Integer(this.dataIn.readInt()).toString();
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return new Short(this.dataIn.readShort()).toString();
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return new Byte(this.dataIn.readByte()).toString();
+            }
+            if (type == MarshallingSupport.FLOAT_TYPE) {
+                return new Float(this.dataIn.readFloat()).toString();
+            }
+            if (type == MarshallingSupport.DOUBLE_TYPE) {
+                return new Double(this.dataIn.readDouble()).toString();
+            }
+            if (type == MarshallingSupport.BOOLEAN_TYPE) {
+                return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
+            }
+            if (type == MarshallingSupport.CHAR_TYPE) {
+                return new Character(this.dataIn.readChar()).toString();
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException(" not a String type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw JMSExceptionSupport.createMessageEOFException(e);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.createMessageFormatException(e);
+        }
+    }
+
+    /**
+     * Reads a byte array field from the stream message into the specified
+     * <CODE>byte[]</CODE> object (the read buffer). <p/>
+     * <P>
+     * To read the field value, <CODE>readBytes</CODE> should be successively
+     * called until it returns a value less than the length of the read buffer.
+     * The value of the bytes in the buffer following the last byte read is
+     * undefined. <p/>
+     * <P>
+     * If <CODE>readBytes</CODE> returns a value equal to the length of the
+     * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
+     * are no more bytes to be read, this call returns -1. <p/>
+     * <P>
+     * If the byte array field value is null, <CODE>readBytes</CODE> returns
+     * -1. <p/>
+     * <P>
+     * If the byte array field value is empty, <CODE>readBytes</CODE> returns
+     * 0. <p/>
+     * <P>
+     * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
+     * field value has been made, the full value of the field must be read
+     * before it is valid to read the next field. An attempt to read the next
+     * field before that has been done will throw a
+     * <CODE>MessageFormatException</CODE>. <p/>
+     * <P>
+     * To read the byte field value into a new <CODE>byte[]</CODE> object, use
+     * the <CODE>readObject</CODE> method.
+     * 
+     * @param value the buffer into which the data is read
+     * @return the total number of bytes read into the buffer, or -1 if there is
+     *         no more data because the end of the byte field has been reached
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     * @see #readObject()
+     */
+
+    public int readBytes(byte[] value) throws JMSException {
+
+        initializeReading();
+        try {
+            if (value == null) {
+                throw new NullPointerException();
+            }
+
+            if (remainingBytes == -1) {
+                this.dataIn.mark(value.length + 1);
+                int type = this.dataIn.read();
+                if (type == -1) {
+                    throw new MessageEOFException("reached end of data");
+                }
+                if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
+                    throw new MessageFormatException("Not a byte array");
+                }
+                remainingBytes = this.dataIn.readInt();
+            } else if (remainingBytes == 0) {
+                remainingBytes = -1;
+                return -1;
+            }
+
+            if (value.length <= remainingBytes) {
+                // small buffer
+                remainingBytes -= value.length;
+                this.dataIn.readFully(value);
+                return value.length;
+            } else {
+                // big buffer
+                int rc = this.dataIn.read(value, 0, remainingBytes);
+                remainingBytes = 0;
+                return rc;
+            }
+
+        } catch (EOFException e) {
+            JMSException jmsEx = new MessageEOFException(e.getMessage());
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        } catch (IOException e) {
+            JMSException jmsEx = new MessageFormatException(e.getMessage());
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        }
+    }
+
+    /**
+     * Reads an object from the stream message. <p/>
+     * <P>
+     * This method can be used to return, in objectified format, an object in
+     * the Java programming language ("Java object") that has been written to
+     * the stream with the equivalent <CODE>writeObject</CODE> method call, or
+     * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
+     * <P>
+     * Note that byte values are returned as <CODE>byte[]</CODE>, not
+     * <CODE>Byte[]</CODE>. <p/>
+     * <P>
+     * An attempt to call <CODE>readObject</CODE> to read a byte field value
+     * into a new <CODE>byte[]</CODE> object before the full value of the byte
+     * field has been read will throw a <CODE>MessageFormatException</CODE>.
+     * 
+     * @return a Java object from the stream message, in objectified format (for
+     *         example, if the object was written as an <CODE>int</CODE>, an
+     *         <CODE>Integer</CODE> is returned)
+     * @throws JMSException if the JMS provider fails to read the message due to
+     *                 some internal error.
+     * @throws MessageEOFException if unexpected end of message stream has been
+     *                 reached.
+     * @throws MessageFormatException if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
+     * @see #readBytes(byte[] value)
+     */
+
+    public Object readObject() throws JMSException {
+        initializeReading();
+        try {
+            this.dataIn.mark(65);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
+            }
+            if (type == MarshallingSupport.NULL) {
+                return null;
+            }
+            if (type == MarshallingSupport.BIG_STRING_TYPE) {
+                return MarshallingSupport.readUTF8(dataIn);
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return this.dataIn.readUTF();
+            }
+            if (type == MarshallingSupport.LONG_TYPE) {
+                return Long.valueOf(this.dataIn.readLong());
+            }
+            if (type == MarshallingSupport.INTEGER_TYPE) {
+                return Integer.valueOf(this.dataIn.readInt());
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return Short.valueOf(this.dataIn.readShort());
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return Byte.valueOf(this.dataIn.readByte());
+            }
+            if (type == MarshallingSupport.FLOAT_TYPE) {
+                return new Float(this.dataIn.readFloat());
+            }
+            if (type == MarshallingSupport.DOUBLE_TYPE) {
+                return new Double(this.dataIn.readDouble());
+            }
+            if (type == MarshallingSupport.BOOLEAN_TYPE) {
+                return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+            }
+            if (type == MarshallingSupport.CHAR_TYPE) {
+                return Character.valueOf(this.dataIn.readChar());
+            }
+            if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
+                int len = this.dataIn.readInt();
+                byte[] value = new byte[len];
+                this.dataIn.readFully(value);
+                return value;
+            } else {
+                this.dataIn.reset();
+                throw new MessageFormatException("unknown type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            JMSException jmsEx = new MessageEOFException(e.getMessage());
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        } catch (IOException e) {
+            JMSException jmsEx = new MessageFormatException(e.getMessage());
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        }
+    }
+
+    /**
+     * Writes a <code>boolean</code> to the stream message. The value
+     * <code>true</code> is written as the value <code>(byte)1</code>; the
+     * value <code>false</code> is written as the value <code>(byte)0</code>.
+     * 
+     * @param value the <code>boolean</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeBoolean(boolean value) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalBoolean(dataOut, value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>byte</code> to the stream message.
+     * 
+     * @param value the <code>byte</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeByte(byte value) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalByte(dataOut, value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>short</code> to the stream message.
+     * 
+     * @param value the <code>short</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeShort(short value) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalShort(dataOut, value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>char</code> to the stream message.
+     * 
+     * @param value the <code>char</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeChar(char value) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalChar(dataOut, value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes an <code>int</code> to the stream message.
+     * 
+     * @param value the <code>int</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeInt(int value) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalInt(dataOut, value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>long</code> to the stream message.
+     * 
+     * @param value the <code>long</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeLong(long value) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalLong(dataOut, value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>float</code> to the stream message.
+     * 
+     * @param value the <code>float</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeFloat(float value) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalFloat(dataOut, value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>double</code> to the stream message.
+     * 
+     * @param value the <code>double</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeDouble(double value) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalDouble(dataOut, value);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a <code>String</code> to the stream message.
+     * 
+     * @param value the <code>String</code> value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeString(String value) throws JMSException {
+        initializeWriting();
+        try {
+            if (value == null) {
+                MarshallingSupport.marshalNull(dataOut);
+            } else {
+                MarshallingSupport.marshalString(dataOut, value);
+            }
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes a byte array field to the stream message. <p/>
+     * <P>
+     * The byte array <code>value</code> is written to the message as a byte
+     * array field. Consecutively written byte array fields are treated as two
+     * distinct fields when the fields are read.
+     * 
+     * @param value the byte array value to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeBytes(byte[] value) throws JMSException {
+        writeBytes(value, 0, value.length);
+    }
+
+    /**
+     * Writes a portion of a byte array as a byte array field to the stream
+     * message. <p/>
+     * <P>
+     * The a portion of the byte array <code>value</code> is written to the
+     * message as a byte array field. Consecutively written byte array fields
+     * are treated as two distinct fields when the fields are read.
+     * 
+     * @param value the byte array value to be written
+     * @param offset the initial offset within the byte array
+     * @param length the number of bytes to use
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
+        } catch (IOException ioe) {
+            throw JMSExceptionSupport.create(ioe);
+        }
+    }
+
+    /**
+     * Writes an object to the stream message. <p/>
+     * <P>
+     * This method works only for the objectified primitive object types (<code>Integer</code>,
+     * <code>Double</code>, <code>Long</code>&nbsp;...),
+     * <code>String</code> objects, and byte arrays.
+     * 
+     * @param value the Java object to be written
+     * @throws JMSException if the JMS provider fails to write the message due
+     *                 to some internal error.
+     * @throws MessageFormatException if the object is invalid.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
+     */
+
+    public void writeObject(Object value) throws JMSException {
+        initializeWriting();
+        if (value == null) {
+            try {
+                MarshallingSupport.marshalNull(dataOut);
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+        } else if (value instanceof String) {
+            writeString(value.toString());
+        } else if (value instanceof Character) {
+            writeChar(((Character)value).charValue());
+        } else if (value instanceof Boolean) {
+            writeBoolean(((Boolean)value).booleanValue());
+        } else if (value instanceof Byte) {
+            writeByte(((Byte)value).byteValue());
+        } else if (value instanceof Short) {
+            writeShort(((Short)value).shortValue());
+        } else if (value instanceof Integer) {
+            writeInt(((Integer)value).intValue());
+        } else if (value instanceof Float) {
+            writeFloat(((Float)value).floatValue());
+        } else if (value instanceof Double) {
+            writeDouble(((Double)value).doubleValue());
+        } else if (value instanceof byte[]) {
+            writeBytes((byte[])value);
+        }else if (value instanceof Long) {
+            writeLong(((Long)value).longValue());
+        }else {
+            throw new MessageFormatException("Unsupported Object type: " + value.getClass());
+        }
+    }
+
+    /**
+     * Puts the message body in read-only mode and repositions the stream of
+     * bytes to the beginning.
+     * 
+     * @throws JMSException if an internal error occurs
+     */
+
+    public void reset() throws JMSException {
+        storeContent();
+        this.bytesOut = null;
+        this.dataIn = null;
+        this.dataOut = null;
+        this.remainingBytes = -1;
+        setReadOnlyBody(true);
+    }
+
+    private void initializeWriting() throws MessageNotWriteableException {
+        checkReadOnlyBody();
+        if (this.dataOut == null) {
+            this.bytesOut = new ByteArrayOutputStream();
+            OutputStream os = bytesOut;
+            IConnection connection = getConnection();
+            if (connection != null && connection.isUseCompression()) {
+                compressed = true;
+                os = new DeflaterOutputStream(os);
+            }
+            this.dataOut = new DataOutputStream(os);
+        }
+    }
+
+    protected void checkWriteOnlyBody() throws MessageNotReadableException {
+        if (!readOnlyBody) {
+            throw new MessageNotReadableException("Message body is write-only");
+        }
+    }
+
+    private void initializeReading() throws MessageNotReadableException {
+        checkWriteOnlyBody();
+        if (this.dataIn == null) {
+            ByteSequence data = getContent();
+            if (data == null) {
+                data = new ByteSequence(new byte[] {}, 0, 0);
+            }
+            InputStream is = new ByteArrayInputStream(data);
+            if (isCompressed()) {
+                is = new InflaterInputStream(is);
+                is = new BufferedInputStream(is);
+            }
+            this.dataIn = new DataInputStream(is);
+        }
+    }
+
+    public String toString() {
+        return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import org.apache.activemq.IConnection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @openwire:marshaller
+ * @version $Revision: 1.5 $
+ */
+public abstract class ActiveMQTempDestination extends ActiveMQDestination {
+
+    private static final Log LOG = LogFactory.getLog(ActiveMQTempDestination.class);
+    protected transient IConnection connection;
+    protected transient String connectionId;
+    protected transient int sequenceId;
+
+    public ActiveMQTempDestination() {
+    }
+
+    public ActiveMQTempDestination(String name) {
+        super(name);
+    }
+
+    public ActiveMQTempDestination(String connectionId, long sequenceId) {
+        super(connectionId + ":" + sequenceId);
+    }
+
+    public boolean isTemporary() {
+        return true;
+    }
+
+    public void delete() throws JMSException {
+        connection.deleteTempDestination(this);
+    }
+
+    public IConnection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(IConnection connection) {
+        this.connection = connection;
+    }
+
+    public void setPhysicalName(String physicalName) {
+        super.setPhysicalName(physicalName);
+        if (!isComposite()) {
+            // Parse off the sequenceId off the end.
+            // this can fail if the temp destination is
+            // generated by another JMS system via the JMS<->JMS Bridge
+            int p = this.physicalName.lastIndexOf(":");
+            if (p >= 0) {
+                String seqStr = this.physicalName.substring(p + 1).trim();
+                if (seqStr != null && seqStr.length() > 0) {
+                    try {
+                        sequenceId = Integer.parseInt(seqStr);
+                    } catch (NumberFormatException e) {
+                        LOG.debug("Did not parse sequence Id from " + physicalName);
+                    }
+                    // The rest should be the connection id.
+                    connectionId = this.physicalName.substring(0, p);
+                }
+            }
+        }
+    }
+
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    public int getSequenceId() {
+        return sequenceId;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+
+/**
+ * @openwire:marshaller code="102"
+ * @version $Revision: 1.6 $
+ */
+public class ActiveMQTempQueue extends ActiveMQTempDestination implements TemporaryQueue {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEMP_QUEUE;
+    private static final long serialVersionUID = 6683049467527633867L;
+
+    public ActiveMQTempQueue() {
+    }
+
+    public ActiveMQTempQueue(String name) {
+        super(name);
+    }
+
+    public ActiveMQTempQueue(ConnectionId connectionId, long sequenceId) {
+        super(connectionId.getValue(), sequenceId);
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isQueue() {
+        return true;
+    }
+
+    public String getQueueName() throws JMSException {
+        return getPhysicalName();
+    }
+
+    public byte getDestinationType() {
+        return TEMP_QUEUE_TYPE;
+    }
+
+    protected String getQualifiedPrefix() {
+        return TEMP_QUEUE_QUALIFED_PREFIX;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryTopic;
+
+/**
+ * @openwire:marshaller code="103"
+ * @version $Revision: 1.6 $
+ */
+public class ActiveMQTempTopic extends ActiveMQTempDestination implements TemporaryTopic {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEMP_TOPIC;
+    private static final long serialVersionUID = -4325596784597300253L;
+
+    public ActiveMQTempTopic() {
+    }
+
+    public ActiveMQTempTopic(String name) {
+        super(name);
+    }
+
+    public ActiveMQTempTopic(ConnectionId connectionId, long sequenceId) {
+        super(connectionId.getValue(), sequenceId);
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isTopic() {
+        return true;
+    }
+
+    public String getTopicName() throws JMSException {
+        return getPhysicalName();
+    }
+
+    public byte getDestinationType() {
+        return TEMP_TOPIC_TYPE;
+    }
+
+    protected String getQualifiedPrefix() {
+        return TEMP_TOPIC_QUALIFED_PREFIX;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * @openwire:marshaller code="28"
+ * @version $Revision$
+ */
+public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
+
+    protected String text;
+
+    public Message copy() {
+        ActiveMQTextMessage copy = new ActiveMQTextMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(ActiveMQTextMessage copy) {
+        super.copy(copy);
+        copy.text = text;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getJMSXMimeType() {
+        return "jms/text-message";
+    }
+
+    public void setText(String text) throws MessageNotWriteableException {
+        checkReadOnlyBody();
+        this.text = text;
+        setContent(null);
+    }
+
+    public String getText() throws JMSException {
+        if (text == null && getContent() != null) {
+            InputStream is = null;
+            try {
+                ByteSequence bodyAsBytes = getContent();
+                if (bodyAsBytes != null) {
+                    is = new ByteArrayInputStream(bodyAsBytes);
+                    if (isCompressed()) {
+                        is = new InflaterInputStream(is);
+                    }
+                    DataInputStream dataIn = new DataInputStream(is);
+                    text = MarshallingSupport.readUTF8(dataIn);
+                    dataIn.close();
+                    setContent(null);
+                }
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            } finally {
+                if (is != null) {
+                    try {
+                        is.close();
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                }
+            }
+        }
+        return text;
+    }
+
+    public void beforeMarshall(WireFormat wireFormat) throws IOException {
+        super.beforeMarshall(wireFormat);
+
+        ByteSequence content = getContent();
+        if (content == null && text != null) {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            OutputStream os = bytesOut;
+            IConnection connection = getConnection();
+            if (connection != null && connection.isUseCompression()) {
+                compressed = true;
+                os = new DeflaterOutputStream(os);
+            }
+            DataOutputStream dataOut = new DataOutputStream(os);
+            MarshallingSupport.writeUTF8(dataOut, this.text);
+            dataOut.close();
+            setContent(bytesOut.toByteSequence());
+            //see https://issues.apache.org/activemq/browse/AMQ-2103
+            this.text=null;
+        }
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its
+     * header values or property entries. <p/>
+     * <P>
+     * If this message body was read-only, calling this method leaves the
+     * message body in the same state as an empty body in a newly created
+     * message.
+     * 
+     * @throws JMSException if the JMS provider fails to clear the message body
+     *                 due to some internal error.
+     */
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        this.text = null;
+    }
+
+    public int getSize() {
+        if (size == 0 && content == null && text != null) {
+            size = getMinimumMessageSize();
+            if (marshalledProperties != null) {
+                size += marshalledProperties.getLength();
+            }
+            size = text.length() * 2;
+        }
+        return super.getSize();
+    }
+    
+    public String toString() {
+        try {
+            String text = getText();
+        	if (text != null && text.length() > 63) {
+        		text = text.substring(0, 45) + "..." + text.substring(text.length() - 12);
+        		HashMap<String, Object> overrideFields = new HashMap<String, Object>();
+        		overrideFields.put("text", text);
+        		return super.toString(overrideFields);
+        	}
+        } catch (JMSException e) {
+        }
+        return super.toString();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * @org.apache.xbean.XBean element="topic" description="An ActiveMQ Topic
+ *                         Destination"
+ * @openwire:marshaller code="101"
+ * @version $Revision: 1.5 $
+ */
+public class ActiveMQTopic extends ActiveMQDestination implements Topic {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TOPIC;
+    private static final long serialVersionUID = 7300307405896488588L;
+
+    public ActiveMQTopic() {
+    }
+
+    public ActiveMQTopic(String name) {
+        super(name);
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isTopic() {
+        return true;
+    }
+
+    public String getTopicName() throws JMSException {
+        return getPhysicalName();
+    }
+
+    public byte getDestinationType() {
+        return TOPIC_TYPE;
+    }
+
+    protected String getQualifiedPrefix() {
+        return TOPIC_QUALIFIED_PREFIX;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.util.Map;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+
+/**
+ * 
+ * @openwire:marshaller
+ * @version $Revision: 1.11 $
+ */
+public abstract class BaseCommand implements Command {
+
+    protected int commandId;
+    protected boolean responseRequired;
+    
+    private transient Endpoint from;
+    private transient Endpoint to;
+    
+    public void copy(BaseCommand copy) {
+        copy.commandId = commandId;
+        copy.responseRequired = responseRequired;
+    }    
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getCommandId() {
+        return commandId;
+    }
+
+    public void setCommandId(int commandId) {
+        this.commandId = commandId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isResponseRequired() {
+        return responseRequired;
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+        this.responseRequired = responseRequired;
+    }
+
+    public String toString() {
+        return toString(null);
+    }
+    
+    public String toString(Map<String, Object>overrideFields) {
+    	return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields);
+    }
+    
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    /**
+     * The endpoint within the transport where this message came from.
+     */
+    public Endpoint getFrom() {
+        return from;
+    }
+
+    public void setFrom(Endpoint from) {
+        this.from = from;
+    }
+
+    /**
+     * The endpoint within the transport where this message is going to - null means all endpoints.
+     */
+    public Endpoint getTo() {
+        return to;
+    }
+
+    public void setTo(Endpoint to) {
+        this.to = to;
+    }
+    
+    
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseEndpoint.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseEndpoint.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseEndpoint.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * A default endpoint.
+ * 
+ * @version $Revision: 564679 $
+ */
+public class BaseEndpoint implements Endpoint {
+
+    private String name;
+    private BrokerInfo brokerInfo;
+
+    public BaseEndpoint(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String toString() {
+        String brokerText = "";
+        BrokerId brokerId = getBrokerId();
+        if (brokerId != null) {
+            brokerText = " broker: " + brokerId;
+        }
+        return "Endpoint[name:" + name + brokerText + "]";
+    }
+
+    /**
+     * Returns the broker ID for this endpoint, if the endpoint is a broker or
+     * null
+     */
+    public BrokerId getBrokerId() {
+        if (brokerInfo != null) {
+            return brokerInfo.getBrokerId();
+        }
+        return null;
+    }
+
+    /**
+     * Returns the broker information for this endpoint, if the endpoint is a
+     * broker or null
+     */
+    public BrokerInfo getBrokerInfo() {
+        return brokerInfo;
+    }
+
+    public void setBrokerInfo(BrokerInfo brokerInfo) {
+        this.brokerInfo = brokerInfo;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * @openwire:marshaller code="124"
+ * @version $Revision$
+ */
+public class BrokerId implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_ID;
+    protected String value;
+
+    public BrokerId() {
+    }
+
+    public BrokerId(String brokerId) {
+        this.value = brokerId;
+    }
+
+    public int hashCode() {
+        return value.hashCode();
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != BrokerId.class) {
+            return false;
+        }
+        BrokerId id = (BrokerId)o;
+        return value.equals(id.value);
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String toString() {
+        return value;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String brokerId) {
+        this.value = brokerId;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * When a client connects to a broker, the broker send the client a BrokerInfo
+ * so that the client knows which broker node he's talking to and also any peers
+ * that the node has in his cluster. This is the broker helping the client out
+ * in discovering other nodes in the cluster.
+ * 
+ * @openwire:marshaller code="2"
+ * @version $Revision: 1.7 $
+ */
+public class BrokerInfo extends BaseCommand {
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
+    BrokerId brokerId;
+    String brokerURL;
+    boolean slaveBroker;
+    boolean masterBroker;
+    boolean faultTolerantConfiguration;
+    boolean networkConnection;
+    boolean duplexConnection;
+    BrokerInfo peerBrokerInfos[];
+    String brokerName;
+    long connectionId;
+    String brokerUploadUrl;
+    String networkProperties;
+
+    public boolean isBrokerInfo() {
+        return true;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(BrokerId brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getBrokerURL() {
+        return brokerURL;
+    }
+
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = brokerURL;
+    }
+
+    /**
+     * @openwire:property version=1 testSize=0
+     */
+    public BrokerInfo[] getPeerBrokerInfos() {
+        return peerBrokerInfos;
+    }
+
+    public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos) {
+        this.peerBrokerInfos = peerBrokerInfos;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processBrokerInfo(this);
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isSlaveBroker() {
+        return slaveBroker;
+    }
+
+    public void setSlaveBroker(boolean slaveBroker) {
+        this.slaveBroker = slaveBroker;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isMasterBroker() {
+        return masterBroker;
+    }
+
+    /**
+     * @param masterBroker The masterBroker to set.
+     */
+    public void setMasterBroker(boolean masterBroker) {
+        this.masterBroker = masterBroker;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the faultTolerantConfiguration.
+     */
+    public boolean isFaultTolerantConfiguration() {
+        return faultTolerantConfiguration;
+    }
+
+    /**
+     * @param faultTolerantConfiguration The faultTolerantConfiguration to set.
+     */
+    public void setFaultTolerantConfiguration(boolean faultTolerantConfiguration) {
+        this.faultTolerantConfiguration = faultTolerantConfiguration;
+    }
+
+    /**
+     * @openwire:property version=2
+     * @return the duplexConnection
+     */
+    public boolean isDuplexConnection() {
+        return this.duplexConnection;
+    }
+
+    /**
+     * @param duplexConnection the duplexConnection to set
+     */
+    public void setDuplexConnection(boolean duplexConnection) {
+        this.duplexConnection = duplexConnection;
+    }
+
+    /**
+     * @openwire:property version=2
+     * @return the networkConnection
+     */
+    public boolean isNetworkConnection() {
+        return this.networkConnection;
+    }
+
+    /**
+     * @param networkConnection the networkConnection to set
+     */
+    public void setNetworkConnection(boolean networkConnection) {
+        this.networkConnection = networkConnection;
+    }
+
+    /**
+     * The broker assigns a each connection it accepts a connection id.
+     * 
+     * @openwire:property version=2
+     */
+    public long getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(long connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * The URL to use when uploading BLOBs to the broker or some other external
+     * file/http server
+     * 
+     * @openwire:property version=3
+     */
+    public String getBrokerUploadUrl() {
+        return brokerUploadUrl;
+    }
+
+    public void setBrokerUploadUrl(String brokerUploadUrl) {
+        this.brokerUploadUrl = brokerUploadUrl;
+    }
+
+    /**
+     * @openwire:property version=3 cache=false
+     * @return the networkProperties
+     */
+    public String getNetworkProperties() {
+        return this.networkProperties;
+    }
+
+    /**
+     * @param networkProperties the networkProperties to set
+     */
+    public void setNetworkProperties(String networkProperties) {
+        this.networkProperties = networkProperties;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * The Command Pattern so that we can send and receive commands on the different
+ * transports
+ * 
+ * @version $Revision: 1.7 $
+ */
+public interface Command extends DataStructure {
+
+    void setCommandId(int value);
+
+    /**
+     * @return the unique ID of this request used to map responses to requests
+     */
+    int getCommandId();
+
+    void setResponseRequired(boolean responseRequired);
+
+    boolean isResponseRequired();
+
+    boolean isResponse();
+
+    boolean isMessageDispatch();
+
+    boolean isBrokerInfo();
+
+    boolean isWireFormatInfo();
+
+    boolean isMessage();
+
+    boolean isMessageAck();
+
+    boolean isMessageDispatchNotification();
+
+    boolean isShutdownInfo();
+
+    Response visit(CommandVisitor visitor) throws Exception;
+
+    /**
+     * The endpoint within the transport where this message came from which
+     * could be null if the transport only supports a single endpoint.
+     */
+    Endpoint getFrom();
+
+    void setFrom(Endpoint from);
+
+    /**
+     * The endpoint within the transport where this message is going to - null
+     * means all endpoints.
+     */
+    Endpoint getTo();
+
+    void setTo(Endpoint to);
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java
------------------------------------------------------------------------------
    svn:executable = *