You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/01 20:37:54 UTC
svn commit: r780773 [4/31] - in /activemq/sandbox/activemq-flow:
activemq-client/ activemq-client/src/main/java/org/
activemq-client/src/main/java/org/apache/
activemq-client/src/main/java/org/apache/activemq/
activemq-client/src/main/java/org/apache/a...
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,1152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.command;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
+
+/**
+ * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
+ * types in the Java programming language. It is filled and read sequentially.
+ * It inherits from the <CODE>Message</CODE> interface and adds a stream
+ * message body. Its methods are based largely on those found in
+ * <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>. <p/>
+ * <P>
+ * The primitive types can be read or written explicitly using methods for each
+ * type. They may also be read or written generically as objects. For instance,
+ * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
+ * <CODE>StreamMessage.writeObject(new
+ * Integer(6))</CODE>. Both forms are
+ * provided, because the explicit form is convenient for static programming, and
+ * the object form is needed when types are not known at compile time. <p/>
+ * <P>
+ * When the message is first created, and when <CODE>clearBody</CODE> is
+ * called, the body of the message is in write-only mode. After the first call
+ * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
+ * After a message has been sent, the client that sent it can retain and modify
+ * it without affecting the message that has been sent. The same message object
+ * can be sent multiple times. When a message has been received, the provider
+ * has called <CODE>reset</CODE> so that the message body is in read-only mode
+ * for the client. <p/>
+ * <P>
+ * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
+ * message body is cleared and the message body is in write-only mode. <p/>
+ * <P>
+ * If a client attempts to read a message in write-only mode, a
+ * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
+ * <P>
+ * If a client attempts to write a message in read-only mode, a
+ * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
+ * <P>
+ * <CODE>StreamMessage</CODE> objects support the following conversion table.
+ * The marked cases must be supported. The unmarked cases must throw a
+ * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
+ * conversions may throw a runtime exception if the primitive's
+ * <CODE>valueOf()</CODE> method does not accept it as a valid
+ * <CODE>String</CODE> representation of the primitive. <p/>
+ * <P>
+ * A value written as the row type can be read as the column type. <p/>
+ *
+ * <PRE>
+ * | | boolean byte short char int long float double String byte[]
+ * |----------------------------------------------------------------------
+ * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
+ * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
+ * X |----------------------------------------------------------------------
+ *
+ * </PRE>
+ *
+ * <p/>
+ * <P>
+ * Attempting to read a null value as a primitive type must be treated as
+ * calling the primitive's corresponding <code>valueOf(String)</code>
+ * conversion method with a null value. Since <code>char</code> does not
+ * support a <code>String</code> conversion, attempting to read a null value
+ * as a <code>char</code> must throw a <code>NullPointerException</code>.
+ *
+ * @openwire:marshaller code="27"
+ * @see javax.jms.Session#createStreamMessage()
+ * @see javax.jms.BytesMessage
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.ObjectMessage
+ * @see javax.jms.TextMessage
+ */
+public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
+
+ protected transient DataOutputStream dataOut;
+ protected transient ByteArrayOutputStream bytesOut;
+ protected transient DataInputStream dataIn;
+ protected transient int remainingBytes = -1;
+
+ public Message copy() {
+ ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
+ copy(copy);
+ return copy;
+ }
+
+ private void copy(ActiveMQStreamMessage copy) {
+ storeContent();
+ super.copy(copy);
+ copy.dataOut = null;
+ copy.bytesOut = null;
+ copy.dataIn = null;
+ }
+
+ public void onSend() throws JMSException {
+ super.onSend();
+ storeContent();
+ }
+
+ private void storeContent() {
+ if (dataOut != null) {
+ try {
+ dataOut.close();
+ setContent(bytesOut.toByteSequence());
+ bytesOut = null;
+ dataOut = null;
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public String getJMSXMimeType() {
+ return "jms/stream-message";
+ }
+
+ /**
+ * Clears out the message body. Clearing a message's body does not clear its
+ * header values or property entries. <p/>
+ * <P>
+ * If this message body was read-only, calling this method leaves the
+ * message body in the same state as an empty body in a newly created
+ * message.
+ *
+ * @throws JMSException if the JMS provider fails to clear the message body
+ * due to some internal error.
+ */
+
+ public void clearBody() throws JMSException {
+ super.clearBody();
+ this.dataOut = null;
+ this.dataIn = null;
+ this.bytesOut = null;
+ this.remainingBytes = -1;
+ }
+
+ /**
+ * Reads a <code>boolean</code> from the stream message.
+ *
+ * @return the <code>boolean</code> value read
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public boolean readBoolean() throws JMSException {
+ initializeReading();
+ try {
+
+ this.dataIn.mark(10);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.BOOLEAN_TYPE) {
+ return this.dataIn.readBoolean();
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
+ }
+ if (type == MarshallingSupport.NULL) {
+ this.dataIn.reset();
+ throw new NullPointerException("Cannot convert NULL value to boolean.");
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not a boolean type");
+ }
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a <code>byte</code> value from the stream message.
+ *
+ * @return the next byte from the stream message as a 8-bit
+ * <code>byte</code>
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public byte readByte() throws JMSException {
+ initializeReading();
+ try {
+
+ this.dataIn.mark(10);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.BYTE_TYPE) {
+ return this.dataIn.readByte();
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return Byte.valueOf(this.dataIn.readUTF()).byteValue();
+ }
+ if (type == MarshallingSupport.NULL) {
+ this.dataIn.reset();
+ throw new NullPointerException("Cannot convert NULL value to byte.");
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not a byte type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a 16-bit integer from the stream message.
+ *
+ * @return a 16-bit integer from the stream message
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public short readShort() throws JMSException {
+ initializeReading();
+ try {
+
+ this.dataIn.mark(17);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.SHORT_TYPE) {
+ return this.dataIn.readShort();
+ }
+ if (type == MarshallingSupport.BYTE_TYPE) {
+ return this.dataIn.readByte();
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return Short.valueOf(this.dataIn.readUTF()).shortValue();
+ }
+ if (type == MarshallingSupport.NULL) {
+ this.dataIn.reset();
+ throw new NullPointerException("Cannot convert NULL value to short.");
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not a short type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+
+ }
+
+ /**
+ * Reads a Unicode character value from the stream message.
+ *
+ * @return a Unicode character from the stream message
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public char readChar() throws JMSException {
+ initializeReading();
+ try {
+
+ this.dataIn.mark(17);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.CHAR_TYPE) {
+ return this.dataIn.readChar();
+ }
+ if (type == MarshallingSupport.NULL) {
+ this.dataIn.reset();
+ throw new NullPointerException("Cannot convert NULL value to char.");
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not a char type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a 32-bit integer from the stream message.
+ *
+ * @return a 32-bit integer value from the stream message, interpreted as an
+ * <code>int</code>
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public int readInt() throws JMSException {
+ initializeReading();
+ try {
+
+ this.dataIn.mark(33);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.INTEGER_TYPE) {
+ return this.dataIn.readInt();
+ }
+ if (type == MarshallingSupport.SHORT_TYPE) {
+ return this.dataIn.readShort();
+ }
+ if (type == MarshallingSupport.BYTE_TYPE) {
+ return this.dataIn.readByte();
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return Integer.valueOf(this.dataIn.readUTF()).intValue();
+ }
+ if (type == MarshallingSupport.NULL) {
+ this.dataIn.reset();
+ throw new NullPointerException("Cannot convert NULL value to int.");
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not an int type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a 64-bit integer from the stream message.
+ *
+ * @return a 64-bit integer value from the stream message, interpreted as a
+ * <code>long</code>
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public long readLong() throws JMSException {
+ initializeReading();
+ try {
+
+ this.dataIn.mark(65);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.LONG_TYPE) {
+ return this.dataIn.readLong();
+ }
+ if (type == MarshallingSupport.INTEGER_TYPE) {
+ return this.dataIn.readInt();
+ }
+ if (type == MarshallingSupport.SHORT_TYPE) {
+ return this.dataIn.readShort();
+ }
+ if (type == MarshallingSupport.BYTE_TYPE) {
+ return this.dataIn.readByte();
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return Long.valueOf(this.dataIn.readUTF()).longValue();
+ }
+ if (type == MarshallingSupport.NULL) {
+ this.dataIn.reset();
+ throw new NullPointerException("Cannot convert NULL value to long.");
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not a long type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a <code>float</code> from the stream message.
+ *
+ * @return a <code>float</code> value from the stream message
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public float readFloat() throws JMSException {
+ initializeReading();
+ try {
+ this.dataIn.mark(33);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.FLOAT_TYPE) {
+ return this.dataIn.readFloat();
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return Float.valueOf(this.dataIn.readUTF()).floatValue();
+ }
+ if (type == MarshallingSupport.NULL) {
+ this.dataIn.reset();
+ throw new NullPointerException("Cannot convert NULL value to float.");
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not a float type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a <code>double</code> from the stream message.
+ *
+ * @return a <code>double</code> value from the stream message
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public double readDouble() throws JMSException {
+ initializeReading();
+ try {
+
+ this.dataIn.mark(65);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.DOUBLE_TYPE) {
+ return this.dataIn.readDouble();
+ }
+ if (type == MarshallingSupport.FLOAT_TYPE) {
+ return this.dataIn.readFloat();
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return Double.valueOf(this.dataIn.readUTF()).doubleValue();
+ }
+ if (type == MarshallingSupport.NULL) {
+ this.dataIn.reset();
+ throw new NullPointerException("Cannot convert NULL value to double.");
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not a double type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a <CODE>String</CODE> from the stream message.
+ *
+ * @return a Unicode string from the stream message
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ */
+
+ public String readString() throws JMSException {
+ initializeReading();
+ try {
+
+ this.dataIn.mark(65);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.NULL) {
+ return null;
+ }
+ if (type == MarshallingSupport.BIG_STRING_TYPE) {
+ return MarshallingSupport.readUTF8(dataIn);
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return this.dataIn.readUTF();
+ }
+ if (type == MarshallingSupport.LONG_TYPE) {
+ return new Long(this.dataIn.readLong()).toString();
+ }
+ if (type == MarshallingSupport.INTEGER_TYPE) {
+ return new Integer(this.dataIn.readInt()).toString();
+ }
+ if (type == MarshallingSupport.SHORT_TYPE) {
+ return new Short(this.dataIn.readShort()).toString();
+ }
+ if (type == MarshallingSupport.BYTE_TYPE) {
+ return new Byte(this.dataIn.readByte()).toString();
+ }
+ if (type == MarshallingSupport.FLOAT_TYPE) {
+ return new Float(this.dataIn.readFloat()).toString();
+ }
+ if (type == MarshallingSupport.DOUBLE_TYPE) {
+ return new Double(this.dataIn.readDouble()).toString();
+ }
+ if (type == MarshallingSupport.BOOLEAN_TYPE) {
+ return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
+ }
+ if (type == MarshallingSupport.CHAR_TYPE) {
+ return new Character(this.dataIn.readChar()).toString();
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException(" not a String type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ throw JMSExceptionSupport.createMessageEOFException(e);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.createMessageFormatException(e);
+ }
+ }
+
+ /**
+ * Reads a byte array field from the stream message into the specified
+ * <CODE>byte[]</CODE> object (the read buffer). <p/>
+ * <P>
+ * To read the field value, <CODE>readBytes</CODE> should be successively
+ * called until it returns a value less than the length of the read buffer.
+ * The value of the bytes in the buffer following the last byte read is
+ * undefined. <p/>
+ * <P>
+ * If <CODE>readBytes</CODE> returns a value equal to the length of the
+ * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
+ * are no more bytes to be read, this call returns -1. <p/>
+ * <P>
+ * If the byte array field value is null, <CODE>readBytes</CODE> returns
+ * -1. <p/>
+ * <P>
+ * If the byte array field value is empty, <CODE>readBytes</CODE> returns
+ * 0. <p/>
+ * <P>
+ * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
+ * field value has been made, the full value of the field must be read
+ * before it is valid to read the next field. An attempt to read the next
+ * field before that has been done will throw a
+ * <CODE>MessageFormatException</CODE>. <p/>
+ * <P>
+ * To read the byte field value into a new <CODE>byte[]</CODE> object, use
+ * the <CODE>readObject</CODE> method.
+ *
+ * @param value the buffer into which the data is read
+ * @return the total number of bytes read into the buffer, or -1 if there is
+ * no more data because the end of the byte field has been reached
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ * @see #readObject()
+ */
+
+ public int readBytes(byte[] value) throws JMSException {
+
+ initializeReading();
+ try {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ if (remainingBytes == -1) {
+ this.dataIn.mark(value.length + 1);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
+ throw new MessageFormatException("Not a byte array");
+ }
+ remainingBytes = this.dataIn.readInt();
+ } else if (remainingBytes == 0) {
+ remainingBytes = -1;
+ return -1;
+ }
+
+ if (value.length <= remainingBytes) {
+ // small buffer
+ remainingBytes -= value.length;
+ this.dataIn.readFully(value);
+ return value.length;
+ } else {
+ // big buffer
+ int rc = this.dataIn.read(value, 0, remainingBytes);
+ remainingBytes = 0;
+ return rc;
+ }
+
+ } catch (EOFException e) {
+ JMSException jmsEx = new MessageEOFException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ } catch (IOException e) {
+ JMSException jmsEx = new MessageFormatException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ }
+
+ /**
+ * Reads an object from the stream message. <p/>
+ * <P>
+ * This method can be used to return, in objectified format, an object in
+ * the Java programming language ("Java object") that has been written to
+ * the stream with the equivalent <CODE>writeObject</CODE> method call, or
+ * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
+ * <P>
+ * Note that byte values are returned as <CODE>byte[]</CODE>, not
+ * <CODE>Byte[]</CODE>. <p/>
+ * <P>
+ * An attempt to call <CODE>readObject</CODE> to read a byte field value
+ * into a new <CODE>byte[]</CODE> object before the full value of the byte
+ * field has been read will throw a <CODE>MessageFormatException</CODE>.
+ *
+ * @return a Java object from the stream message, in objectified format (for
+ * example, if the object was written as an <CODE>int</CODE>, an
+ * <CODE>Integer</CODE> is returned)
+ * @throws JMSException if the JMS provider fails to read the message due to
+ * some internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been
+ * reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
+ * @see #readBytes(byte[] value)
+ */
+
+ public Object readObject() throws JMSException {
+ initializeReading();
+ try {
+ this.dataIn.mark(65);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.NULL) {
+ return null;
+ }
+ if (type == MarshallingSupport.BIG_STRING_TYPE) {
+ return MarshallingSupport.readUTF8(dataIn);
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return this.dataIn.readUTF();
+ }
+ if (type == MarshallingSupport.LONG_TYPE) {
+ return Long.valueOf(this.dataIn.readLong());
+ }
+ if (type == MarshallingSupport.INTEGER_TYPE) {
+ return Integer.valueOf(this.dataIn.readInt());
+ }
+ if (type == MarshallingSupport.SHORT_TYPE) {
+ return Short.valueOf(this.dataIn.readShort());
+ }
+ if (type == MarshallingSupport.BYTE_TYPE) {
+ return Byte.valueOf(this.dataIn.readByte());
+ }
+ if (type == MarshallingSupport.FLOAT_TYPE) {
+ return new Float(this.dataIn.readFloat());
+ }
+ if (type == MarshallingSupport.DOUBLE_TYPE) {
+ return new Double(this.dataIn.readDouble());
+ }
+ if (type == MarshallingSupport.BOOLEAN_TYPE) {
+ return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+ }
+ if (type == MarshallingSupport.CHAR_TYPE) {
+ return Character.valueOf(this.dataIn.readChar());
+ }
+ if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
+ int len = this.dataIn.readInt();
+ byte[] value = new byte[len];
+ this.dataIn.readFully(value);
+ return value;
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException("unknown type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ throw mfe;
+
+ } catch (EOFException e) {
+ JMSException jmsEx = new MessageEOFException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ } catch (IOException e) {
+ JMSException jmsEx = new MessageFormatException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ }
+
+ /**
+ * Writes a <code>boolean</code> to the stream message. The value
+ * <code>true</code> is written as the value <code>(byte)1</code>; the
+ * value <code>false</code> is written as the value <code>(byte)0</code>.
+ *
+ * @param value the <code>boolean</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeBoolean(boolean value) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalBoolean(dataOut, value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>byte</code> to the stream message.
+ *
+ * @param value the <code>byte</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeByte(byte value) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalByte(dataOut, value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>short</code> to the stream message.
+ *
+ * @param value the <code>short</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeShort(short value) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalShort(dataOut, value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>char</code> to the stream message.
+ *
+ * @param value the <code>char</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeChar(char value) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalChar(dataOut, value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes an <code>int</code> to the stream message.
+ *
+ * @param value the <code>int</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeInt(int value) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalInt(dataOut, value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>long</code> to the stream message.
+ *
+ * @param value the <code>long</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeLong(long value) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalLong(dataOut, value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>float</code> to the stream message.
+ *
+ * @param value the <code>float</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeFloat(float value) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalFloat(dataOut, value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>double</code> to the stream message.
+ *
+ * @param value the <code>double</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeDouble(double value) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalDouble(dataOut, value);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a <code>String</code> to the stream message.
+ *
+ * @param value the <code>String</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeString(String value) throws JMSException {
+ initializeWriting();
+ try {
+ if (value == null) {
+ MarshallingSupport.marshalNull(dataOut);
+ } else {
+ MarshallingSupport.marshalString(dataOut, value);
+ }
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes a byte array field to the stream message. <p/>
+ * <P>
+ * The byte array <code>value</code> is written to the message as a byte
+ * array field. Consecutively written byte array fields are treated as two
+ * distinct fields when the fields are read.
+ *
+ * @param value the byte array value to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeBytes(byte[] value) throws JMSException {
+ writeBytes(value, 0, value.length);
+ }
+
+ /**
+ * Writes a portion of a byte array as a byte array field to the stream
+ * message. <p/>
+ * <P>
+ * The a portion of the byte array <code>value</code> is written to the
+ * message as a byte array field. Consecutively written byte array fields
+ * are treated as two distinct fields when the fields are read.
+ *
+ * @param value the byte array value to be written
+ * @param offset the initial offset within the byte array
+ * @param length the number of bytes to use
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+ initializeWriting();
+ try {
+ MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ }
+
+ /**
+ * Writes an object to the stream message. <p/>
+ * <P>
+ * This method works only for the objectified primitive object types (<code>Integer</code>,
+ * <code>Double</code>, <code>Long</code> ...),
+ * <code>String</code> objects, and byte arrays.
+ *
+ * @param value the Java object to be written
+ * @throws JMSException if the JMS provider fails to write the message due
+ * to some internal error.
+ * @throws MessageFormatException if the object is invalid.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
+ */
+
+ public void writeObject(Object value) throws JMSException {
+ initializeWriting();
+ if (value == null) {
+ try {
+ MarshallingSupport.marshalNull(dataOut);
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ }
+ } else if (value instanceof String) {
+ writeString(value.toString());
+ } else if (value instanceof Character) {
+ writeChar(((Character)value).charValue());
+ } else if (value instanceof Boolean) {
+ writeBoolean(((Boolean)value).booleanValue());
+ } else if (value instanceof Byte) {
+ writeByte(((Byte)value).byteValue());
+ } else if (value instanceof Short) {
+ writeShort(((Short)value).shortValue());
+ } else if (value instanceof Integer) {
+ writeInt(((Integer)value).intValue());
+ } else if (value instanceof Float) {
+ writeFloat(((Float)value).floatValue());
+ } else if (value instanceof Double) {
+ writeDouble(((Double)value).doubleValue());
+ } else if (value instanceof byte[]) {
+ writeBytes((byte[])value);
+ }else if (value instanceof Long) {
+ writeLong(((Long)value).longValue());
+ }else {
+ throw new MessageFormatException("Unsupported Object type: " + value.getClass());
+ }
+ }
+
+ /**
+ * Puts the message body in read-only mode and repositions the stream of
+ * bytes to the beginning.
+ *
+ * @throws JMSException if an internal error occurs
+ */
+
+ public void reset() throws JMSException {
+ storeContent();
+ this.bytesOut = null;
+ this.dataIn = null;
+ this.dataOut = null;
+ this.remainingBytes = -1;
+ setReadOnlyBody(true);
+ }
+
+ private void initializeWriting() throws MessageNotWriteableException {
+ checkReadOnlyBody();
+ if (this.dataOut == null) {
+ this.bytesOut = new ByteArrayOutputStream();
+ OutputStream os = bytesOut;
+ IConnection connection = getConnection();
+ if (connection != null && connection.isUseCompression()) {
+ compressed = true;
+ os = new DeflaterOutputStream(os);
+ }
+ this.dataOut = new DataOutputStream(os);
+ }
+ }
+
+ protected void checkWriteOnlyBody() throws MessageNotReadableException {
+ if (!readOnlyBody) {
+ throw new MessageNotReadableException("Message body is write-only");
+ }
+ }
+
+ private void initializeReading() throws MessageNotReadableException {
+ checkWriteOnlyBody();
+ if (this.dataIn == null) {
+ ByteSequence data = getContent();
+ if (data == null) {
+ data = new ByteSequence(new byte[] {}, 0, 0);
+ }
+ InputStream is = new ByteArrayInputStream(data);
+ if (isCompressed()) {
+ is = new InflaterInputStream(is);
+ is = new BufferedInputStream(is);
+ }
+ this.dataIn = new DataInputStream(is);
+ }
+ }
+
+ public String toString() {
+ return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import org.apache.activemq.IConnection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @openwire:marshaller
+ * @version $Revision: 1.5 $
+ */
+public abstract class ActiveMQTempDestination extends ActiveMQDestination {
+
+ private static final Log LOG = LogFactory.getLog(ActiveMQTempDestination.class);
+ protected transient IConnection connection;
+ protected transient String connectionId;
+ protected transient int sequenceId;
+
+ public ActiveMQTempDestination() {
+ }
+
+ public ActiveMQTempDestination(String name) {
+ super(name);
+ }
+
+ public ActiveMQTempDestination(String connectionId, long sequenceId) {
+ super(connectionId + ":" + sequenceId);
+ }
+
+ public boolean isTemporary() {
+ return true;
+ }
+
+ public void delete() throws JMSException {
+ connection.deleteTempDestination(this);
+ }
+
+ public IConnection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(IConnection connection) {
+ this.connection = connection;
+ }
+
+ public void setPhysicalName(String physicalName) {
+ super.setPhysicalName(physicalName);
+ if (!isComposite()) {
+ // Parse off the sequenceId off the end.
+ // this can fail if the temp destination is
+ // generated by another JMS system via the JMS<->JMS Bridge
+ int p = this.physicalName.lastIndexOf(":");
+ if (p >= 0) {
+ String seqStr = this.physicalName.substring(p + 1).trim();
+ if (seqStr != null && seqStr.length() > 0) {
+ try {
+ sequenceId = Integer.parseInt(seqStr);
+ } catch (NumberFormatException e) {
+ LOG.debug("Did not parse sequence Id from " + physicalName);
+ }
+ // The rest should be the connection id.
+ connectionId = this.physicalName.substring(0, p);
+ }
+ }
+ }
+ }
+
+ public String getConnectionId() {
+ return connectionId;
+ }
+
+ public void setConnectionId(String connectionId) {
+ this.connectionId = connectionId;
+ }
+
+ public int getSequenceId() {
+ return sequenceId;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+
+/**
+ * @openwire:marshaller code="102"
+ * @version $Revision: 1.6 $
+ */
+public class ActiveMQTempQueue extends ActiveMQTempDestination implements TemporaryQueue {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEMP_QUEUE;
+ private static final long serialVersionUID = 6683049467527633867L;
+
+ public ActiveMQTempQueue() {
+ }
+
+ public ActiveMQTempQueue(String name) {
+ super(name);
+ }
+
+ public ActiveMQTempQueue(ConnectionId connectionId, long sequenceId) {
+ super(connectionId.getValue(), sequenceId);
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isQueue() {
+ return true;
+ }
+
+ public String getQueueName() throws JMSException {
+ return getPhysicalName();
+ }
+
+ public byte getDestinationType() {
+ return TEMP_QUEUE_TYPE;
+ }
+
+ protected String getQualifiedPrefix() {
+ return TEMP_QUEUE_QUALIFED_PREFIX;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryTopic;
+
+/**
+ * @openwire:marshaller code="103"
+ * @version $Revision: 1.6 $
+ */
+public class ActiveMQTempTopic extends ActiveMQTempDestination implements TemporaryTopic {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEMP_TOPIC;
+ private static final long serialVersionUID = -4325596784597300253L;
+
+ public ActiveMQTempTopic() {
+ }
+
+ public ActiveMQTempTopic(String name) {
+ super(name);
+ }
+
+ public ActiveMQTempTopic(ConnectionId connectionId, long sequenceId) {
+ super(connectionId.getValue(), sequenceId);
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isTopic() {
+ return true;
+ }
+
+ public String getTopicName() throws JMSException {
+ return getPhysicalName();
+ }
+
+ public byte getDestinationType() {
+ return TEMP_TOPIC_TYPE;
+ }
+
+ protected String getQualifiedPrefix() {
+ return TEMP_TOPIC_QUALIFED_PREFIX;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * @openwire:marshaller code="28"
+ * @version $Revision$
+ */
+public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
+
+ protected String text;
+
+ public Message copy() {
+ ActiveMQTextMessage copy = new ActiveMQTextMessage();
+ copy(copy);
+ return copy;
+ }
+
+ private void copy(ActiveMQTextMessage copy) {
+ super.copy(copy);
+ copy.text = text;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public String getJMSXMimeType() {
+ return "jms/text-message";
+ }
+
+ public void setText(String text) throws MessageNotWriteableException {
+ checkReadOnlyBody();
+ this.text = text;
+ setContent(null);
+ }
+
+ public String getText() throws JMSException {
+ if (text == null && getContent() != null) {
+ InputStream is = null;
+ try {
+ ByteSequence bodyAsBytes = getContent();
+ if (bodyAsBytes != null) {
+ is = new ByteArrayInputStream(bodyAsBytes);
+ if (isCompressed()) {
+ is = new InflaterInputStream(is);
+ }
+ DataInputStream dataIn = new DataInputStream(is);
+ text = MarshallingSupport.readUTF8(dataIn);
+ dataIn.close();
+ setContent(null);
+ }
+ } catch (IOException ioe) {
+ throw JMSExceptionSupport.create(ioe);
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ return text;
+ }
+
+ public void beforeMarshall(WireFormat wireFormat) throws IOException {
+ super.beforeMarshall(wireFormat);
+
+ ByteSequence content = getContent();
+ if (content == null && text != null) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ OutputStream os = bytesOut;
+ IConnection connection = getConnection();
+ if (connection != null && connection.isUseCompression()) {
+ compressed = true;
+ os = new DeflaterOutputStream(os);
+ }
+ DataOutputStream dataOut = new DataOutputStream(os);
+ MarshallingSupport.writeUTF8(dataOut, this.text);
+ dataOut.close();
+ setContent(bytesOut.toByteSequence());
+ //see https://issues.apache.org/activemq/browse/AMQ-2103
+ this.text=null;
+ }
+ }
+
+ /**
+ * Clears out the message body. Clearing a message's body does not clear its
+ * header values or property entries. <p/>
+ * <P>
+ * If this message body was read-only, calling this method leaves the
+ * message body in the same state as an empty body in a newly created
+ * message.
+ *
+ * @throws JMSException if the JMS provider fails to clear the message body
+ * due to some internal error.
+ */
+ public void clearBody() throws JMSException {
+ super.clearBody();
+ this.text = null;
+ }
+
+ public int getSize() {
+ if (size == 0 && content == null && text != null) {
+ size = getMinimumMessageSize();
+ if (marshalledProperties != null) {
+ size += marshalledProperties.getLength();
+ }
+ size = text.length() * 2;
+ }
+ return super.getSize();
+ }
+
+ public String toString() {
+ try {
+ String text = getText();
+ if (text != null && text.length() > 63) {
+ text = text.substring(0, 45) + "..." + text.substring(text.length() - 12);
+ HashMap<String, Object> overrideFields = new HashMap<String, Object>();
+ overrideFields.put("text", text);
+ return super.toString(overrideFields);
+ }
+ } catch (JMSException e) {
+ }
+ return super.toString();
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * @org.apache.xbean.XBean element="topic" description="An ActiveMQ Topic
+ * Destination"
+ * @openwire:marshaller code="101"
+ * @version $Revision: 1.5 $
+ */
+public class ActiveMQTopic extends ActiveMQDestination implements Topic {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TOPIC;
+ private static final long serialVersionUID = 7300307405896488588L;
+
+ public ActiveMQTopic() {
+ }
+
+ public ActiveMQTopic(String name) {
+ super(name);
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isTopic() {
+ return true;
+ }
+
+ public String getTopicName() throws JMSException {
+ return getPhysicalName();
+ }
+
+ public byte getDestinationType() {
+ return TOPIC_TYPE;
+ }
+
+ protected String getQualifiedPrefix() {
+ return TOPIC_QUALIFIED_PREFIX;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.util.Map;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+
+/**
+ *
+ * @openwire:marshaller
+ * @version $Revision: 1.11 $
+ */
+public abstract class BaseCommand implements Command {
+
+ protected int commandId;
+ protected boolean responseRequired;
+
+ private transient Endpoint from;
+ private transient Endpoint to;
+
+ public void copy(BaseCommand copy) {
+ copy.commandId = commandId;
+ copy.responseRequired = responseRequired;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public int getCommandId() {
+ return commandId;
+ }
+
+ public void setCommandId(int commandId) {
+ this.commandId = commandId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public boolean isResponseRequired() {
+ return responseRequired;
+ }
+
+ public void setResponseRequired(boolean responseRequired) {
+ this.responseRequired = responseRequired;
+ }
+
+ public String toString() {
+ return toString(null);
+ }
+
+ public String toString(Map<String, Object>overrideFields) {
+ return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields);
+ }
+
+ public boolean isWireFormatInfo() {
+ return false;
+ }
+
+ public boolean isBrokerInfo() {
+ return false;
+ }
+
+ public boolean isResponse() {
+ return false;
+ }
+
+ public boolean isMessageDispatch() {
+ return false;
+ }
+
+ public boolean isMessage() {
+ return false;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+
+ public boolean isMessageAck() {
+ return false;
+ }
+
+ public boolean isMessageDispatchNotification() {
+ return false;
+ }
+
+ public boolean isShutdownInfo() {
+ return false;
+ }
+
+ /**
+ * The endpoint within the transport where this message came from.
+ */
+ public Endpoint getFrom() {
+ return from;
+ }
+
+ public void setFrom(Endpoint from) {
+ this.from = from;
+ }
+
+ /**
+ * The endpoint within the transport where this message is going to - null means all endpoints.
+ */
+ public Endpoint getTo() {
+ return to;
+ }
+
+ public void setTo(Endpoint to) {
+ this.to = to;
+ }
+
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseCommand.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseEndpoint.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseEndpoint.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BaseEndpoint.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * A default endpoint.
+ *
+ * @version $Revision: 564679 $
+ */
+public class BaseEndpoint implements Endpoint {
+
+ private String name;
+ private BrokerInfo brokerInfo;
+
+ public BaseEndpoint(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String toString() {
+ String brokerText = "";
+ BrokerId brokerId = getBrokerId();
+ if (brokerId != null) {
+ brokerText = " broker: " + brokerId;
+ }
+ return "Endpoint[name:" + name + brokerText + "]";
+ }
+
+ /**
+ * Returns the broker ID for this endpoint, if the endpoint is a broker or
+ * null
+ */
+ public BrokerId getBrokerId() {
+ if (brokerInfo != null) {
+ return brokerInfo.getBrokerId();
+ }
+ return null;
+ }
+
+ /**
+ * Returns the broker information for this endpoint, if the endpoint is a
+ * broker or null
+ */
+ public BrokerInfo getBrokerInfo() {
+ return brokerInfo;
+ }
+
+ public void setBrokerInfo(BrokerInfo brokerInfo) {
+ this.brokerInfo = brokerInfo;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * @openwire:marshaller code="124"
+ * @version $Revision$
+ */
+public class BrokerId implements DataStructure {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_ID;
+ protected String value;
+
+ public BrokerId() {
+ }
+
+ public BrokerId(String brokerId) {
+ this.value = brokerId;
+ }
+
+ public int hashCode() {
+ return value.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || o.getClass() != BrokerId.class) {
+ return false;
+ }
+ BrokerId id = (BrokerId)o;
+ return value.equals(id.value);
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public String toString() {
+ return value;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String brokerId) {
+ this.value = brokerId;
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerId.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * When a client connects to a broker, the broker send the client a BrokerInfo
+ * so that the client knows which broker node he's talking to and also any peers
+ * that the node has in his cluster. This is the broker helping the client out
+ * in discovering other nodes in the cluster.
+ *
+ * @openwire:marshaller code="2"
+ * @version $Revision: 1.7 $
+ */
+public class BrokerInfo extends BaseCommand {
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
+ BrokerId brokerId;
+ String brokerURL;
+ boolean slaveBroker;
+ boolean masterBroker;
+ boolean faultTolerantConfiguration;
+ boolean networkConnection;
+ boolean duplexConnection;
+ BrokerInfo peerBrokerInfos[];
+ String brokerName;
+ long connectionId;
+ String brokerUploadUrl;
+ String networkProperties;
+
+ public boolean isBrokerInfo() {
+ return true;
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public BrokerId getBrokerId() {
+ return brokerId;
+ }
+
+ public void setBrokerId(BrokerId brokerId) {
+ this.brokerId = brokerId;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getBrokerURL() {
+ return brokerURL;
+ }
+
+ public void setBrokerURL(String brokerURL) {
+ this.brokerURL = brokerURL;
+ }
+
+ /**
+ * @openwire:property version=1 testSize=0
+ */
+ public BrokerInfo[] getPeerBrokerInfos() {
+ return peerBrokerInfos;
+ }
+
+ public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos) {
+ this.peerBrokerInfos = peerBrokerInfos;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processBrokerInfo(this);
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public boolean isSlaveBroker() {
+ return slaveBroker;
+ }
+
+ public void setSlaveBroker(boolean slaveBroker) {
+ this.slaveBroker = slaveBroker;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public boolean isMasterBroker() {
+ return masterBroker;
+ }
+
+ /**
+ * @param masterBroker The masterBroker to set.
+ */
+ public void setMasterBroker(boolean masterBroker) {
+ this.masterBroker = masterBroker;
+ }
+
+ /**
+ * @openwire:property version=1
+ * @return Returns the faultTolerantConfiguration.
+ */
+ public boolean isFaultTolerantConfiguration() {
+ return faultTolerantConfiguration;
+ }
+
+ /**
+ * @param faultTolerantConfiguration The faultTolerantConfiguration to set.
+ */
+ public void setFaultTolerantConfiguration(boolean faultTolerantConfiguration) {
+ this.faultTolerantConfiguration = faultTolerantConfiguration;
+ }
+
+ /**
+ * @openwire:property version=2
+ * @return the duplexConnection
+ */
+ public boolean isDuplexConnection() {
+ return this.duplexConnection;
+ }
+
+ /**
+ * @param duplexConnection the duplexConnection to set
+ */
+ public void setDuplexConnection(boolean duplexConnection) {
+ this.duplexConnection = duplexConnection;
+ }
+
+ /**
+ * @openwire:property version=2
+ * @return the networkConnection
+ */
+ public boolean isNetworkConnection() {
+ return this.networkConnection;
+ }
+
+ /**
+ * @param networkConnection the networkConnection to set
+ */
+ public void setNetworkConnection(boolean networkConnection) {
+ this.networkConnection = networkConnection;
+ }
+
+ /**
+ * The broker assigns a each connection it accepts a connection id.
+ *
+ * @openwire:property version=2
+ */
+ public long getConnectionId() {
+ return connectionId;
+ }
+
+ public void setConnectionId(long connectionId) {
+ this.connectionId = connectionId;
+ }
+
+ /**
+ * The URL to use when uploading BLOBs to the broker or some other external
+ * file/http server
+ *
+ * @openwire:property version=3
+ */
+ public String getBrokerUploadUrl() {
+ return brokerUploadUrl;
+ }
+
+ public void setBrokerUploadUrl(String brokerUploadUrl) {
+ this.brokerUploadUrl = brokerUploadUrl;
+ }
+
+ /**
+ * @openwire:property version=3 cache=false
+ * @return the networkProperties
+ */
+ public String getNetworkProperties() {
+ return this.networkProperties;
+ }
+
+ /**
+ * @param networkProperties the networkProperties to set
+ */
+ public void setNetworkProperties(String networkProperties) {
+ this.networkProperties = networkProperties;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/BrokerInfo.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java Mon Jun 1 18:37:41 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * The Command Pattern so that we can send and receive commands on the different
+ * transports
+ *
+ * @version $Revision: 1.7 $
+ */
+public interface Command extends DataStructure {
+
+ void setCommandId(int value);
+
+ /**
+ * @return the unique ID of this request used to map responses to requests
+ */
+ int getCommandId();
+
+ void setResponseRequired(boolean responseRequired);
+
+ boolean isResponseRequired();
+
+ boolean isResponse();
+
+ boolean isMessageDispatch();
+
+ boolean isBrokerInfo();
+
+ boolean isWireFormatInfo();
+
+ boolean isMessage();
+
+ boolean isMessageAck();
+
+ boolean isMessageDispatchNotification();
+
+ boolean isShutdownInfo();
+
+ Response visit(CommandVisitor visitor) throws Exception;
+
+ /**
+ * The endpoint within the transport where this message came from which
+ * could be null if the transport only supports a single endpoint.
+ */
+ Endpoint getFrom();
+
+ void setFrom(Endpoint from);
+
+ /**
+ * The endpoint within the transport where this message is going to - null
+ * means all endpoints.
+ */
+ Endpoint getTo();
+
+ void setTo(Endpoint to);
+}
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Command.java
------------------------------------------------------------------------------
svn:executable = *