You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/02/02 00:35:56 UTC
svn commit: r739885 [4/5] - in /activemq/activemq-blaze/trunk/src:
main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/
main/java/org/apache/activeblaze/impl/transport/
main/java/org/apache/activeblaze/jms/ main/java/org/apache/act...
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.message;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
+import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+import org.apache.activemq.protobuf.BufferOutputStream;
+
+/**
+ * An <CODE>ObjectMessage</CODE> object is used to send a message that contains a serializable object in the Java
+ * programming language ("Java object"). It inherits from the <CODE>Message</CODE> interface and adds a body
+ * containing a single reference to an object. Only <CODE>Serializable</CODE> Java objects can be used. <p/>
+ * <P>
+ * If a collection of Java objects must be sent, one of the <CODE>Collection</CODE> classes provided since JDK 1.2 can
+ * be used. <p/>
+ * <P>
+ * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only mode. If a client attempts to write to
+ * the message at this point, a <CODE>MessageNotWriteableException</CODE> is thrown. If <CODE>clearBody</CODE> is
+ * called, the message can now be both read from and written to.
+ *
+ * @see javax.jms.Session#createObjectMessage()
+ * @see javax.jms.Session#createObjectMessage(Serializable)
+ * @see javax.jms.BytesMessage
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.StreamMessage
+ * @see javax.jms.TextMessage
+ */
+public class BlazeJmsObjectMessage extends BlazeJmsMessage implements ObjectMessage {
+    static final ClassLoader JMS_OBJECT_CLASS_LOADER = BlazeJmsObjectMessage.class.getClassLoader();
+    /** In-memory view of the body; assigned by setObject() or lazily deserialized by getObject(). */
+    protected transient Serializable object;
+
+    /**
+     * Creates a new message and fills it through {@link #copy(BlazeJmsObjectMessage)}.
+     *
+     * @return the newly created copy
+     * @throws BlazeRuntimeException
+     *             wrapping any BlazeException raised while copying
+     */
+    public BlazeJmsObjectMessage clone() {
+        BlazeJmsObjectMessage copy = new BlazeJmsObjectMessage();
+        try {
+            copy(copy);
+        } catch (BlazeException e) {
+            throw new BlazeRuntimeException(e);
+        }
+        return copy;
+    }
+
+    /**
+     * Copies this message into <CODE>copy</CODE>. The body is serialized into the content first, and the copy's
+     * cached object reference is cleared so that it is rebuilt from the payload on demand.
+     *
+     * @param copy
+     *            the message receiving the state
+     * @throws BlazeException
+     *             declared by the signature; presumably propagated from the superclass copy
+     */
+    protected void copy(BlazeJmsObjectMessage copy) throws BlazeException {
+        storeContent();
+        super.copy(copy);
+        copy.object = null;
+    }
+
+    /**
+     * Serializes the in-memory object into the content payload when an object is set and no payload exists yet.
+     *
+     * @throws RuntimeException
+     *             wrapping the IOException if serialization fails
+     */
+    public void storeContent() {
+        super.storeContent();
+        // NOTE(review): assumes getContent() is non-null after super.storeContent() - confirm in BlazeJmsMessage.
+        Buffer payload = getContent().getPayload();
+        if (payload == null && this.object != null) {
+            try {
+                BufferOutputStream os = new BufferOutputStream(128);
+                DataOutputStream dataOut = new DataOutputStream(os);
+                ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
+                try {
+                    objOut.writeObject(this.object);
+                    objOut.flush();
+                    objOut.reset();
+                } finally {
+                    // Close even when serialization fails so the stream chain is always released.
+                    objOut.close();
+                }
+                payload = os.toBuffer();
+                getContent().setPayload(payload);
+            } catch (IOException ioe) {
+                throw new RuntimeException(ioe.getMessage(), ioe);
+            }
+        }
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
+     * <P>
+     * If this message body was read-only, calling this method leaves the message body in the same state as an empty
+     * body in a newly created message.
+     *
+     * @throws JMSException
+     *             if the JMS provider fails to clear the message body due to some internal error.
+     */
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        this.object = null;
+    }
+
+    /**
+     * Sets the serializable object containing this message's data. It is important to note that an
+     * <CODE>ObjectMessage</CODE> contains a snapshot of the object at the time <CODE>setObject()</CODE> is called;
+     * subsequent modifications of the object will have no effect on the <CODE>ObjectMessage</CODE> body.
+     * The snapshot is taken by discarding the current content and serializing the object immediately.
+     *
+     * @param newObject
+     *            the object to use as the body; may be null
+     */
+    public void setObject(Serializable newObject) {
+        this.object = newObject;
+        setContent(null);
+        storeContent();
+    }
+
+    /**
+     * Gets the serializable object containing this message's data. The default value is null. When no object is
+     * cached but the content carries a payload, the payload is deserialized and the result cached.
+     *
+     * @return the serializable object containing this message's data
+     * @throws JMSException
+     *             if the payload cannot be deserialized (I/O failure or a class that cannot be found)
+     */
+    public Serializable getObject() throws JMSException {
+        if (this.object == null && getContent() != null && getContent().getPayload() != null) {
+            try {
+                Buffer payload = getContent().getPayload();
+                InputStream is = new BufferInputStream(payload);
+                DataInputStream dataIn = new DataInputStream(is);
+                try {
+                    ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+                    this.object = (Serializable) objIn.readObject();
+                } catch (ClassNotFoundException ce) {
+                    // Keep the original exception as the cause so the missing class stays diagnosable.
+                    IOException wrapped = new IOException(ce.getMessage());
+                    wrapped.initCause(ce);
+                    throw wrapped;
+                } finally {
+                    // Release the stream on both the success and the failure path.
+                    dataIn.close();
+                }
+            } catch (IOException e) {
+                throw BlazeJmsExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
+            }
+        }
+        return this.object;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,1109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.message;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.StreamMessage;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
+import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+
+/**
+ * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive types in the Java programming language.
+ * It is filled and read sequentially. It inherits from the <CODE>Message</CODE> interface and adds a stream message
+ * body. Its methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>. <p/>
+ * <P>
+ * The primitive types can be read or written explicitly using methods for each type. They may also be read or written
+ * generically as objects. For instance, a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
+ * <CODE>StreamMessage.writeObject(new
+ * Integer(6))</CODE>. Both forms are provided, because the explicit form is
+ * convenient for static programming, and the object form is needed when types are not known at compile time. <p/>
+ * <P>
+ * When the message is first created, and when <CODE>clearBody</CODE> is called, the body of the message is in
+ * write-only mode. After the first call to <CODE>reset</CODE> has been made, the message body is in read-only mode.
+ * After a message has been sent, the client that sent it can retain and modify it without affecting the message that
+ * has been sent. The same message object can be sent multiple times. When a message has been received, the provider has
+ * called <CODE>reset</CODE> so that the message body is in read-only mode for the client. <p/>
+ * <P>
+ * If <CODE>clearBody</CODE> is called on a message in read-only mode, the message body is cleared and the message
+ * body is in write-only mode. <p/>
+ * <P>
+ * If a client attempts to read a message in write-only mode, a <CODE>MessageNotReadableException</CODE> is thrown.
+ * <p/>
+ * <P>
+ * If a client attempts to write a message in read-only mode, a <CODE>MessageNotWriteableException</CODE> is thrown.
+ * <p/>
+ * <P>
+ * <CODE>StreamMessage</CODE> objects support the following conversion table. The marked cases must be supported. The
+ * unmarked cases must throw a <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive conversions may
+ * throw a runtime exception if the primitive's <CODE>valueOf()</CODE> method does not accept it as a valid
+ * <CODE>String</CODE> representation of the primitive. <p/>
+ * <P>
+ * A value written as the row type can be read as the column type. <p/>
+ *
+ * <PRE>
+ * |        | boolean byte short char int long float double String byte[]
+ * |----------------------------------------------------------------------
+ * |boolean |    X                                            X
+ * |byte    |          X     X         X   X                  X
+ * |short   |                X         X   X                  X
+ * |char    |                     X                           X
+ * |int     |                          X   X                  X
+ * |long    |                              X                  X
+ * |float   |                                    X     X      X
+ * |double  |                                          X      X
+ * |String  |    X     X     X         X   X     X     X      X
+ * |byte[]  |                                                        X
+ * |----------------------------------------------------------------------
+ *
+ * </PRE>
+ *
+ * <p/>
+ * <P>
+ * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding
+ * <code>valueOf(String)</code> conversion method with a null value. Since <code>char</code> does not support a
+ * <code>String</code> conversion, attempting to read a null value as a <code>char</code> must throw a
+ * <code>NullPointerException</code>.
+ *
+ *
+ * @see javax.jms.Session#createStreamMessage()
+ * @see javax.jms.BytesMessage
+ * @see javax.jms.MapMessage
+ * @see javax.jms.Message
+ * @see javax.jms.ObjectMessage
+ * @see javax.jms.TextMessage
+ */
+public class BlazeJmsStreamMessage extends BlazeJmsMessage implements StreamMessage {
+ protected transient DataOutputStream dataOut;
+ protected transient ByteArrayOutputStream bytesOut;
+ protected transient DataInputStream dataIn;
+ protected transient int remainingBytes = -1;
+
+ /**
+  * Builds a fresh stream message and populates it through {@link #copy(BlazeJmsStreamMessage)}.
+  *
+  * @return the populated duplicate
+  * @throws BlazeRuntimeException
+  *             wrapping any BlazeException raised while copying
+  */
+ public BlazeJmsStreamMessage clone() {
+     final BlazeJmsStreamMessage duplicate = new BlazeJmsStreamMessage();
+     try {
+         copy(duplicate);
+         return duplicate;
+     } catch (BlazeException cause) {
+         throw new BlazeRuntimeException(cause);
+     }
+ }
+
+ /**
+  * Transfers this message's state into <CODE>copy</CODE>. Pending written data is first moved into the content
+  * by {@link #storeContent()}; afterwards the copy's stream fields are cleared.
+  *
+  * @param copy
+  *            the message receiving the state
+  * @throws BlazeException
+  *             declared by the signature; presumably propagated from the superclass copy
+  */
+ protected void copy(BlazeJmsStreamMessage copy) throws BlazeException {
+     // The body must be stored before the superclass copies the content.
+     storeContent();
+     super.copy(copy);
+     // The copy starts with no open streams of its own.
+     copy.dataIn = null;
+     copy.bytesOut = null;
+     copy.dataOut = null;
+ }
+
+ /**
+  * Moves any bytes written so far into the content payload and discards the write streams. Does nothing beyond
+  * the superclass call when no write stream is open or no content is available.
+  *
+  * @throws RuntimeException
+  *             wrapping the IOException if closing the write stream fails
+  */
+ public void storeContent() {
+     super.storeContent();
+     if (this.dataOut == null || getContent() == null) {
+         return;
+     }
+     try {
+         this.dataOut.close();
+         final Buffer written = new Buffer(this.bytesOut.toByteArray());
+         getContent().setPayload(written);
+         this.bytesOut = null;
+         this.dataOut = null;
+     } catch (IOException failure) {
+         throw new RuntimeException(failure);
+     }
+ }
+
+ /**
+  * Prepares the body for reading. After delegating to the superclass, lazily opens <CODE>dataIn</CODE> over the
+  * content payload; a missing content or a missing payload is read as an empty body.
+  */
+ protected void loadContent() {
+     super.loadContent();
+     if (this.dataIn == null) {
+         BlazeData bd = getContent();
+         // The content itself may be absent, not just its payload, so never dereference bd unchecked.
+         Buffer data = (bd != null) ? bd.getPayload() : null;
+         if (data == null) {
+             data = new Buffer(new byte[0]);
+         }
+         BufferInputStream is = new BufferInputStream(data);
+         this.dataIn = new DataInputStream(is);
+     }
+ }
+
+ /**
+ * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
+ * <P>
+ * If this message body was read-only, calling this method leaves the message body in the same state as an empty
+ * body in a newly created message.
+ *
+ * @throws JMSException
+ * if the JMS provider fails to clear the message body due to some internal error.
+ */
+ public void clearBody() throws JMSException {
+     super.clearBody();
+     // Forget both the read and the write side, and any partially consumed byte-array field.
+     this.dataIn = null;
+     this.bytesOut = null;
+     this.dataOut = null;
+     this.remainingBytes = -1;
+ }
+
+ /**
+ * Reads a <code>boolean</code> from the stream message.
+ *
+ * @return the <code>boolean</code> value read
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public boolean readBoolean() throws JMSException {
+     initializeReading();
+     try {
+         this.dataIn.mark(10);
+         final int tag = this.dataIn.read();
+         if (tag == -1) {
+             throw new MessageEOFException("reached end of data");
+         } else if (tag == MarshallingSupport.BOOLEAN_TYPE) {
+             return this.dataIn.readBoolean();
+         } else if (tag == MarshallingSupport.STRING_TYPE) {
+             return Boolean.parseBoolean(this.dataIn.readUTF());
+         }
+         // No conversion applies: rewind to the mark before reporting the failure.
+         this.dataIn.reset();
+         if (tag == MarshallingSupport.NULL) {
+             throw new NullPointerException("Cannot convert NULL value to boolean.");
+         }
+         throw new MessageFormatException(" not a boolean type");
+     } catch (EOFException eof) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+     } catch (IOException io) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+     }
+ }
+
+ /**
+ * Reads a <code>byte</code> value from the stream message.
+ *
+ * @return the next byte from the stream message as a 8-bit <code>byte</code>
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public byte readByte() throws JMSException {
+     initializeReading();
+     try {
+         this.dataIn.mark(10);
+         final int tag = this.dataIn.read();
+         if (tag == -1) {
+             throw new MessageEOFException("reached end of data");
+         } else if (tag == MarshallingSupport.BYTE_TYPE) {
+             return this.dataIn.readByte();
+         } else if (tag == MarshallingSupport.STRING_TYPE) {
+             return Byte.parseByte(this.dataIn.readUTF());
+         }
+         // No conversion applies: rewind to the mark before reporting the failure.
+         this.dataIn.reset();
+         if (tag == MarshallingSupport.NULL) {
+             throw new NullPointerException("Cannot convert NULL value to byte.");
+         }
+         throw new MessageFormatException(" not a byte type");
+     } catch (NumberFormatException nfe) {
+         // The string field did not parse; rewind so it can still be read as a String.
+         try {
+             this.dataIn.reset();
+         } catch (IOException resetFailure) {
+             throw BlazeJmsExceptionSupport.create(resetFailure);
+         }
+         throw nfe;
+     } catch (EOFException eof) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+     } catch (IOException io) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+     }
+ }
+
+ /**
+ * Reads a 16-bit integer from the stream message.
+ *
+ * @return a 16-bit integer from the stream message
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public short readShort() throws JMSException {
+     initializeReading();
+     try {
+         this.dataIn.mark(17);
+         final int tag = this.dataIn.read();
+         if (tag == -1) {
+             throw new MessageEOFException("reached end of data");
+         } else if (tag == MarshallingSupport.SHORT_TYPE) {
+             return this.dataIn.readShort();
+         } else if (tag == MarshallingSupport.BYTE_TYPE) {
+             return this.dataIn.readByte();
+         } else if (tag == MarshallingSupport.STRING_TYPE) {
+             return Short.parseShort(this.dataIn.readUTF());
+         }
+         // No conversion applies: rewind to the mark before reporting the failure.
+         this.dataIn.reset();
+         if (tag == MarshallingSupport.NULL) {
+             throw new NullPointerException("Cannot convert NULL value to short.");
+         }
+         throw new MessageFormatException(" not a short type");
+     } catch (NumberFormatException nfe) {
+         // The string field did not parse; rewind so it can still be read as a String.
+         try {
+             this.dataIn.reset();
+         } catch (IOException resetFailure) {
+             throw BlazeJmsExceptionSupport.create(resetFailure);
+         }
+         throw nfe;
+     } catch (EOFException eof) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+     } catch (IOException io) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+     }
+ }
+
+ /**
+ * Reads a Unicode character value from the stream message.
+ *
+ * @return a Unicode character from the stream message
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public char readChar() throws JMSException {
+     initializeReading();
+     try {
+         // Mark first so that an unsupported field can be rewound and read with another method.
+         this.dataIn.mark(17);
+         int type = this.dataIn.read();
+         if (type == -1) {
+             throw new MessageEOFException("reached end of data");
+         }
+         if (type == MarshallingSupport.CHAR_TYPE) {
+             return this.dataIn.readChar();
+         }
+         // char has no String conversion, so nothing in this method parses text and no
+         // NumberFormatException handling is needed.
+         this.dataIn.reset();
+         if (type == MarshallingSupport.NULL) {
+             throw new NullPointerException("Cannot convert NULL value to char.");
+         }
+         throw new MessageFormatException(" not a char type");
+     } catch (EOFException e) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(e);
+     } catch (IOException e) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(e);
+     }
+ }
+
+ /**
+ * Reads a 32-bit integer from the stream message.
+ *
+ * @return a 32-bit integer value from the stream message, interpreted as an <code>int</code>
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public int readInt() throws JMSException {
+     initializeReading();
+     try {
+         this.dataIn.mark(33);
+         final int tag = this.dataIn.read();
+         if (tag == -1) {
+             throw new MessageEOFException("reached end of data");
+         } else if (tag == MarshallingSupport.INTEGER_TYPE) {
+             return this.dataIn.readInt();
+         } else if (tag == MarshallingSupport.SHORT_TYPE) {
+             return this.dataIn.readShort();
+         } else if (tag == MarshallingSupport.BYTE_TYPE) {
+             return this.dataIn.readByte();
+         } else if (tag == MarshallingSupport.STRING_TYPE) {
+             return Integer.parseInt(this.dataIn.readUTF());
+         }
+         // No conversion applies: rewind to the mark before reporting the failure.
+         this.dataIn.reset();
+         if (tag == MarshallingSupport.NULL) {
+             throw new NullPointerException("Cannot convert NULL value to int.");
+         }
+         throw new MessageFormatException(" not an int type");
+     } catch (NumberFormatException nfe) {
+         // The string field did not parse; rewind so it can still be read as a String.
+         try {
+             this.dataIn.reset();
+         } catch (IOException resetFailure) {
+             throw BlazeJmsExceptionSupport.create(resetFailure);
+         }
+         throw nfe;
+     } catch (EOFException eof) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+     } catch (IOException io) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+     }
+ }
+
+ /**
+ * Reads a 64-bit integer from the stream message.
+ *
+ * @return a 64-bit integer value from the stream message, interpreted as a <code>long</code>
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public long readLong() throws JMSException {
+     initializeReading();
+     try {
+         this.dataIn.mark(65);
+         final int tag = this.dataIn.read();
+         if (tag == -1) {
+             throw new MessageEOFException("reached end of data");
+         } else if (tag == MarshallingSupport.LONG_TYPE) {
+             return this.dataIn.readLong();
+         } else if (tag == MarshallingSupport.INTEGER_TYPE) {
+             return this.dataIn.readInt();
+         } else if (tag == MarshallingSupport.SHORT_TYPE) {
+             return this.dataIn.readShort();
+         } else if (tag == MarshallingSupport.BYTE_TYPE) {
+             return this.dataIn.readByte();
+         } else if (tag == MarshallingSupport.STRING_TYPE) {
+             return Long.parseLong(this.dataIn.readUTF());
+         }
+         // No conversion applies: rewind to the mark before reporting the failure.
+         this.dataIn.reset();
+         if (tag == MarshallingSupport.NULL) {
+             throw new NullPointerException("Cannot convert NULL value to long.");
+         }
+         throw new MessageFormatException(" not a long type");
+     } catch (NumberFormatException nfe) {
+         // The string field did not parse; rewind so it can still be read as a String.
+         try {
+             this.dataIn.reset();
+         } catch (IOException resetFailure) {
+             throw BlazeJmsExceptionSupport.create(resetFailure);
+         }
+         throw nfe;
+     } catch (EOFException eof) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+     } catch (IOException io) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+     }
+ }
+
+ /**
+ * Reads a <code>float</code> from the stream message.
+ *
+ * @return a <code>float</code> value from the stream message
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public float readFloat() throws JMSException {
+     initializeReading();
+     try {
+         this.dataIn.mark(33);
+         final int tag = this.dataIn.read();
+         if (tag == -1) {
+             throw new MessageEOFException("reached end of data");
+         } else if (tag == MarshallingSupport.FLOAT_TYPE) {
+             return this.dataIn.readFloat();
+         } else if (tag == MarshallingSupport.STRING_TYPE) {
+             return Float.parseFloat(this.dataIn.readUTF());
+         }
+         // No conversion applies: rewind to the mark before reporting the failure.
+         this.dataIn.reset();
+         if (tag == MarshallingSupport.NULL) {
+             throw new NullPointerException("Cannot convert NULL value to float.");
+         }
+         throw new MessageFormatException(" not a float type");
+     } catch (NumberFormatException nfe) {
+         // The string field did not parse; rewind so it can still be read as a String.
+         try {
+             this.dataIn.reset();
+         } catch (IOException resetFailure) {
+             throw BlazeJmsExceptionSupport.create(resetFailure);
+         }
+         throw nfe;
+     } catch (EOFException eof) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+     } catch (IOException io) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+     }
+ }
+
+ /**
+ * Reads a <code>double</code> from the stream message.
+ *
+ * @return a <code>double</code> value from the stream message
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public double readDouble() throws JMSException {
+     initializeReading();
+     try {
+         this.dataIn.mark(65);
+         final int tag = this.dataIn.read();
+         if (tag == -1) {
+             throw new MessageEOFException("reached end of data");
+         } else if (tag == MarshallingSupport.DOUBLE_TYPE) {
+             return this.dataIn.readDouble();
+         } else if (tag == MarshallingSupport.FLOAT_TYPE) {
+             return this.dataIn.readFloat();
+         } else if (tag == MarshallingSupport.STRING_TYPE) {
+             return Double.parseDouble(this.dataIn.readUTF());
+         }
+         // No conversion applies: rewind to the mark before reporting the failure.
+         this.dataIn.reset();
+         if (tag == MarshallingSupport.NULL) {
+             throw new NullPointerException("Cannot convert NULL value to double.");
+         }
+         throw new MessageFormatException(" not a double type");
+     } catch (NumberFormatException nfe) {
+         // The string field did not parse; rewind so it can still be read as a String.
+         try {
+             this.dataIn.reset();
+         } catch (IOException resetFailure) {
+             throw BlazeJmsExceptionSupport.create(resetFailure);
+         }
+         throw nfe;
+     } catch (EOFException eof) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(eof);
+     } catch (IOException io) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(io);
+     }
+ }
+
+ /**
+ * Reads a <CODE>String</CODE> from the stream message.
+ *
+ * @return a Unicode string from the stream message
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ */
+ public String readString() throws JMSException {
+     initializeReading();
+     try {
+         // Mark first so that an unsupported field can be rewound and read with another method.
+         this.dataIn.mark(65);
+         int type = this.dataIn.read();
+         if (type == -1) {
+             throw new MessageEOFException("reached end of data");
+         }
+         if (type == MarshallingSupport.NULL) {
+             return null;
+         }
+         if (type == MarshallingSupport.BIG_STRING_TYPE) {
+             return MarshallingSupport.readUTF8(dataIn);
+         }
+         if (type == MarshallingSupport.STRING_TYPE) {
+             return this.dataIn.readUTF();
+         }
+         // Primitive fields are rendered with the static toString helpers; this yields the same text as
+         // boxing the value first, without allocating a wrapper object.
+         if (type == MarshallingSupport.LONG_TYPE) {
+             return Long.toString(this.dataIn.readLong());
+         }
+         if (type == MarshallingSupport.INTEGER_TYPE) {
+             return Integer.toString(this.dataIn.readInt());
+         }
+         if (type == MarshallingSupport.SHORT_TYPE) {
+             return Short.toString(this.dataIn.readShort());
+         }
+         if (type == MarshallingSupport.BYTE_TYPE) {
+             return Byte.toString(this.dataIn.readByte());
+         }
+         if (type == MarshallingSupport.FLOAT_TYPE) {
+             return Float.toString(this.dataIn.readFloat());
+         }
+         if (type == MarshallingSupport.DOUBLE_TYPE) {
+             return Double.toString(this.dataIn.readDouble());
+         }
+         if (type == MarshallingSupport.BOOLEAN_TYPE) {
+             return Boolean.toString(this.dataIn.readBoolean());
+         }
+         if (type == MarshallingSupport.CHAR_TYPE) {
+             return Character.toString(this.dataIn.readChar());
+         }
+         // Any other field type has no String conversion.
+         this.dataIn.reset();
+         throw new MessageFormatException(" not a String type");
+     } catch (NumberFormatException mfe) {
+         // NOTE(review): kept because MarshallingSupport.readUTF8 is opaque from here; rewind before rethrowing.
+         try {
+             this.dataIn.reset();
+         } catch (IOException ioe) {
+             throw BlazeJmsExceptionSupport.create(ioe);
+         }
+         throw mfe;
+     } catch (EOFException e) {
+         throw BlazeJmsExceptionSupport.createMessageEOFException(e);
+     } catch (IOException e) {
+         throw BlazeJmsExceptionSupport.createMessageFormatException(e);
+     }
+ }
+
+ /**
+ * Reads a byte array field from the stream message into the specified <CODE>byte[]</CODE> object (the read
+ * buffer). <p/>
+ * <P>
+ * To read the field value, <CODE>readBytes</CODE> should be successively called until it returns a value less
+ * than the length of the read buffer. The value of the bytes in the buffer following the last byte read is
+ * undefined. <p/>
+ * <P>
+ * If <CODE>readBytes</CODE> returns a value equal to the length of the buffer, a subsequent
+ * <CODE>readBytes</CODE> call must be made. If there are no more bytes to be read, this call returns -1. <p/>
+ * <P>
+ * If the byte array field value is null, <CODE>readBytes</CODE> returns -1. <p/>
+ * <P>
+ * If the byte array field value is empty, <CODE>readBytes</CODE> returns 0. <p/>
+ * <P>
+ * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> field value has been made, the full value
+ * of the field must be read before it is valid to read the next field. An attempt to read the next field before
+ * that has been done will throw a <CODE>MessageFormatException</CODE>. <p/>
+ * <P>
+ * To read the byte field value into a new <CODE>byte[]</CODE> object, use the <CODE>readObject</CODE> method.
+ *
+ * @param value
+ * the buffer into which the data is read
+ * @return the total number of bytes read into the buffer, or -1 if there is no more data because the end of the
+ * byte field has been reached
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ * @see #readObject()
+ */
+ public int readBytes(byte[] value) throws JMSException {
+     initializeReading();
+     try {
+         if (value == null) {
+             throw new NullPointerException();
+         }
+         if (remainingBytes == -1) {
+             // Start of a new field: read the type marker and, for a byte array, its length.
+             this.dataIn.mark(value.length + 1);
+             int type = this.dataIn.read();
+             if (type == -1) {
+                 throw new MessageEOFException("reached end of data");
+             }
+             if (type == MarshallingSupport.NULL) {
+                 // A null byte-array field is reported as -1, as the method contract states.
+                 return -1;
+             }
+             if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
+                 // Rewind, as the other read methods do, so the field can still be read with the right method.
+                 this.dataIn.reset();
+                 throw new MessageFormatException("Not a byte array");
+             }
+             remainingBytes = this.dataIn.readInt();
+         } else if (remainingBytes == 0) {
+             // The previous call consumed the last bytes of the field; signal the end and reset the state.
+             remainingBytes = -1;
+             return -1;
+         }
+         if (value.length <= remainingBytes) {
+             // small buffer: fill it completely; update the counter only after the read succeeded
+             this.dataIn.readFully(value);
+             remainingBytes -= value.length;
+             return value.length;
+         } else {
+             // big buffer: take the rest of the field; readFully raises EOFException on truncated data
+             // instead of returning a short or negative count
+             int rc = remainingBytes;
+             this.dataIn.readFully(value, 0, rc);
+             remainingBytes = 0;
+             return rc;
+         }
+     } catch (EOFException e) {
+         JMSException jmsEx = new MessageEOFException(e.getMessage());
+         jmsEx.setLinkedException(e);
+         throw jmsEx;
+     } catch (IOException e) {
+         JMSException jmsEx = new MessageFormatException(e.getMessage());
+         jmsEx.setLinkedException(e);
+         throw jmsEx;
+     }
+ }
+
+ /**
+ * Reads an object from the stream message. <p/>
+ * <P>
+ * This method can be used to return, in objectified format, an object in the Java programming language ("Java
+ * object") that has been written to the stream with the equivalent <CODE>writeObject</CODE> method call, or its
+ * equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
+ * <P>
+ * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>. <p/>
+ * <P>
+ * An attempt to call <CODE>readObject</CODE> to read a byte field value into a new <CODE>byte[]</CODE> object
+ * before the full value of the byte field has been read will throw a <CODE>MessageFormatException</CODE>.
+ *
+ * @return a Java object from the stream message, in objectified format (for example, if the object was written as
+ * an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned)
+ * @throws JMSException
+ * if the JMS provider fails to read the message due to some internal error.
+ * @throws MessageEOFException
+ * if unexpected end of message stream has been reached.
+ * @throws MessageFormatException
+ * if this type conversion is invalid.
+ * @throws MessageNotReadableException
+ * if the message is in write-only mode.
+ * @see #readBytes(byte[] value)
+ */
+ public Object readObject() throws JMSException {
+ initializeReading();
+ try {
+ this.dataIn.mark(65);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
+ }
+ if (type == MarshallingSupport.NULL) {
+ return null;
+ }
+ if (type == MarshallingSupport.BIG_STRING_TYPE) {
+ return MarshallingSupport.readUTF8(dataIn);
+ }
+ if (type == MarshallingSupport.STRING_TYPE) {
+ return this.dataIn.readUTF();
+ }
+ if (type == MarshallingSupport.LONG_TYPE) {
+ return Long.valueOf(this.dataIn.readLong());
+ }
+ if (type == MarshallingSupport.INTEGER_TYPE) {
+ return Integer.valueOf(this.dataIn.readInt());
+ }
+ if (type == MarshallingSupport.SHORT_TYPE) {
+ return Short.valueOf(this.dataIn.readShort());
+ }
+ if (type == MarshallingSupport.BYTE_TYPE) {
+ return Byte.valueOf(this.dataIn.readByte());
+ }
+ if (type == MarshallingSupport.FLOAT_TYPE) {
+ return new Float(this.dataIn.readFloat());
+ }
+ if (type == MarshallingSupport.DOUBLE_TYPE) {
+ return new Double(this.dataIn.readDouble());
+ }
+ if (type == MarshallingSupport.BOOLEAN_TYPE) {
+ return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+ }
+ if (type == MarshallingSupport.CHAR_TYPE) {
+ return Character.valueOf(this.dataIn.readChar());
+ }
+ if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
+ int len = this.dataIn.readInt();
+ byte[] value = new byte[len];
+ this.dataIn.readFully(value);
+ return value;
+ } else {
+ this.dataIn.reset();
+ throw new MessageFormatException("unknown type");
+ }
+ } catch (NumberFormatException mfe) {
+ try {
+ this.dataIn.reset();
+ } catch (IOException ioe) {
+ throw BlazeJmsExceptionSupport.create(ioe);
+ }
+ throw mfe;
+ } catch (EOFException e) {
+ JMSException jmsEx = new MessageEOFException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ } catch (IOException e) {
+ JMSException jmsEx = new MessageFormatException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ }
+
+ /**
+ * Writes a <code>boolean</code> to the stream message. The value <code>true</code> is written as the value
+ * <code>(byte)1</code>; the value <code>false</code> is written as the value <code>(byte)0</code>.
+ *
+ * @param value
+ * the <code>boolean</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeBoolean(boolean value) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalBoolean(dataOut, value);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes a <code>byte</code> to the stream message.
+ *
+ * @param value
+ * the <code>byte</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeByte(byte value) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalByte(dataOut, value);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes a <code>short</code> to the stream message.
+ *
+ * @param value
+ * the <code>short</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeShort(short value) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalShort(dataOut, value);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes a <code>char</code> to the stream message.
+ *
+ * @param value
+ * the <code>char</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeChar(char value) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalChar(dataOut, value);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes an <code>int</code> to the stream message.
+ *
+ * @param value
+ * the <code>int</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeInt(int value) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalInt(dataOut, value);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes a <code>long</code> to the stream message.
+ *
+ * @param value
+ * the <code>long</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeLong(long value) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalLong(dataOut, value);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes a <code>float</code> to the stream message.
+ *
+ * @param value
+ * the <code>float</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeFloat(float value) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalFloat(dataOut, value);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes a <code>double</code> to the stream message.
+ *
+ * @param value
+ * the <code>double</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeDouble(double value) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalDouble(dataOut, value);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes a <code>String</code> to the stream message.
+ *
+ * @param value
+ * the <code>String</code> value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeString(String value) throws JMSException {
+     initializeWriting();
+     try {
+         if (value != null) {
+             MarshallingSupport.marshalString(dataOut, value);
+         } else {
+             // A null string is recorded as an explicit null marker.
+             MarshallingSupport.marshalNull(dataOut);
+         }
+     } catch (IOException cause) {
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes a byte array field to the stream message. <p/>
+ * <P>
+ * The byte array <code>value</code> is written to the message as a byte array field. Consecutively written byte
+ * array fields are treated as two distinct fields when the fields are read.
+ *
+ * @param value
+ * the byte array value to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeBytes(byte[] value) throws JMSException {
+     // Delegate to the ranged overload, covering the whole array.
+     final int wholeLength = value.length;
+     writeBytes(value, 0, wholeLength);
+ }
+
+ /**
+ * Writes a portion of a byte array as a byte array field to the stream message. <p/>
+ * <P>
+ * A portion of the byte array <code>value</code> is written to the message as a byte array field.
+ * Consecutively written byte array fields are treated as two distinct fields when the fields are read.
+ *
+ * @param value
+ * the byte array value to be written
+ * @param offset
+ * the initial offset within the byte array
+ * @param length
+ * the number of bytes to use
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+     // Make sure the output stream exists before marshalling.
+     initializeWriting();
+     try {
+         MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
+     } catch (IOException cause) {
+         // Surface stream failures as JMS exceptions.
+         throw BlazeJmsExceptionSupport.create(cause);
+     }
+ }
+
+ /**
+ * Writes an object to the stream message. <p/>
+ * <P>
+ * This method works only for the objectified primitive object types (<code>Integer</code>, <code>Double</code>,
+ * <code>Long</code> ...), <code>String</code> objects, and byte arrays.
+ *
+ * @param value
+ * the Java object to be written
+ * @throws JMSException
+ * if the JMS provider fails to write the message due to some internal error.
+ * @throws MessageFormatException
+ * if the object is invalid.
+ * @throws MessageNotWriteableException
+ * if the message is in read-only mode.
+ */
+ public void writeObject(Object value) throws JMSException {
+     initializeWriting();
+     if (value == null) {
+         // Null has no typed writer of its own; emit the null marker directly.
+         try {
+             MarshallingSupport.marshalNull(dataOut);
+         } catch (IOException cause) {
+             throw BlazeJmsExceptionSupport.create(cause);
+         }
+         return;
+     }
+     // Dispatch on the runtime type to the matching typed writer.
+     if (value instanceof String) {
+         writeString((String) value);
+         return;
+     }
+     if (value instanceof Character) {
+         writeChar(((Character) value).charValue());
+         return;
+     }
+     if (value instanceof Boolean) {
+         writeBoolean(((Boolean) value).booleanValue());
+         return;
+     }
+     if (value instanceof Byte) {
+         writeByte(((Byte) value).byteValue());
+         return;
+     }
+     if (value instanceof Short) {
+         writeShort(((Short) value).shortValue());
+         return;
+     }
+     if (value instanceof Integer) {
+         writeInt(((Integer) value).intValue());
+         return;
+     }
+     if (value instanceof Float) {
+         writeFloat(((Float) value).floatValue());
+         return;
+     }
+     if (value instanceof Double) {
+         writeDouble(((Double) value).doubleValue());
+         return;
+     }
+     if (value instanceof byte[]) {
+         writeBytes((byte[]) value);
+         return;
+     }
+     if (value instanceof Long) {
+         writeLong(((Long) value).longValue());
+         return;
+     }
+     throw new MessageFormatException("Unsupported Object type: " + value.getClass());
+ }
+
+ /**
+ * Puts the message body in read-only mode and repositions the stream of bytes to the beginning.
+ */
+ public void reset() {
+     // Persist whatever has been written so far before dropping the streams.
+     storeContent();
+     // Discard both stream directions; they are recreated lazily on next use.
+     this.dataOut = null;
+     this.bytesOut = null;
+     this.dataIn = null;
+     // No byte-array field is partially read any more.
+     this.remainingBytes = -1;
+ }
+
+ protected void initializeWriting() {
+     super.initializeWriting();
+     if (this.dataOut != null) {
+         // Output stream already set up by an earlier write.
+         return;
+     }
+     // Lazily create the in-memory sink and the data stream that wraps it.
+     this.bytesOut = new ByteArrayOutputStream();
+     this.dataOut = new DataOutputStream(this.bytesOut);
+ }
+
+ protected void initializeReading() {
+     if (this.dataIn != null) {
+         // Input stream already set up by an earlier read.
+         return;
+     }
+     BlazeData content = getContent();
+     Buffer payload = (content != null) ? content.getPayload() : null;
+     if (payload == null) {
+         // No stored body: read from an empty buffer instead.
+         payload = new Buffer(new byte[0]);
+     }
+     this.dataIn = new DataInputStream(new BufferInputStream(payload));
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.message;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+import org.apache.activemq.protobuf.BufferOutputStream;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class BlazeJmsTextMessage extends BlazeJmsMessage implements TextMessage {
+    /** Decoded text body; null until set or lazily decoded from the stored content. */
+    protected String text;
+
+    /**
+     * Creates a new message carrying a copy of this message's state.
+     *
+     * @return the copy
+     * @throws BlazeRuntimeException
+     *             if copying fails with a BlazeException
+     */
+    public BlazeJmsTextMessage clone() {
+        BlazeJmsTextMessage copy = new BlazeJmsTextMessage();
+        try {
+            copy(copy);
+        } catch (BlazeException e) {
+            throw new BlazeRuntimeException(e);
+        }
+        return copy;
+    }
+
+    /**
+     * Stores the current text into the content, then copies this message's state and text into the target.
+     *
+     * @param copy
+     *            the message to copy into
+     * @throws BlazeException
+     */
+    protected void copy(BlazeJmsTextMessage copy) throws BlazeException {
+        storeContent();
+        super.copy(copy);
+        copy.text = this.text;
+    }
+
+    /**
+     * Sets the text body and discards the previously stored content.
+     *
+     * @param text
+     *            the new text, may be null
+     */
+    public void setText(String text) {
+        this.text = text;
+        setContent(null);
+    }
+
+    /**
+     * Encodes the current text with {@link MarshallingSupport#writeUTF8} and stores it as the content payload.
+     *
+     * @throws BlazeRuntimeException
+     *             if encoding fails with an IOException
+     */
+    public void storeContent() {
+        super.storeContent();
+        try {
+            BufferOutputStream os = new BufferOutputStream(this.text != null ? this.text.length() : 10);
+            DataOutputStream dataOut = new DataOutputStream(os);
+            MarshallingSupport.writeUTF8(dataOut, this.text);
+            getContent().setPayload(os.toBuffer());
+            dataOut.close();
+        } catch (IOException e) {
+            throw new BlazeRuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns the text body, decoding it from the stored content payload on first access.
+     *
+     * @return the text, or null if no text has been set and there is no payload to decode
+     * @throws BlazeRuntimeException
+     *             if the payload cannot be decoded
+     */
+    public String getText() {
+        if (this.text == null && getContent() != null) {
+            Buffer data = getContent().getPayload();
+            if (data == null) {
+                // Nothing has been stored for this message, so there is nothing to decode.
+                return null;
+            }
+            try {
+                DataInputStream dataIn = new DataInputStream(new BufferInputStream(data));
+                try {
+                    // Decode BEFORE closing the stream.
+                    this.text = MarshallingSupport.readUTF8(dataIn);
+                } finally {
+                    dataIn.close();
+                }
+                // Drop the encoded form only once decoding has succeeded, so that a
+                // failure does not lose the payload.
+                setContent(null);
+            } catch (IOException e) {
+                throw new BlazeRuntimeException(e);
+            }
+        }
+        return this.text;
+    }
+
+    /**
+     * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
+     * <P>
+     * If this message body was read-only, calling this method leaves the message body in the same state as an empty
+     * body in a newly created message.
+     *
+     * @throws JMSException
+     *             if the JMS provider fails to clear the message body due to some internal error.
+     */
+    public void clearBody() throws JMSException {
+        super.clearBody();
+        this.text = null;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms.message;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Helpers for marshalling primitive values, maps and lists to and from data
+ * streams, including a fixed version of the UTF8 encoding function. Some older
+ * JVM's UTF8 encoding function breaks when handling large strings.
+ *
+ * @version $Revision$
+ */
+public final class MarshallingSupport {
+
+    // One-byte type tags written in front of every marshalled value.
+    public static final byte NULL = 0;
+    public static final byte BOOLEAN_TYPE = 1;
+    public static final byte BYTE_TYPE = 2;
+    public static final byte CHAR_TYPE = 3;
+    public static final byte SHORT_TYPE = 4;
+    public static final byte INTEGER_TYPE = 5;
+    public static final byte LONG_TYPE = 6;
+    public static final byte DOUBLE_TYPE = 7;
+    public static final byte FLOAT_TYPE = 8;
+    public static final byte STRING_TYPE = 9;
+    public static final byte BYTE_ARRAY_TYPE = 10;
+    public static final byte MAP_TYPE = 11;
+    public static final byte LIST_TYPE = 12;
+    public static final byte BIG_STRING_TYPE = 13;
+
+    /**
+     * Cap on the capacity pre-allocated for a collection whose size was read from a stream, so that a bogus size
+     * cannot trigger a huge up-front allocation. Collections still grow to their real size as elements are read.
+     */
+    private static final int MAX_INITIAL_CAPACITY = 1024;
+
+    private MarshallingSupport() {
+    }
+
+    /**
+     * Writes a map as its size followed by (UTF name, tagged value) pairs; a null map is written as size -1.
+     *
+     * @param map
+     *            the map to write, may be null
+     * @param out
+     *            the destination stream
+     * @throws IOException
+     *             if writing fails or a value is not a supported primitive
+     */
+    public static void marshalPrimitiveMap(Map<String,Object> map, DataOutputStream out) throws IOException {
+        if (map == null) {
+            out.writeInt(-1);
+        } else {
+            out.writeInt(map.size());
+            // Iterate entries directly instead of looking every key up again.
+            for (Map.Entry<String, Object> entry : map.entrySet()) {
+                out.writeUTF(entry.getKey());
+                marshalPrimitive(out, entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Reads a map written by {@link #marshalPrimitiveMap} without a size limit.
+     *
+     * @param in
+     *            the source stream
+     * @return the map, or null if a negative size was stored
+     * @throws IOException
+     *             if reading fails
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in) throws IOException {
+        return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Reads a map written by {@link #marshalPrimitiveMap}.
+     *
+     * @param in
+     *            the source stream
+     * @param maxPropertySize
+     *            the largest number of entries accepted
+     * @return the map, or null if a negative size was stored
+     * @throws IOException
+     *             if reading fails or the stored size exceeds <code>maxPropertySize</code>
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in, int maxPropertySize) throws IOException {
+        int size = in.readInt();
+        if (size > maxPropertySize) {
+            throw new IOException("Primitive map is larger than the allowed size: " + size);
+        }
+        if (size < 0) {
+            return null;
+        }
+        Map<String, Object> rc = new HashMap<String, Object>(Math.min(size, MAX_INITIAL_CAPACITY));
+        for (int i = 0; i < size; i++) {
+            String name = in.readUTF();
+            rc.put(name, unmarshalPrimitive(in));
+        }
+        return rc;
+    }
+
+    /**
+     * Writes a list as its size followed by its tagged elements.
+     *
+     * @param list
+     *            the list to write
+     * @param out
+     *            the destination stream
+     * @throws IOException
+     *             if writing fails or an element is not a supported primitive
+     */
+    public static void marshalPrimitiveList(List list, DataOutputStream out) throws IOException {
+        out.writeInt(list.size());
+        for (Object element : list) {
+            marshalPrimitive(out, element);
+        }
+    }
+
+    /**
+     * Reads a list written by {@link #marshalPrimitiveList}.
+     *
+     * @param in
+     *            the source stream
+     * @return the list of unmarshalled elements
+     * @throws IOException
+     *             if reading fails or the stored size is negative
+     */
+    public static List<Object> unmarshalPrimitiveList(DataInputStream in) throws IOException {
+        int size = in.readInt();
+        if (size < 0) {
+            // Report corrupt data as an IOException rather than an unchecked exception from ArrayList.
+            throw new IOException("Primitive list has a negative size: " + size);
+        }
+        List<Object> answer = new ArrayList<Object>(Math.min(size, MAX_INITIAL_CAPACITY));
+        while (size-- > 0) {
+            answer.add(unmarshalPrimitive(in));
+        }
+        return answer;
+    }
+
+    /**
+     * Writes a single value preceded by its type tag. Supported values are null, the boxed primitives,
+     * <code>byte[]</code>, <code>String</code>, <code>Map</code> and <code>List</code>.
+     *
+     * @param out
+     *            the destination stream
+     * @param value
+     *            the value to write
+     * @throws IOException
+     *             if writing fails or the value is of an unsupported type
+     */
+    public static void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
+        if (value == null) {
+            marshalNull(out);
+        } else if (value.getClass() == Boolean.class) {
+            marshalBoolean(out, ((Boolean)value).booleanValue());
+        } else if (value.getClass() == Byte.class) {
+            marshalByte(out, ((Byte)value).byteValue());
+        } else if (value.getClass() == Character.class) {
+            marshalChar(out, ((Character)value).charValue());
+        } else if (value.getClass() == Short.class) {
+            marshalShort(out, ((Short)value).shortValue());
+        } else if (value.getClass() == Integer.class) {
+            marshalInt(out, ((Integer)value).intValue());
+        } else if (value.getClass() == Long.class) {
+            marshalLong(out, ((Long)value).longValue());
+        } else if (value.getClass() == Float.class) {
+            marshalFloat(out, ((Float)value).floatValue());
+        } else if (value.getClass() == Double.class) {
+            marshalDouble(out, ((Double)value).doubleValue());
+        } else if (value.getClass() == byte[].class) {
+            marshalByteArray(out, (byte[])value);
+        } else if (value.getClass() == String.class) {
+            marshalString(out, (String)value);
+        } else if (value instanceof Map) {
+            out.writeByte(MAP_TYPE);
+            marshalPrimitiveMap((Map)value, out);
+        } else if (value instanceof List) {
+            out.writeByte(LIST_TYPE);
+            marshalPrimitiveList((List)value, out);
+        } else {
+            throw new IOException("Object is not a primitive: " + value);
+        }
+    }
+
+    /**
+     * Reads a single tagged value written by {@link #marshalPrimitive}.
+     *
+     * @param in
+     *            the source stream
+     * @return the value, possibly null
+     * @throws IOException
+     *             if reading fails, the type tag is unknown, or a stored length is negative
+     */
+    public static Object unmarshalPrimitive(DataInputStream in) throws IOException {
+        Object value = null;
+        byte type = in.readByte();
+        switch (type) {
+        case BYTE_TYPE:
+            value = Byte.valueOf(in.readByte());
+            break;
+        case BOOLEAN_TYPE:
+            value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+            break;
+        case CHAR_TYPE:
+            value = Character.valueOf(in.readChar());
+            break;
+        case SHORT_TYPE:
+            value = Short.valueOf(in.readShort());
+            break;
+        case INTEGER_TYPE:
+            value = Integer.valueOf(in.readInt());
+            break;
+        case LONG_TYPE:
+            value = Long.valueOf(in.readLong());
+            break;
+        case FLOAT_TYPE:
+            value = Float.valueOf(in.readFloat());
+            break;
+        case DOUBLE_TYPE:
+            value = Double.valueOf(in.readDouble());
+            break;
+        case BYTE_ARRAY_TYPE:
+            value = readByteArray(in);
+            break;
+        case STRING_TYPE:
+            value = in.readUTF();
+            break;
+        case BIG_STRING_TYPE:
+            value = readUTF8(in);
+            break;
+        case MAP_TYPE:
+            value = unmarshalPrimitiveMap(in);
+            break;
+        case LIST_TYPE:
+            value = unmarshalPrimitiveList(in);
+            break;
+        case NULL:
+            value = null;
+            break;
+        default:
+            throw new IOException("Unknown primitive type: " + type);
+        }
+        return value;
+    }
+
+    /** Reads a length-prefixed byte array, rejecting a negative length as corrupt data. */
+    private static byte[] readByteArray(DataInputStream in) throws IOException {
+        int length = in.readInt();
+        if (length < 0) {
+            throw new IOException("Byte array has a negative length: " + length);
+        }
+        byte[] bytes = new byte[length];
+        in.readFully(bytes);
+        return bytes;
+    }
+
+    public static void marshalNull(DataOutputStream out) throws IOException {
+        out.writeByte(NULL);
+    }
+
+    public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
+        out.writeByte(BOOLEAN_TYPE);
+        out.writeBoolean(value);
+    }
+
+    public static void marshalByte(DataOutputStream out, byte value) throws IOException {
+        out.writeByte(BYTE_TYPE);
+        out.writeByte(value);
+    }
+
+    public static void marshalChar(DataOutputStream out, char value) throws IOException {
+        out.writeByte(CHAR_TYPE);
+        out.writeChar(value);
+    }
+
+    public static void marshalShort(DataOutputStream out, short value) throws IOException {
+        out.writeByte(SHORT_TYPE);
+        out.writeShort(value);
+    }
+
+    public static void marshalInt(DataOutputStream out, int value) throws IOException {
+        out.writeByte(INTEGER_TYPE);
+        out.writeInt(value);
+    }
+
+    public static void marshalLong(DataOutputStream out, long value) throws IOException {
+        out.writeByte(LONG_TYPE);
+        out.writeLong(value);
+    }
+
+    public static void marshalFloat(DataOutputStream out, float value) throws IOException {
+        out.writeByte(FLOAT_TYPE);
+        out.writeFloat(value);
+    }
+
+    public static void marshalDouble(DataOutputStream out, double value) throws IOException {
+        out.writeByte(DOUBLE_TYPE);
+        out.writeDouble(value);
+    }
+
+    public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
+        marshalByteArray(out, value, 0, value.length);
+    }
+
+    /**
+     * Writes the byte-array tag, the length, then <code>length</code> bytes of <code>value</code> starting at
+     * <code>offset</code>.
+     */
+    public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
+        out.writeByte(BYTE_ARRAY_TYPE);
+        out.writeInt(length);
+        out.write(value, offset, length);
+    }
+
+    /**
+     * Writes a non-null string, choosing between the standard <code>writeUTF</code> form and the big-string form
+     * of {@link #writeUTF8} depending on its length.
+     */
+    public static void marshalString(DataOutputStream out, String s) throws IOException {
+        // If it's too big, out.writeUTF may not be able to write it out.
+        if (s.length() < Short.MAX_VALUE / 4) {
+            out.writeByte(STRING_TYPE);
+            out.writeUTF(s);
+        } else {
+            out.writeByte(BIG_STRING_TYPE);
+            writeUTF8(out, s);
+        }
+    }
+
+    /**
+     * Writes a string as a four-byte encoded length followed by the encoded characters (one, two or three bytes
+     * per char). A null string is written as the length -1.
+     *
+     * @param dataOut
+     *            the destination
+     * @param text
+     *            the string to write, may be null
+     * @throws IOException
+     *             if writing fails
+     */
+    public static void writeUTF8(DataOutput dataOut, String text) throws IOException {
+        if (text == null) {
+            dataOut.writeInt(-1);
+            return;
+        }
+        int strlen = text.length();
+        char[] charr = new char[strlen];
+        text.getChars(0, strlen, charr, 0);
+
+        // First pass: work out the encoded length.
+        int utflen = 0;
+        for (int i = 0; i < strlen; i++) {
+            int c = charr[i];
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                utflen++;
+            } else if (c > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+        }
+
+        // Second pass: four-byte big-endian length header, then the encoded chars.
+        byte[] bytearr = new byte[utflen + 4];
+        int count = 0;
+        bytearr[count++] = (byte)((utflen >>> 24) & 0xFF);
+        bytearr[count++] = (byte)((utflen >>> 16) & 0xFF);
+        bytearr[count++] = (byte)((utflen >>> 8) & 0xFF);
+        bytearr[count++] = (byte)((utflen >>> 0) & 0xFF);
+        for (int i = 0; i < strlen; i++) {
+            int c = charr[i];
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                bytearr[count++] = (byte)c;
+            } else if (c > 0x07FF) {
+                bytearr[count++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
+                bytearr[count++] = (byte)(0x80 | ((c >> 6) & 0x3F));
+                bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+            } else {
+                bytearr[count++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
+                bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+            }
+        }
+        dataOut.write(bytearr);
+    }
+
+    /**
+     * Reads a string written by {@link #writeUTF8}.
+     *
+     * @param dataIn
+     *            the source
+     * @return the decoded string, or null if a negative length was stored
+     * @throws IOException
+     *             if reading fails
+     * @throws UTFDataFormatException
+     *             if the encoded bytes are malformed
+     */
+    public static String readUTF8(DataInput dataIn) throws IOException {
+        int utflen = dataIn.readInt();
+        if (utflen < 0) {
+            return null;
+        }
+        StringBuilder str = new StringBuilder(utflen);
+        byte[] bytearr = new byte[utflen];
+        int c;
+        int char2;
+        int char3;
+        int count = 0;
+
+        dataIn.readFully(bytearr, 0, utflen);
+
+        while (count < utflen) {
+            c = bytearr[count] & 0xff;
+            switch (c >> 4) {
+            case 0:
+            case 1:
+            case 2:
+            case 3:
+            case 4:
+            case 5:
+            case 6:
+            case 7:
+                /* 0xxxxxxx */
+                count++;
+                str.append((char)c);
+                break;
+            case 12:
+            case 13:
+                /* 110x xxxx 10xx xxxx */
+                count += 2;
+                if (count > utflen) {
+                    throw new UTFDataFormatException();
+                }
+                char2 = bytearr[count - 1];
+                if ((char2 & 0xC0) != 0x80) {
+                    throw new UTFDataFormatException();
+                }
+                str.append((char)(((c & 0x1F) << 6) | (char2 & 0x3F)));
+                break;
+            case 14:
+                /* 1110 xxxx 10xx xxxx 10xx xxxx */
+                count += 3;
+                if (count > utflen) {
+                    throw new UTFDataFormatException();
+                }
+                char2 = bytearr[count - 2];
+                char3 = bytearr[count - 1];
+                if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+                    throw new UTFDataFormatException();
+                }
+                str.append((char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
+                break;
+            default:
+                /* 10xx xxxx, 1111 xxxx */
+                throw new UTFDataFormatException();
+            }
+        }
+        // The number of chars produced may be less than utflen
+        return str.toString();
+    }
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jndi;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import javax.naming.Context;
+import javax.naming.Name;
+import javax.naming.NamingException;
+import javax.naming.RefAddr;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+import javax.naming.spi.ObjectFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Converts objects implementing JNDIStorable into property fields so they can be stored in, and regenerated from, JNDI
+ */
+public class JNDIReferenceFactory implements ObjectFactory {
+ static Log log = LogFactory.getLog(JNDIReferenceFactory.class);
+
+ /**
+ * This will be called by a JNDIprovider when a Reference is retrieved from a JNDI store - and generates the orignal
+ * instance
+ *
+ * @param object
+ * the Reference object
+ * @param name
+ * the JNDI name
+ * @param nameCtx
+ * the context
+ * @param environment
+ * the environment settings used by JNDI
+ * @return the instance built from the Reference object
+ * @throws Exception
+ * if building the instance from Reference fails (usually class not found)
+ */
+ public Object getObjectInstance(Object object, Name name, Context nameCtx, Hashtable<?, ?> environment)
+ throws Exception {
+ Object result = null;
+ if (object instanceof Reference) {
+ Reference reference = (Reference) object;
+ if (log.isTraceEnabled()) {
+ log.trace("Getting instance of " + reference.getClassName());
+ }
+ Class<?> theClass = loadClass(this, reference.getClassName());
+ if (JNDIStorable.class.isAssignableFrom(theClass)) {
+ JNDIStorable store = (JNDIStorable) theClass.newInstance();
+ Map<String, String> properties = new HashMap<String, String>();
+ for (Enumeration<RefAddr> iter = reference.getAll(); iter.hasMoreElements();) {
+ StringRefAddr addr = (StringRefAddr) iter.nextElement();
+ properties.put(addr.getType(), (addr.getContent() == null) ? "" : addr.getContent().toString());
+ }
+ store.setProperties(properties);
+ result = store;
+ }
+ } else {
+ log.error("Object " + object + " is not a reference - cannot load");
+ throw new RuntimeException("Object " + object + " is not a reference");
+ }
+ return result;
+ }
+
+ /**
+ * Create a Reference instance from a JNDIStorable object
+ *
+ * @param instanceClassName
+ * @param po
+ * @return Reference
+ * @throws NamingException
+ */
+ public static Reference createReference(String instanceClassName, JNDIStorable po) throws NamingException {
+ if (log.isTraceEnabled()) {
+ log.trace("Creating reference: " + instanceClassName + "," + po);
+ }
+ Reference result = new Reference(instanceClassName, JNDIReferenceFactory.class.getName(), null);
+ try {
+ Map<String, String> props = po.getProperties();
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ javax.naming.StringRefAddr addr = new javax.naming.StringRefAddr(entry.getKey(), entry.getValue());
+ result.add(addr);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ throw new NamingException(e.getMessage());
+ }
+ return result;
+ }
+
+ /**
+ * Retrieve the class loader for a named class
+ *
+ * @param thisObj
+ * @param className
+ * @return the class
+ * @throws ClassNotFoundException
+ */
+ public static Class<?> loadClass(Object thisObj, String className) throws ClassNotFoundException {
+ // try local ClassLoader first.
+ ClassLoader loader = thisObj.getClass().getClassLoader();
+ Class<?> theClass;
+ if (loader != null) {
+ theClass = loader.loadClass(className);
+ } else {
+ // Will be null in jdk1.1.8
+ // use default classLoader
+ theClass = Class.forName(className);
+ }
+ return theClass;
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIReferenceFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jndi;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+
+/**
+ * Facilitates objects to be stored in JNDI as properties
+ */
+
+ public abstract class JNDIStorable implements Referenceable, Externalizable {
+
+     private Map<String,String> properties;
+
+     /**
+      * Initialize the instance from properties stored in JNDI. Invoked by
+      * {@link #setProperties(Map)} with the map it was given.
+      *
+      * @param props the properties to build this instance from
+      */
+     protected abstract void buildFromProperties(Map<String,String> props);
+
+     /**
+      * Write the properties that will represent the instance in JNDI into the
+      * supplied map. Invoked by {@link #getProperties()} on every call.
+      *
+      * @param props the map to fill
+      */
+     protected abstract void populateProperties(Map<String,String> props);
+
+     /**
+      * Set the properties for this instance as retrieved from JNDI. The map is
+      * kept by reference and then passed to
+      * {@link #buildFromProperties(Map)}.
+      *
+      * @param props the properties retrieved from JNDI
+      */
+     public synchronized void setProperties(Map<String,String> props) {
+         this.properties = props;
+         buildFromProperties(props);
+     }
+
+     /**
+      * Get the properties from this instance for storing in JNDI. The backing
+      * map is created on first use, refreshed through
+      * {@link #populateProperties(Map)} and returned directly (not copied).
+      *
+      * @return the properties
+      */
+     public synchronized Map<String,String> getProperties() {
+         if (this.properties == null) {
+             this.properties = new HashMap<String,String>();
+         }
+         populateProperties(this.properties);
+         return this.properties;
+     }
+
+     /**
+      * Retrieve a Reference for this instance to store in JNDI
+      *
+      * @return the built Reference
+      * @throws NamingException if error on building Reference
+      */
+     public Reference getReference() throws NamingException {
+         return JNDIReferenceFactory.createReference(this.getClass().getName(), this);
+     }
+
+     /**
+      * Reads a property map from the stream and, when it is not null, applies
+      * it through {@link #setProperties(Map)}.
+      *
+      * @param in the stream to read the property map from
+      * @throws IOException
+      * @throws ClassNotFoundException
+      * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
+      */
+     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+         // writeExternal stores the Map<String,String> returned by getProperties();
+         // the element types cannot be verified at runtime, hence the narrow suppression.
+         @SuppressWarnings("unchecked")
+         Map<String,String> props = (Map<String,String>) in.readObject();
+         if (props != null) {
+             setProperties(props);
+         }
+     }
+
+     /**
+      * Writes the map returned by {@link #getProperties()} to the stream.
+      *
+      * @param out the stream to write the property map to
+      * @throws IOException
+      * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
+      */
+     public void writeExternal(ObjectOutput out) throws IOException {
+         out.writeObject(getProperties());
+     }
+
+     /**
+      * Look up a key in the supplied map, substituting a default when the map
+      * holds no value (or a null value) for it.
+      *
+      * @param map the map to read from
+      * @param key the key to look up
+      * @param defaultValue the value returned when the lookup yields null
+      * @return the mapped value, or defaultValue
+      */
+     protected String getProperty(Map<String,String> map, String key, String defaultValue) {
+         String value = map.get(key);
+         return (value != null) ? value : defaultValue;
+     }
+
+ }
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/Callback.java Sun Feb 1 23:35:54 2009
@@ -21,12 +21,14 @@
/**
* A simple callback object
* @version $Revision: 1.2 $
+ * @param <T> the type of the argument passed to execute
*/
-public interface Callback {
+public interface Callback<T> {
/**
* Executes some piece of code
+ * @param t the argument handed to the callback
* @throws BlazeRuntimeException
*/
- void execute() throws BlazeRuntimeException;
+ void execute(T t) throws BlazeRuntimeException;
}
Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Sun Feb 1 23:35:54 2009
@@ -84,6 +84,15 @@
optional bool temporary=3;
}
+ message SubscriptionData {
+ optional bool durable =1;
+ optional int32 weight = 2;
+ optional string channelName =3;
+ optional string subscriberName =4;
+ optional string selector =5;
+ optional DestinationData destinationData =6;
+ }
+
message MemberData {
//| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
//| option java_type_method = "MessageType";
@@ -98,11 +107,11 @@
//if both weights are the same - the refined
//weight can be used
optional int64 refinedWeight = 8;
- optional bool destinationsChanged = 9;
+ optional bool subscriptionsChanged = 9;
optional bool observer = 10;
optional bool lockedMaster = 11;
repeated bytes groups = 12;
- repeated DestinationData destination = 13;
+ repeated SubscriptionData subscriptionData = 13;
}
message StateKeyData {
@@ -242,6 +251,7 @@
optional DestinationData destinationData = 11;
optional DestinationData replyToData = 12;
optional MapData mapData = 14;
+ optional bytes payload = 15;
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=739885&r1=739884&r2=739885&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Sun Feb 1 23:35:54 2009
@@ -39,7 +39,7 @@
sender.start();
receiver.start();
final CountDownLatch latch = new CountDownLatch(count);
- receiver.addBlazeTopicMessageListener(destination, new BlazeTopicListener() {
+ receiver.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
public void onMessage(BlazeMessage message) {
message.size();
received.incrementAndGet();
@@ -69,7 +69,7 @@
BlazeChannel channel = factory.createChannel();
channel.start();
channels.add(channel);
- channel.addBlazeTopicMessageListener(destination, new BlazeTopicListener() {
+ channel.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
public void onMessage(BlazeMessage message) {
synchronized (count) {
if (count.incrementAndGet() == GROUP_SIZE) {