You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC
svn commit: r1504961 [2/11] - in /activemq/activemq-blaze/trunk: ./
src/main/java/org/apache/activeblaze/
src/main/java/org/apache/activeblaze/cluster/
src/main/java/org/apache/activeblaze/group/
src/main/java/org/apache/activeblaze/impl/destination/ s...
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Fri Jul 19 18:44:21 2013
@@ -16,43 +16,22 @@
*/
package org.apache.activeblaze;
-import java.security.Key;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activeblaze.util.IOUtils;
-import org.apache.activeblaze.wire.BlazeData;
-import org.apache.activeblaze.wire.BoolType;
-import org.apache.activeblaze.wire.BufferType;
-import org.apache.activeblaze.wire.ByteType;
-import org.apache.activeblaze.wire.BytesType;
-import org.apache.activeblaze.wire.CharType;
-import org.apache.activeblaze.wire.DestinationData;
-import org.apache.activeblaze.wire.DoubleType;
-import org.apache.activeblaze.wire.FloatType;
-import org.apache.activeblaze.wire.IntType;
-import org.apache.activeblaze.wire.LongType;
-import org.apache.activeblaze.wire.MapData;
-import org.apache.activeblaze.wire.ShortType;
-import org.apache.activeblaze.wire.StringType;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
-import org.apache.activeblaze.wire.BoolType.BoolTypeBean;
-import org.apache.activeblaze.wire.BufferType.BufferTypeBean;
-import org.apache.activeblaze.wire.ByteType.ByteTypeBean;
-import org.apache.activeblaze.wire.BytesType.BytesTypeBean;
-import org.apache.activeblaze.wire.CharType.CharTypeBean;
-import org.apache.activeblaze.wire.DoubleType.DoubleTypeBean;
-import org.apache.activeblaze.wire.FloatType.FloatTypeBean;
-import org.apache.activeblaze.wire.IntType.IntTypeBean;
-import org.apache.activeblaze.wire.LongType.LongTypeBean;
-import org.apache.activeblaze.wire.MapData.MapDataBean;
-import org.apache.activeblaze.wire.ShortType.ShortTypeBean;
-import org.apache.activeblaze.wire.StringType.StringTypeBean;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.UTF8Buffer;
+
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.BufferInputStream;
+import org.apache.activeblaze.wire.BufferOutputStream;
+import org.apache.activeblaze.wire.IOUtils;
+import org.apache.activeblaze.wire.MarshallingSupport;
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketType;
+
/**
* A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs.
* The names are <CODE>String</CODE> objects, and the values are primitive data
@@ -61,59 +40,55 @@ import org.apache.activemq.protobuf.UTF8
* or randomly by name. The order of the entries is undefined.
* <CODE>BlazeMessage</CODE> inherits from the <CODE>Message</CODE> interface
* and adds a message body that contains a Map.
- * <P>
+ * <p/>
* The primitive types can be read or written explicitly using methods for each
* type. They may also be read or written generically as objects. For instance,
* a call to <CODE>BlazeMessage.setInt("foo", 6)</CODE> is equivalent to
* <CODE> BlazeMessage.setObject("foo", new Integer(6))</CODE>. Both forms are
* provided, because the explicit form is convenient for static programming, and
* the object form is needed when types are not known at compile time.
- * <P>
- * <P>
+ * <p/>
+ * <p/>
* <CODE>BlazeMessage</CODE> objects support the following conversion table. The
* marked cases must be supported. The unmarked cases must throw a
* <CODE>JMSException</CODE>. The <CODE>String</CODE> -to-primitive conversions
* may throw a runtime exception if the primitive's <CODE>valueOf()</CODE>
* method does not accept it as a valid <CODE> String</CODE> representation of
* the primitive.
- * <P>
+ * <p/>
* A value written as the row type can be read as the column type.
* <p/>
- *
+ * <p/>
* <PRE>
* | | boolean byte short char int long float double String byte[] |----------------------------------------------------------------------
* |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X |long | X X |float | X X X |double | X X
* |String | X X X X X X X X |byte[] | X |----------------------------------------------------------------------
* <p/>
* </PRE>
- *
* <p/>
- * <P>
+ * <p/>
+ * <p/>
* Attempting to read a null value as a primitive type must be treated as
* calling the primitive's corresponding <code>valueOf(String)</code> conversion
* method with a null value. Since <code>char</code> does not support a
* <code>String</code> conversion, attempting to read a null value as a
* <code>char</code> must throw a <code>NullPointerException</code>.
- *
*/
-public class BlazeMessage implements Map<String, Object> {
+public class BlazeMessage extends Packet implements Map<String, Object> {
private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
private static final String DEFAULT_OBJECT_PAYLOAD = "DEFAULT_OBJECT_PAYLOAD";
- private transient Map<String, Object> map = new ConcurrentHashMap<String, Object>();
- private transient Destination destination;
- private transient Destination replyTo;
- private transient String fromId;
- private transient String messageId;
- private transient String correlationId;
- private transient String messageType;
- private transient long timeStamp;
- private transient long expiration;
- private transient int redeliveryCounter;
- private transient int priority;
- private transient boolean persistent;
- private transient int type;
- private BlazeData content;
+ private Map<String, Object> map = new ConcurrentHashMap<String, Object>();
+ private Destination destination;
+ private Destination replyTo;
+ private String messageType;
+ private long timeStamp;
+ private long expiration;
+ private int redeliveryCounter;
+ private int priority;
+ private boolean persistent;
+ private int type;
+ private Buffer content;
private transient boolean loaded;
/**
@@ -125,8 +100,6 @@ public class BlazeMessage implements Map
/**
* Constructor - Utility to construct a message with a text
* <Code>String</Code> payload
- *
- * @param text
*/
public BlazeMessage(String text) {
setStringValue(DEFAULT_TEXT_PAYLOAD, text);
@@ -134,8 +107,6 @@ public class BlazeMessage implements Map
/**
* Constructor - Utility to construct a message with a byte[] array payload
- *
- * @param data
*/
public BlazeMessage(byte[] data) {
setBytesValue(DEFAULT_BYTES_PAYLOAD, data);
@@ -143,17 +114,22 @@ public class BlazeMessage implements Map
/**
* Constructor - Utility to construct a message with an object payload
- *
- * @param data
*/
public BlazeMessage(Object data) {
setObject(data);
}
/**
+ * Get the packetType
+ *
+ * @return the packetType
+ */
+ public int getPacketType() {
+ return PacketType.MESSAGE.getNumber();
+ }
+
+ /**
* Utility method for setting a default <Code>String</Code> payload
- *
- * @param text
*/
public void setText(String text) {
setStringValue(DEFAULT_TEXT_PAYLOAD, text);
@@ -162,9 +138,8 @@ public class BlazeMessage implements Map
/**
* Utility method used for when a BlazeMessage is only carrying a byte[]
* array
- *
+ *
* @return text the default text
- * @throws Exception
*/
public String getText() throws Exception {
return getStringValue(DEFAULT_TEXT_PAYLOAD);
@@ -172,8 +147,6 @@ public class BlazeMessage implements Map
/**
* Utility method for setting a default <Code>byte[]</Code> payload
- *
- * @param payload
*/
public void setBytes(byte[] payload) {
setBytesValue(DEFAULT_BYTES_PAYLOAD, payload);
@@ -181,9 +154,8 @@ public class BlazeMessage implements Map
/**
* Utility method used for when a BlazeMessage is only carrying an Object
- *
+ *
* @return text the default text
- * @throws Exception
*/
public Object getObject() throws Exception {
Buffer buffer = getBufferValue(DEFAULT_OBJECT_PAYLOAD);
@@ -192,8 +164,6 @@ public class BlazeMessage implements Map
/**
* Utility method for setting a default <Code>Object</Code> payload
- *
- * @param payload
*/
public void setObject(Object payload) {
try {
@@ -205,9 +175,8 @@ public class BlazeMessage implements Map
/**
* Utility method used for when a BlazeMessage is only carrying a String
- *
+ *
* @return text the default text
- * @throws Exception
*/
public byte[] getBytes() throws Exception {
return getBytesValue(DEFAULT_BYTES_PAYLOAD);
@@ -222,70 +191,26 @@ public class BlazeMessage implements Map
}
/**
- * @param destination
- * the destination to set
+ * @param destination the destination to set
*/
public void setDestination(Destination destination) {
this.destination = destination;
}
/**
- * @param destination
- */
- public void setDestination(DestinationData destinationData) {
- if (destinationData != null) {
- this.destination = new Destination(destinationData);
- }
- }
-
- /**
* The id of the channel that sent the message
- *
+ *
* @return the fromId
*/
public String getFromId() {
- initializeReading();
- return this.fromId;
- }
-
- /**
- * @param fromId
- * the fromId to set
- */
- public void setFromId(String fromId) {
- this.fromId = fromId;
+ return getProducerId();
}
/**
* @return the messageId
*/
public String getMessageId() {
- initializeReading();
- return this.messageId;
- }
-
- /**
- * @param messageId
- * the messageId to set
- */
- public void setMessageId(String messageId) {
- this.messageId = messageId;
- }
-
- /**
- * @return the correlationId
- */
- public String getCorrelationId() {
- initializeReading();
- return this.correlationId;
- }
-
- /**
- * @param correlationId
- * the correlationId to set
- */
- public void setCorrelationId(String correlationId) {
- this.correlationId = correlationId;
+ return getId();
}
/**
@@ -297,8 +222,7 @@ public class BlazeMessage implements Map
}
/**
- * @param timeStamp
- * the timeStamp to set
+ * @param timeStamp the timeStamp to set
*/
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
@@ -313,22 +237,13 @@ public class BlazeMessage implements Map
}
/**
- * @param replyTo
- * the replyTo to set
+ * @param replyTo the replyTo to set
*/
public void setReplyTo(Destination replyTo) {
this.replyTo = replyTo;
}
/**
- * @param replyTo
- * the replyTo to set
- */
- public void setReplyTo(DestinationData replyTo) {
- this.replyTo = new Destination(replyTo);
- }
-
- /**
* @return the expiration
*/
public long getExpiration() {
@@ -337,8 +252,7 @@ public class BlazeMessage implements Map
}
/**
- * @param expiration
- * the expiration to set
+ * @param expiration the expiration to set
*/
public void setExpiration(long expiration) {
this.expiration = expiration;
@@ -353,8 +267,7 @@ public class BlazeMessage implements Map
}
/**
- * @param redeliveryCounter
- * the redeliveryCounter to set
+ * @param redeliveryCounter the redeliveryCounter to set
*/
public void setRedeliveryCounter(int redeliveryCounter) {
this.redeliveryCounter = redeliveryCounter;
@@ -369,8 +282,7 @@ public class BlazeMessage implements Map
}
/**
- * @param priority
- * the priority to set
+ * @param priority the priority to set
*/
public void setPriority(int priority) {
this.priority = priority;
@@ -385,8 +297,7 @@ public class BlazeMessage implements Map
}
/**
- * @param persistent
- * the persistent to set
+ * @param persistent the persistent to set
*/
public void setPersistent(boolean persistent) {
this.persistent = persistent;
@@ -401,8 +312,7 @@ public class BlazeMessage implements Map
}
/**
- * @param type
- * the type to set
+ * @param type the type to set
*/
public void setMessageType(String type) {
this.messageType = type;
@@ -410,7 +320,7 @@ public class BlazeMessage implements Map
/**
* Get the type
- *
+ *
* @return the type
*/
public int getType() {
@@ -428,7 +338,7 @@ public class BlazeMessage implements Map
BlazeMessage copy = new BlazeMessage();
try {
copy(copy);
- } catch (BlazeException e) {
+ } catch (Exception e) {
throw new BlazeRuntimeException(e);
}
return copy;
@@ -439,16 +349,15 @@ public class BlazeMessage implements Map
*/
public void clear() {
this.map.clear();
+ this.content = null;
}
/**
* Returns the <CODE>boolean</CODE> value with the specified name.
- *
- * @param name
- * the name of the <CODE>boolean</CODE>
+ *
+ * @param name the name of the <CODE>boolean</CODE>
* @return the <CODE>boolean</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public boolean getBooleanValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -467,12 +376,10 @@ public class BlazeMessage implements Map
/**
* Returns the <CODE>byte</CODE> value with the specified name.
- *
- * @param name
- * the name of the <CODE>byte</CODE>
+ *
+ * @param name the name of the <CODE>byte</CODE>
* @return the <CODE>byte</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public byte getByteValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -491,12 +398,10 @@ public class BlazeMessage implements Map
/**
* Returns the <CODE>short</CODE> value with the specified name.
- *
- * @param name
- * the name of the <CODE>short</CODE>
+ *
+ * @param name the name of the <CODE>short</CODE>
* @return the <CODE>short</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public short getShortValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -518,12 +423,10 @@ public class BlazeMessage implements Map
/**
* Returns the Unicode character value with the specified name.
- *
- * @param name
- * the name of the Unicode character
+ *
+ * @param name the name of the Unicode character
* @return the Unicode character value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public char getCharValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -539,12 +442,10 @@ public class BlazeMessage implements Map
/**
* Returns the <CODE>int</CODE> value with the specified name.
- *
- * @param name
- * the name of the <CODE>int</CODE>
+ *
+ * @param name the name of the <CODE>int</CODE>
* @return the <CODE>int</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public int getIntValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -569,12 +470,10 @@ public class BlazeMessage implements Map
/**
* Returns the <CODE>long</CODE> value with the specified name.
- *
- * @param name
- * the name of the <CODE>long</CODE>
+ *
+ * @param name the name of the <CODE>long</CODE>
* @return the <CODE>long</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public long getLongValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -602,12 +501,10 @@ public class BlazeMessage implements Map
/**
* Returns the <CODE>float</CODE> value with the specified name.
- *
- * @param name
- * the name of the <CODE>float</CODE>
+ *
+ * @param name the name of the <CODE>float</CODE>
* @return the <CODE>float</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public float getFloatValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -626,12 +523,10 @@ public class BlazeMessage implements Map
/**
* Returns the <CODE>double</CODE> value with the specified name.
- *
- * @param name
- * the name of the <CODE>double</CODE>
+ *
+ * @param name the name of the <CODE>double</CODE>
* @return the <CODE>double</CODE> value with the specified name
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public double getDoubleValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -653,13 +548,11 @@ public class BlazeMessage implements Map
/**
* Returns the <CODE>String</CODE> value with the specified name.
- *
- * @param name
- * the name of the <CODE>String</CODE>
+ *
+ * @param name the name of the <CODE>String</CODE>
* @return the <CODE>String</CODE> value with the specified name; if there
* is no item by this name, a null value is returned
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public String getStringValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -675,13 +568,11 @@ public class BlazeMessage implements Map
/**
* Returns the byte array value with the specified name.
- *
- * @param name
- * the name of the byte array
+ *
+ * @param name the name of the byte array
* @return the byte array value with the specified name; if there is no item
* by this name, a null value is returned.
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public byte[] getBytesValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -694,13 +585,11 @@ public class BlazeMessage implements Map
/**
* Returns a Buffer with the specified name.
- *
- * @param name
- * the name of the byte array
+ *
+ * @param name the name of the byte array
* @return the byte array value with the specified name; if there is no item
* by this name, a null value is returned.
- * @throws BlazeMessageFormatException
- * if this type conversion is invalid.
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
*/
public Buffer getBufferValue(String name) throws BlazeMessageFormatException {
initializeReading();
@@ -713,17 +602,16 @@ public class BlazeMessage implements Map
/**
* Returns the value of the object with the specified name.
- * <P>
+ * <p/>
* This method can be used to return, in objectified format, an object in
* the Java programming language ("Java object") that had been stored in the
* Map with the equivalent <CODE>setObject</CODE> method call, or its
* equivalent primitive <CODE>set <I>type </I></CODE> method.
- * <P>
+ * <p/>
* Note that byte values are returned as <CODE>byte[]</CODE>, not
* <CODE>Byte[]</CODE>.
- *
- * @param name
- * the name of the Java object
+ *
+ * @param name the name of the Java object
* @return a copy of the Java object value with the specified name, in
* objectified format (for example, if the object was set as an
* <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there
@@ -737,7 +625,7 @@ public class BlazeMessage implements Map
/**
* Returns an <CODE>Enumeration</CODE> of all the names in the
* <CODE>BlazeMessage</CODE> object.
- *
+ *
* @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
*/
public Enumeration<String> getNames() {
@@ -747,10 +635,8 @@ public class BlazeMessage implements Map
/**
* put a key,value pair into the message
- *
- * @param name
- * @param value
- * must be a supported primitive, or map of supported primitives
+ *
+ * @param value must be a supported primitive, or map of supported primitives
* @return the previous value associated with the key
*/
public Object put(String name, Object value) {
@@ -767,11 +653,9 @@ public class BlazeMessage implements Map
/**
* Sets a <CODE>boolean</CODE> value with the specified name into the Map.
- *
- * @param name
- * the name of the <CODE>boolean</CODE>
- * @param value
- * the <CODE>boolean</CODE> value to set in the Map
+ *
+ * @param name the name of the <CODE>boolean</CODE>
+ * @param value the <CODE>boolean</CODE> value to set in the Map
*/
public void setBooleanValue(String name, boolean value) {
initializeWriting();
@@ -780,11 +664,9 @@ public class BlazeMessage implements Map
/**
* Sets a <CODE>byte</CODE> value with the specified name into the Map.
- *
- * @param name
- * the name of the <CODE>byte</CODE>
- * @param value
- * the <CODE>byte</CODE> value to set in the Map
+ *
+ * @param name the name of the <CODE>byte</CODE>
+ * @param value the <CODE>byte</CODE> value to set in the Map
*/
public void setByteValue(String name, byte value) {
initializeWriting();
@@ -793,11 +675,9 @@ public class BlazeMessage implements Map
/**
* Sets a <CODE>short</CODE> value with the specified name into the Map.
- *
- * @param name
- * the name of the <CODE>short</CODE>
- * @param value
- * the <CODE>short</CODE> value to set in the Map
+ *
+ * @param name the name of the <CODE>short</CODE>
+ * @param value the <CODE>short</CODE> value to set in the Map
*/
public void setShortValue(String name, short value) {
initializeWriting();
@@ -806,11 +686,9 @@ public class BlazeMessage implements Map
/**
* Sets a Unicode character value with the specified name into the Map.
- *
- * @param name
- * the name of the Unicode character
- * @param value
- * the Unicode character value to set in the Map
+ *
+ * @param name the name of the Unicode character
+ * @param value the Unicode character value to set in the Map
*/
public void setCharValue(String name, char value) {
initializeWriting();
@@ -819,11 +697,9 @@ public class BlazeMessage implements Map
/**
* Sets an <CODE>int</CODE> value with the specified name into the Map.
- *
- * @param name
- * the name of the <CODE>int</CODE>
- * @param value
- * the <CODE>int</CODE> value to set in the Map
+ *
+ * @param name the name of the <CODE>int</CODE>
+ * @param value the <CODE>int</CODE> value to set in the Map
*/
public void setIntValue(String name, int value) {
initializeWriting();
@@ -832,11 +708,9 @@ public class BlazeMessage implements Map
/**
* Sets a <CODE>long</CODE> value with the specified name into the Map.
- *
- * @param name
- * the name of the <CODE>long</CODE>
- * @param value
- * the <CODE>long</CODE> value to set in the Map
+ *
+ * @param name the name of the <CODE>long</CODE>
+ * @param value the <CODE>long</CODE> value to set in the Map
*/
public void setLongValue(String name, long value) {
initializeWriting();
@@ -845,11 +719,9 @@ public class BlazeMessage implements Map
/**
* Sets a <CODE>float</CODE> value with the specified name into the Map.
- *
- * @param name
- * the name of the <CODE>float</CODE>
- * @param value
- * the <CODE>float</CODE> value to set in the Map
+ *
+ * @param name the name of the <CODE>float</CODE>
+ * @param value the <CODE>float</CODE> value to set in the Map
*/
public void setFloatValue(String name, float value) {
initializeWriting();
@@ -858,11 +730,9 @@ public class BlazeMessage implements Map
/**
* Sets a <CODE>double</CODE> value with the specified name into the Map.
- *
- * @param name
- * the name of the <CODE>double</CODE>
- * @param value
- * the <CODE>double</CODE> value to set in the Map
+ *
+ * @param name the name of the <CODE>double</CODE>
+ * @param value the <CODE>double</CODE> value to set in the Map
*/
public void setDoubleValue(String name, double value) {
initializeWriting();
@@ -871,11 +741,9 @@ public class BlazeMessage implements Map
/**
* Sets a <CODE>String</CODE> value with the specified name into the Map.
- *
- * @param name
- * the name of the <CODE>String</CODE>
- * @param value
- * the <CODE>String</CODE> value to set in the Map
+ *
+ * @param name the name of the <CODE>String</CODE>
+ * @param value the <CODE>String</CODE> value to set in the Map
*/
public void setStringValue(String name, String value) {
initializeWriting();
@@ -884,15 +752,12 @@ public class BlazeMessage implements Map
/**
* Sets a byte array value with the specified name into the Map.
- *
- * @param name
- * the name of the byte array
- * @param value
- * the byte array value to set in the Map; the array is copied so
- * that the value for <CODE>name </CODE> will not be altered by
- * future modifications
- * @throws NullPointerException
- * if the name is null, or if the name is an empty string.
+ *
+ * @param name the name of the byte array
+ * @param value the byte array value to set in the Map; the array is copied so
+ * that the value for <CODE>name </CODE> will not be altered by
+ * future modifications
+ * @throws NullPointerException if the name is null, or if the name is an empty string.
*/
public void setBytesValue(String name, byte[] value) {
initializeWriting();
@@ -905,13 +770,10 @@ public class BlazeMessage implements Map
/**
* Sets a Buffer value with the specified name into the Map.
- *
- * @param name
- * the name of the byte array
- * @param value
- * the Buffer value to set in the Map
- * @throws NullPointerException
- * if the name is null, or if the name is an empty string.
+ *
+ * @param name the name of the byte array
+ * @param value the Buffer value to set in the Map
+ * @throws NullPointerException if the name is null, or if the name is an empty string.
*/
public void setBufferValue(String name, Buffer value) {
initializeWriting();
@@ -925,15 +787,11 @@ public class BlazeMessage implements Map
/**
* Sets a portion of the byte array value with the specified name into the
* Map.
- *
- * @param name
- * the name of the byte array
- * @param value
- * the byte array value to set in the Map
- * @param offset
- * the initial offset within the byte array
- * @param length
- * the number of bytes to use
+ *
+ * @param name the name of the byte array
+ * @param value the byte array value to set in the Map
+ * @param offset the initial offset within the byte array
+ * @param length the number of bytes to use
*/
public void setBytesValue(String name, byte[] value, int offset, int length) {
initializeWriting();
@@ -944,10 +802,8 @@ public class BlazeMessage implements Map
/**
* Find out if the message contains a key This isn't recursive
- *
- * @param key
+ *
* @return true if the message contains the key
- *
*/
public boolean containsKey(Object key) {
initializeReading();
@@ -956,10 +812,8 @@ public class BlazeMessage implements Map
/**
* Find out if the message contains a value
- *
- * @param value
+ *
* @return true if the value exists
- *
*/
public boolean containsValue(Object value) {
initializeReading();
@@ -968,7 +822,6 @@ public class BlazeMessage implements Map
/**
* @return a set of Map.Entry values
- *
*/
public Set<java.util.Map.Entry<String, Object>> entrySet() {
initializeReading();
@@ -977,8 +830,7 @@ public class BlazeMessage implements Map
/**
* Retrieve the object associated with the key
- *
- * @param key
+ *
* @return the object
*/
public Object get(Object key) {
@@ -988,7 +840,6 @@ public class BlazeMessage implements Map
/**
* @return true if the message is empty
- *
*/
public boolean isEmpty() {
initializeReading();
@@ -1005,10 +856,8 @@ public class BlazeMessage implements Map
/**
* Add all entries in a Map to the message
- *
- * @param t
- * the map
- *
+ *
+ * @param t the map
*/
public void putAll(Map<? extends String, ? extends Object> t) {
for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
@@ -1018,10 +867,8 @@ public class BlazeMessage implements Map
/**
* Remove a key/value pair from the message
- *
- * @param key
+ *
* @return the value removed or null
- *
*/
public Object remove(Object key) {
setContent(null);
@@ -1046,22 +893,13 @@ public class BlazeMessage implements Map
/**
* check if a named value exists in the message
- *
- * @param name
+ *
* @return true if value exits
*/
public boolean valueExists(String name) {
return this.map.containsKey(name);
}
- protected void initializeReading() {
- loadContent();
- }
-
- protected void initializeWriting() {
- setContent(null);
- }
-
protected void checkValidObject(Object value) throws IllegalArgumentException {
boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short
|| value instanceof Integer || value instanceof Long;
@@ -1087,222 +925,124 @@ public class BlazeMessage implements Map
return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
}
- protected void copy(BlazeMessage copy) throws BlazeException {
+ protected void copy(BlazeMessage copy) throws IOException {
+ super.copy(copy);
storeContent();
+ copy.persistent = this.persistent;
+ copy.destination = this.destination;
+ copy.replyTo = this.replyTo;
+ copy.messageType = this.messageType;
+ copy.timeStamp = this.timeStamp;
+ copy.expiration = this.expiration;
+ copy.redeliveryCounter = this.redeliveryCounter;
+ copy.priority = this.priority;
+ copy.type = this.type;
copy.content = this.content;
}
/**
* @return the content data
*/
- public BlazeData getContent() {
+ public Buffer getContent() {
return this.content;
}
/**
* Set the content data
- *
- * @param content
*/
- public void setContent(BlazeData content) {
+ public void setContent(Buffer content) {
this.content = content;
}
- protected void marshallMap(MapDataBean mapData, String name, Object value) throws BlazeRuntimeException {
- if (value != null) {
- if (value.getClass() == Boolean.class) {
- BoolTypeBean type = new BoolTypeBean();
- type.setName(name);
- type.setValue(((Boolean) value).booleanValue());
- mapData.addBoolType(type);
- } else if (value.getClass() == Byte.class) {
- ByteTypeBean type = new ByteTypeBean();
- type.setName(name);
- type.setValue(((Byte) value).byteValue());
- mapData.addByteType(type);
- } else if (value.getClass() == Character.class) {
- CharTypeBean type = new CharTypeBean();
- type.setName(name);
- type.setValue(value.toString());
- mapData.addCharType(type);
- } else if (value.getClass() == Short.class) {
- ShortTypeBean type = new ShortTypeBean();
- type.setName(name);
- type.setValue(((Short) value).shortValue());
- mapData.addShortType(type);
- } else if (value.getClass() == Integer.class) {
- IntTypeBean type = new IntTypeBean();
- type.setName(name);
- type.setValue(((Integer) value).intValue());
- mapData.addIntType(type);
- } else if (value.getClass() == Long.class) {
- LongTypeBean type = new LongTypeBean();
- type.setName(name);
- type.setValue(((Long) value).longValue());
- mapData.addLongType(type);
- } else if (value.getClass() == Float.class) {
- FloatTypeBean type = new FloatTypeBean();
- type.setName(name);
- type.setValue(((Float) value).floatValue());
- mapData.addFloatType(type);
- } else if (value.getClass() == Double.class) {
- DoubleTypeBean type = new DoubleTypeBean();
- type.setName(name);
- type.setValue(((Double) value).doubleValue());
- mapData.addDoubleType(type);
- } else if (value.getClass() == byte[].class) {
- BytesTypeBean type = new BytesTypeBean();
- type.setName(name);
- type.setValue(new Buffer((byte[]) value));
- mapData.addBytesType(type);
- } else if (value.getClass() == String.class) {
- StringTypeBean type = new StringTypeBean();
- type.setName(name);
- type.setValue(value.toString());
- mapData.addStringType(type);
- } else if (value.getClass() == Buffer.class) {
- BufferTypeBean type = new BufferTypeBean();
- type.setName(name);
- type.setValue((Buffer) value);
- } else if (value instanceof Map) {
- Map<String, Key> subMap = (Map<String, Key>) value;
- for (Map.Entry<String, Key> entry : subMap.entrySet()) {
- MapDataBean md = new MapDataBean();
- md.setName(name);
- marshallMap(md, entry.getKey().toString(), entry.getValue());
- mapData.addMapType(md);
- }
- } else {
- throw new BlazeRuntimeException("Cannot seralize type " + value);
- }
- }
+ /**
+ * Initialize from a Buffer
+ */
+ public void read(BufferInputStream in) throws IOException {
+ super.read(in);
+ this.persistent = this.byteBool.readBoolean();
+ if (this.byteBool.readBoolean()) {
+ this.destination = new Destination();
+ this.destination.read(in);
+ }
+ if (this.byteBool.readBoolean()) {
+ this.replyTo = new Destination();
+ this.replyTo.read(in);
+ }
+ this.messageType = in.readString();
+ this.timeStamp = in.readLong();
+ this.expiration = in.readLong();
+ this.redeliveryCounter = in.readByte();
+ this.priority = in.readByte();
+ this.type = in.readByte();
+ this.content = in.readBuffer();
+ }
+
+ protected void preWrite() {
+ super.preWrite();
+ this.byteBool.writeBoolean(this.persistent);
+ this.byteBool.writeBoolean(this.destination != null);
+ this.byteBool.writeBoolean(this.replyTo != null);
}
- protected Map<String, Object> unmarshall(MapData mapData) {
- Map<String, Object> result = new ConcurrentHashMap<String, Object>();
- if (mapData.hasBoolType()) {
- for (BoolType type : mapData.getBoolTypeList()) {
- result.put(type.getName(), new Boolean(type.getValue()));
- }
- }
- if (mapData.hasCharType()) {
- for (CharType type : mapData.getCharTypeList()) {
- result.put(type.getName(), new Character(type.getValue().charAt(0)));
- }
- }
- if (mapData.hasShortType()) {
- for (ShortType type : mapData.getShortTypeList()) {
- result.put(type.getName(), new Short((short) type.getValue()));
- }
- }
- if (mapData.hasIntType()) {
- for (IntType type : mapData.getIntTypeList()) {
- result.put(type.getName(), new Integer(type.getValue()));
- }
- }
- if (mapData.hasLongType()) {
- for (LongType type : mapData.getLongTypeList()) {
- result.put(type.getName(), new Long(type.getValue()));
- }
- }
- if (mapData.hasFloatType()) {
- for (FloatType type : mapData.getFloatTypeList()) {
- result.put(type.getName(), new Float(type.getValue()));
- }
- }
- if (mapData.hasDoubleType()) {
- for (DoubleType type : mapData.getDoubleTypeList()) {
- result.put(type.getName(), new Double(type.getValue()));
- }
- }
- if (mapData.hasByteType()) {
- for (ByteType type : mapData.getByteTypeList()) {
- result.put(type.getName(), new Byte((byte) type.getValue()));
- }
- }
- if (mapData.hasStringType()) {
- for (StringType type : mapData.getStringTypeList()) {
- result.put(type.getName(), type.getValue());
- }
- }
- if (mapData.hasBytesType()) {
- for (BytesType type : mapData.getBytesTypeList()) {
- result.put(type.getName(), type.getValue().toByteArray());
- }
+ /**
+ * Write state to a Buffer
+ */
+ public void write(BufferOutputStream out) throws IOException {
+ super.write(out);
+ if (this.destination != null) {
+ this.destination.write(out);
+ } else {
}
- if (mapData.hasBufferType()) {
- for (BufferType type : mapData.getBufferTypeList()) {
- result.put(type.getName(), type.getValue());
- }
+ if (this.replyTo != null) {
+ this.replyTo.write(out);
}
- if (mapData.hasMapType()) {
- for (MapData type : mapData.getMapTypeList()) {
- Map<String, Object> map = unmarshall(type);
- result.put(type.getName(), map);
- }
+ out.writeString(this.messageType);
+ out.writeLong(this.timeStamp);
+ out.writeLong(this.expiration);
+ out.writeByte(this.redeliveryCounter);
+ out.writeByte(this.priority);
+ out.writeByte(this.type);
+ storeContent();
+ out.write(this.content);
+ }
+
+ protected void initializeReading() {
+ try {
+ loadContent();
+ } catch (IOException e) {
+ BlazeRuntimeException ex = new BlazeRuntimeException(e);
+ ex.initCause(e);
+ throw ex;
}
- return result;
+ }
+
+ protected void initializeWriting() {
+ setContent(null);
}
/**
* Store content into a BlazeData object for serialization
*/
- public void storeContent() {
+ protected void storeContent() throws IOException {
if (getContent() == null) {
- BlazeDataBean bd = new BlazeDataBean();
- if (!this.map.isEmpty()) {
- MapDataBean mapData = new MapDataBean();
- for (Map.Entry<String, Object> entry : this.map.entrySet()) {
- marshallMap(mapData, entry.getKey().toString(), entry.getValue());
- }
- bd.setMapData(mapData);
- }
- if (this.replyTo != null) {
- bd.setReplyToData(this.replyTo.getData());
- }
- if (this.messageType != null) {
- bd.setMessageType(new UTF8Buffer(this.messageType));
- }
- if (this.timeStamp > 0) {
- bd.setTimestamp(this.timeStamp);
- }
- if (this.expiration > 0) {
- bd.setExpiration(this.expiration);
- }
- if (this.redeliveryCounter > 0) {
- bd.setRedeliveryCounter(this.redeliveryCounter);
- }
- if (this.priority > 0) {
- bd.setPriority(this.priority);
- }
- if (this.persistent) {
- bd.setPersistent(this.persistent);
- }
- this.content = bd;
+ int streamSize = Math.max(16, this.map.size() * 16);
+ BufferOutputStream out = new BufferOutputStream(streamSize);
+ MarshallingSupport.marshalPrimitiveMap(this.map, out);
+ out.flush();
+ this.content = out.toBuffer();
}
}
/**
* Builds the message body from data
- *
*/
- protected void loadContent() throws BlazeRuntimeException {
+ protected void loadContent() throws IOException {
if (!this.loaded) {
this.loaded = true;
- BlazeData data = getContent();
- if (data != null && this.map.isEmpty()) {
- this.map = unmarshall(data.getMapData());
- if (data.hasReplyToData()) {
- this.replyTo = new Destination(data.getReplyToData());
- }
- if (data.hasMessageType()) {
- this.messageType = data.getMessageType().toStringUtf8();
- }
- this.timeStamp = data.getTimestamp();
- this.expiration = data.getExpiration();
- this.redeliveryCounter = data.getRedeliveryCounter();
- this.priority = data.getPriority();
- this.persistent = data.getPersistent();
+ Buffer buffer = getContent();
+ if (buffer != null && (this.map == null || this.map.isEmpty())) {
+ BufferInputStream in = new BufferInputStream(buffer);
+ this.map = MarshallingSupport.unmarshalPrimitiveMap(in);
}
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java Fri Jul 19 18:44:21 2013
@@ -18,14 +18,12 @@ package org.apache.activeblaze;
/**
* Exception raised for message format exceptions
- *
*/
-public class BlazeMessageFormatException extends BlazeException{
+public class BlazeMessageFormatException extends BlazeException {
private static final long serialVersionUID = 1925143462979839452L;
/**
* Constructor
- * @param reason
*/
public BlazeMessageFormatException(String reason) {
super(reason);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageListener.java Fri Jul 19 18:44:21 2013
@@ -18,13 +18,11 @@ package org.apache.activeblaze;
/**
* A listener for BlazeMessages
- *
*/
public interface BlazeMessageListener {
-
+
/**
* Called when a Message is available to be processes
- * @param message
*/
public void onMessage(BlazeMessage message);
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java Fri Jul 19 18:44:21 2013
@@ -18,7 +18,6 @@ package org.apache.activeblaze;
/**
* Blaze BlazeNoRouteException
- *
*/
public class BlazeNoRouteException extends BlazeException {
private static final long serialVersionUID = 3951297225484077839L;
@@ -34,10 +33,9 @@ public class BlazeNoRouteException exten
/**
* Constructs a new exception with the specified detail message. The cause is not initialized, and may subsequently
* be initialized by a call to {@link #initCause}.
- *
- * @param message
- * the detail message. The detail message is saved for later retrieval by the {@link #getMessage()}
- * method.
+ *
+ * @param message the detail message. The detail message is saved for later retrieval by the {@link #getMessage()}
+ * method.
*/
public BlazeNoRouteException(String message) {
super(message);
@@ -45,15 +43,13 @@ public class BlazeNoRouteException exten
/**
* Constructs a new exception with the specified detail message and cause.
- * <p>
+ * <p/>
* Note that the detail message associated with <code>cause</code> is <i>not</i> automatically incorporated in
* this exception's detail message.
- *
- * @param message
- * the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
- * @param cause
- * the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
- * value is permitted, and indicates that the cause is nonexistent or unknown.)
+ *
+ * @param message the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+ * value is permitted, and indicates that the cause is nonexistent or unknown.)
*/
public BlazeNoRouteException(String message, Throwable cause) {
super(message, cause);
@@ -64,10 +60,9 @@ public class BlazeNoRouteException exten
* <tt>(cause==null ? null : cause.toString())</tt> (which typically contains the class and detail message of
* <tt>cause</tt>). This constructor is useful for exceptions that are little more than wrappers for other
* throwables (for example, {@link java.security.PrivilegedActionException}).
- *
- * @param cause
- * the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
- * value is permitted, and indicates that the cause is nonexistent or unknown.)
+ *
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+ * value is permitted, and indicates that the cause is nonexistent or unknown.)
*/
public BlazeNoRouteException(Throwable cause) {
super(cause);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java Fri Jul 19 18:44:21 2013
@@ -18,11 +18,10 @@ package org.apache.activeblaze;
/**
* Blaze RuntimeException
- *
*/
public class BlazeRuntimeException extends RuntimeException {
private static final long serialVersionUID = -239755000850890447L;
-
+
/**
* Constructs a new exception with <code>null</code> as its detail message.
@@ -30,7 +29,7 @@ public class BlazeRuntimeException exten
* call to {@link #initCause}.
*/
public BlazeRuntimeException() {
- super();
+ super();
}
/**
@@ -38,11 +37,11 @@ public class BlazeRuntimeException exten
* cause is not initialized, and may subsequently be initialized by
* a call to {@link #initCause}.
*
- * @param message the detail message. The detail message is saved for
- * later retrieval by the {@link #getMessage()} method.
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
*/
public BlazeRuntimeException(String message) {
- super(message);
+ super(message);
}
/**
@@ -51,12 +50,12 @@ public class BlazeRuntimeException exten
* <code>cause</code> is <i>not</i> automatically incorporated in
* this exception's detail message.
*
- * @param message the detail message (which is saved for later retrieval
- * by the {@link #getMessage()} method).
- * @param cause the cause (which is saved for later retrieval by the
- * {@link #getCause()} method). (A <tt>null</tt> value is
- * permitted, and indicates that the cause is nonexistent or
- * unknown.)
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
*/
public BlazeRuntimeException(String message, Throwable cause) {
super(message, cause);
@@ -70,10 +69,10 @@ public class BlazeRuntimeException exten
* wrappers for other throwables (for example, {@link
* java.security.PrivilegedActionException}).
*
- * @param cause the cause (which is saved for later retrieval by the
- * {@link #getCause()} method). (A <tt>null</tt> value is
- * permitted, and indicates that the cause is nonexistent or
- * unknown.)
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
*/
public BlazeRuntimeException(Throwable cause) {
super(cause);
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java Fri Jul 19 18:44:21 2013
@@ -16,149 +16,124 @@
*/
package org.apache.activeblaze;
-import org.apache.activeblaze.wire.DestinationData;
-import org.apache.activeblaze.wire.DestinationData.DestinationDataBean;
-import org.apache.activeblaze.wire.DestinationData.DestinationDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
+import java.io.IOException;
+
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.BufferInputStream;
+import org.apache.activeblaze.wire.BufferOutputStream;
+import org.apache.activeblaze.wire.ByteBool;
/**
* Holds information about a Destination
- *
*/
public class Destination {
-
- private DestinationDataBuffer data;
-
- /**
- * Default Constructor
- */
- public Destination() {
- this.data = new DestinationDataBean().freeze();
- }
-
- /**
- * Constructor
- * @param data
- */
- public Destination(DestinationData data) {
- this.data=data.freeze();
- }
-
- /**
- * Constructor
- * @param name
- */
- public Destination(String name) {
- this(name,true);
- }
-
-
- /**
- * Constructor
- * @param name
- * @param topic
- */
- public Destination(String name,boolean topic) {
- this(name,topic,false);
- }
-
+ private Buffer name;
+ private boolean topic;
+ private boolean temporary;
+
+
/**
* Constructor
- * @param name
- * @param topic
- * @param temporary
*/
- public Destination(String name,boolean topic,boolean temporary) {
- this(new Buffer(name), topic, temporary);
+ public Destination() {
}
-
+
/**
* Constructor
- * @param name
*/
- public Destination(Buffer name) {
- this(name,true);
+ public Destination(Buffer name, boolean topic) {
+ this.name = name;
+ this.topic = topic;
}
-
+
/**
* Constructor
- * @param name
- * @param topic
*/
- public Destination(Buffer name,boolean topic) {
- this(name,topic,false);
+ public Destination(String name, boolean topic) {
+ this.name = new Buffer(name);
+ this.topic = topic;
}
-
- /**
- * Constructor
- * @param name
- * @param topic
- * @param temporary
- */
- public Destination(Buffer name,boolean topic,boolean temporary) {
- DestinationDataBean bean=new DestinationDataBean();
- bean.setName(name);
- bean.setTopic(topic);
- bean.setTemporary(temporary);
- this.data = bean.freeze();
- }
-
-
+
/**
+ * Get the name
+ *
* @return the name
*/
public Buffer getName() {
- return this.data.getName();
+ return this.name;
}
+
/**
+ * Set the name
+ *
* @param name the name to set
*/
public void setName(Buffer name) {
- this.data = this.data.copy().setName(name).freeze();
+ this.name = name;
}
+
/**
+ * Get the topic
+ *
* @return the topic
*/
public boolean isTopic() {
- return this.data.getTopic();
+ return this.topic;
}
+
/**
+ * Set the topic
+ *
* @param topic the topic to set
*/
public void setTopic(boolean topic) {
- this.data = this.data.copy().setTopic(topic).freeze();
+ this.topic = topic;
+ }
+
+ /**
+ * Get the queue
+ *
+ * @return the queue
+ */
+ public boolean isQueue() {
+ return !isTopic();
}
+
/**
+ * Get the temporary
+ *
* @return the temporary
*/
public boolean isTemporary() {
- return this.data.getTopic();
+ return this.temporary;
}
+
/**
+ * Set the temporary
+ *
* @param temporary the temporary to set
*/
public void setTemporary(boolean temporary) {
- this.data = this.data.copy().setTemporary(temporary).freeze();
+ this.temporary = temporary;
}
-
- /**
- * @return true if a Topic
- */
- public boolean isQueue() {
- return !isTopic();
+
+ public void write(BufferOutputStream out) throws IOException {
+ ByteBool byteBool = new ByteBool();
+ byteBool.writeBoolean(this.topic);
+ byteBool.writeBoolean(this.temporary);
+ out.writeByte(byteBool.getData());
+ out.write(this.name);
}
- /**
- * @return the data
- */
- public DestinationDataBuffer getData() {
-
- return this.data;
- }
-
-
-
+
+ public void read(BufferInputStream in) throws IOException {
+ ByteBool byteBool = new ByteBool();
+ byteBool.setData(in.readByte());
+ this.topic = byteBool.readBoolean();
+ this.temporary = byteBool.readBoolean();
+ this.name = in.readBuffer();
+ }
+
public String toString() {
- return "Destination{"+ (isTopic()?"Topic":"Queue") + ": " +this.data.getName().toStringUtf8()+"}";
+ return "Destination{" + (isTopic() ? "Topic" : "Queue") + ": " + getName() + "}";
}
-
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java Fri Jul 19 18:44:21 2013
@@ -18,12 +18,10 @@ package org.apache.activeblaze;
/**
* Listener for async exceptions
- *
*/
public interface ExceptionListener {
/**
* Called when an Async exception has been raised
- * @param ex
*/
void onException(Exception ex);
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java Fri Jul 19 18:44:21 2013
@@ -17,35 +17,31 @@
package org.apache.activeblaze;
-import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.wire.Packet;
/**
* Processes a Packet
- *
*/
-public interface Processor extends Service{
- /**
- * @param packet
- * @throws Exception
- */
- void downStream(Packet packet) throws Exception;
-
- /**
- * Send a management packet - this may be on a different address
+public interface Processor extends Service {
+ /**
* @param packet
* @throws Exception
*/
+ void downStream(Packet packet) throws Exception;
+
+ /**
+ * Send a management packet - this may be on a different address
+ */
public void downStreamManagement(Packet packet) throws Exception;
-
- /**
- * @param packet
- * @throws Exception
- */
- void upStream(Packet packet) throws Exception;
-
- /**
- * Set An exception Listener
- * @param l
- */
- void setExceptionListener(ExceptionListener l);
+
+ /**
+ * @param packet
+ * @throws Exception
+ */
+ void upStream(Packet packet) throws Exception;
+
+ /**
+ * Set An exception Listener
+ */
+ void setExceptionListener(ExceptionListener l);
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java Fri Jul 19 18:44:21 2013
@@ -18,45 +18,51 @@ package org.apache.activeblaze;
/**
* LifeCycle for an administrated object
- *
*/
public interface Service {
/**
* initialize the service
+ *
* @return true if initialized
- * @throws Exception
*/
public boolean init() throws Exception;
+
/**
* Start the service
+ *
* @return true if started
- * @throws Exception
*/
public boolean start() throws Exception;
+
/**
* Stop the service
+ *
* @return true if stopped
- * @throws Exception
*/
public boolean stop() throws Exception;
+
/**
* Shutdown the Service
+ *
* @return true if shutdown, false if already in the shutdown state
- * @throws Exception
*/
public boolean shutDown() throws Exception;
+
/**
* @return true if started
*/
public boolean isStarted();
+
/**
* @return true if stopped
*/
public boolean isStopped();
+
/**
* @return true if initialized
*/
public boolean isInitialized();
+
/**
* @return true if shutDown
*/
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java Fri Jul 19 18:44:21 2013
@@ -16,293 +16,232 @@
*/
package org.apache.activeblaze;
+import java.io.IOException;
+
import org.apache.activeblaze.impl.destination.DestinationMatch;
-import org.apache.activeblaze.wire.DestinationData;
-import org.apache.activeblaze.wire.DestinationData.DestinationDataBean;
-import org.apache.activeblaze.wire.SubscriptionData.SubscriptionDataBean;
-import org.apache.activemq.protobuf.Buffer;
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.BufferInputStream;
+import org.apache.activeblaze.wire.BufferOutputStream;
/**
* Subscription Info
- *
*/
public class Subscription {
- private final SubscriptionDataBean data;
+ private String channelName;
+ private String selector;
+ private String subscriberName;
+ private boolean durable;
+ private boolean noLocal;
+ private int weight;
+ private Destination destination;
/**
* Default Constructor
*/
public Subscription() {
- this.data = new SubscriptionDataBean();
- }
-
- /**
- * Default Constructor
- *
- * @param data
- */
- public Subscription(SubscriptionDataBean data) {
- this.data = data;
}
- /**
- * Constructor - create a vanilla subscription
- *
- * @param destination
- */
- public Subscription(String destination) {
- this.data = new SubscriptionDataBean();
- Destination dest = new Destination(destination);
- this.data.setDestinationData(dest.getData());
- }
/**
* Constructor - create a vanilla subscription
- *
- * @param destination
- * @param topic
*/
public Subscription(String destination, boolean topic) {
- this.data = new SubscriptionDataBean();
- Destination dest = new Destination(destination,topic);
- this.data.setDestinationData(dest.getData());
- setTopic(topic);
+ this.destination = new Destination(new Buffer(destination), topic);
}
/**
* Constructor - create a vanilla subscription
- *
- * @param destination
*/
- public Subscription(Buffer destination) {
- this.data = new SubscriptionDataBean();
- Destination dest = new Destination(destination);
- this.data.setDestinationData(dest.getData());
+ public Subscription(Buffer destination, boolean topic) {
+ this.destination = new Destination(destination, topic);
}
- /**
- * @return the underlying SubscriptionDataBean
- */
- public SubscriptionDataBean getData() {
- return this.data;
- }
/**
* set the durability of the subscription
- *
- * @param value
*/
public void setDurable(boolean value) {
- this.data.setDurable(value);
+ this.durable = value;
}
/**
* @return true if durable
*/
public boolean isDurable() {
- return this.data.getDurable();
+ return this.durable;
}
/**
* Set the weight - for Queues - higher weighted subscribers take precedence
- *
- * @param value
*/
public void setWeight(int value) {
- this.data.setWeight(value);
+ this.weight = value;
}
/**
* @return the weight
*/
public int getWeight() {
- return this.data.getWeight();
+ return this.weight;
}
/**
* Set the channel name of the subscription
- *
- * @param value
*/
public void setChannelName(String value) {
- this.data.setChannelName(value);
+ this.channelName = value;
}
/**
* @return the channel name of the subscription
*/
public String getChannelName() {
- return this.data.getChannelName();
+ return this.channelName;
}
/**
* Set the name of the subscriber
- *
- * @param value
*/
public void setSubscriberName(String value) {
- this.data.setSubscriberName(value);
+ this.subscriberName = value;
}
/**
* @return the name of the subscriber
*/
public String getSubscriberName() {
- return this.data.getSubscriberName();
+ return this.subscriberName;
}
/**
* Set a SQL92 selector
- *
- * @param selector
*/
public void setSelector(String selector) {
- this.data.setSelector(selector);
+ this.selector = selector;
}
/**
* @return the selector
*/
public String getSelector() {
- return this.data.getSelector();
+ return this.selector;
}
/**
* Set the Destination
- *
- * @param data
- */
- public void setDestination(DestinationData data) {
- this.data.setDestinationData(data);
- }
-
- /**
- * @return the Destination
*/
- public DestinationDataBean getDestinationData() {
- if( !this.data.hasDestinationData() ) {
- this.data.setDestinationData(new DestinationDataBean());
- }
- return this.data.getDestinationData().copy();
+ public void setDestination(Destination destination) {
+ this.destination = destination;
}
/**
- * @return the Destination
+ * @return Destination
*/
public Destination getDestination() {
- return new Destination(this.data.getDestinationData());
+ return this.destination;
}
- /**
- * @return the name
- */
- public Buffer getDestinationName() {
- return this.getDestinationData().getName();
- }
-
- /**
- * @param name
- * the name to set
- */
- public void setDestinationName(Buffer name) {
- this.getDestinationData().setName(name);
- }
/**
* @return the topic
*/
public boolean isTopic() {
- return this.getDestinationData().getTopic();
+ return this.destination.isTopic();
}
- /**
- * @param topic
- * the topic to set
- */
- public void setTopic(boolean topic) {
- this.getDestinationData().setTopic(topic);
- }
/**
* @return the temporary
*/
public boolean isTemporary() {
- return this.getDestinationData().getTopic();
+ return this.destination.isTemporary();
}
- /**
- * @param temporary
- * the temporary to set
- */
- public void setTemporary(boolean temporary) {
- this.getDestinationData().setTemporary(temporary);
- }
-
+
/**
* @return noLocal
*/
public boolean isNoLocal() {
- return this.data.getNoLocal();
+ return this.noLocal;
}
-
+
/**
* @param noLocal
*/
public void setNoLocal(boolean noLocal) {
- this.data.setNoLocal(noLocal);
+ this.noLocal = noLocal;
}
- /**
+ /**
* @return hash code
* @see java.lang.Object#hashCode()
*/
public int hashCode() {
- return this.data.getDestinationData().getName().hashCode();
+ return this.destination.getName().hashCode();
}
-
- /**
- * @param obj
+
+ /**
* @return true if equals <Code>this</Code>
* @see java.lang.Object#equals(java.lang.Object)
*/
public boolean equals(Object obj) {
- if( obj==this )
- return true;
-
- if( obj==null || obj.getClass()!=Subscription.class )
- return false;
-
- return equals((Subscription)obj);
- }
+ if (obj == this)
+ return true;
+
+ if (obj == null || obj.getClass() != Subscription.class)
+ return false;
+
+ return equals((Subscription) obj);
+ }
/**
- * @param other
* @return true if other is equivalent
*/
public boolean equals(Subscription other) {
if (other != null) {
- return this.data.getDestinationData().getName().equals(other.data.getDestinationData().getName());
+ return this.destination.getName().equals(other.destination.getName()) && this.destination.isTopic() == other.destination.isTopic();
}
return false;
}
/**
* Does this subscription match a destination
- *
- * @param destination
+ *
* @return true if it matches a destination
*/
- public boolean matches(Destination destination) {
- return matches(destination.getName());
+ public boolean matches(final Destination dest) {
+ return matches(dest.getName());
}
/**
* Does this subscription match a destination
- *
- * @param destination
+ *
* @return true if it matches a destination
*/
- public boolean matches(Buffer destination) {
- return DestinationMatch.isMatch(this.data.getDestinationData().getName(), destination);
+ public boolean matches(Buffer destinationName) {
+ return DestinationMatch.isMatch(this.destination.getName(), destinationName);
+ }
+
+
+ public void write(BufferOutputStream out) throws IOException {
+ out.writeBoolean(this.durable);
+ out.writeBoolean(this.noLocal);
+ out.writeString(getChannelName());
+ out.writeString(getSelector());
+ out.writeInt(getWeight());
+ this.destination.write(out);
}
+
+ public void read(BufferInputStream in) throws IOException {
+
+ this.durable = in.readBoolean();
+ this.noLocal = in.readBoolean();
+ this.channelName = in.readString();
+ this.selector = in.readString();
+ this.weight = in.readInt();
+ this.destination = new Destination();
+ this.destination.read(in);
+ }
+
+
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java Fri Jul 19 18:44:21 2013
@@ -18,40 +18,22 @@ package org.apache.activeblaze;
/**
* A SubscriptionHolder
- *
*/
public class SubscriptionHolder {
private final BlazeMessageListener listener;
private final Subscription subscription;
+
/**
* Constructor
- *
- * @param destination
- * @param listener
- */
- public SubscriptionHolder(String destination, BlazeMessageListener listener) {
- this.listener = listener;
- this.subscription = new Subscription(destination);
- }
-
- /**
- * Constructor
- *
- * @param destination
- * @param topic
- * @param listener
*/
- public SubscriptionHolder(String destination, boolean topic,BlazeMessageListener listener) {
+ public SubscriptionHolder(String destination, boolean topic, BlazeMessageListener listener) {
this.listener = listener;
- this.subscription = new Subscription(destination,topic);
+ this.subscription = new Subscription(destination, topic);
}
/**
* Constructor
- *
- * @param listener
- * @param subscription
*/
public SubscriptionHolder(Subscription subscription, BlazeMessageListener listener) {
this.listener = listener;
@@ -60,7 +42,7 @@ public class SubscriptionHolder {
/**
* Get the listener
- *
+ *
* @return the listener
*/
public BlazeMessageListener getListener() {
@@ -69,7 +51,7 @@ public class SubscriptionHolder {
/**
* Get the subscription
- *
+ *
* @return the subscription
*/
public Subscription getSubscription() {
@@ -85,7 +67,6 @@ public class SubscriptionHolder {
}
/**
- * @param obj
* @return true if equals
*/
public boolean equals(Object obj) {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java Fri Jul 19 18:44:21 2013
@@ -18,52 +18,45 @@ package org.apache.activeblaze.cluster;
import org.apache.activeblaze.group.BlazeGroupChannel;
import org.apache.activeblaze.group.Member;
+
/**
* A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication and maintains a coordinator
* (elected leader) for the group
- *
*/
-public interface BlazeClusterGroupChannel extends BlazeGroupChannel{
+public interface BlazeClusterGroupChannel extends BlazeGroupChannel {
/**
* @return true if this Channel is the coordinator of the group
- * @throws Exception
*/
public boolean isMaster() throws Exception;
+
/**
* @return the member of the group which is the coordinator
- * @throws Exception
*/
public Member getMaster() throws Exception;
-
+
/**
* Add a listener for cluster changes
- *
- * @param l
- * @throws Exception
*/
public void addMasterChangedListener(MasterChangedListener l) throws Exception;
/**
* Remove a listener for cluster changes
- *
- * @param l
- * @throws Exception
*/
public void removeMasterChangedListener(MasterChangedListener l) throws Exception;
-
+
/**
* @return the configuration
*/
public BlazeClusterGroupConfiguration getConfiguration();
-
+
/**
* waits for election in the group to finish
+ *
* @param timeout time to wait in milliseconds
* @return true if election finished
- * @throws Exception
*/
public boolean waitForElection(int timeout) throws Exception;
-
+
/**
* @return a <Code>ClusterState</Code>
*/
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java Fri Jul 19 18:44:21 2013
@@ -19,39 +19,36 @@ package org.apache.activeblaze.cluster;
import org.apache.activeblaze.group.BlazeGroupChannelFactory;
-
/**
* Factory class for creating <Code>BlazeGroupChannel</CODE>
*/
public class BlazeClusterGroupChannelFactory extends BlazeGroupChannelFactory {
-
+
/**
* Default Constructor
*/
public BlazeClusterGroupChannelFactory() {
super(new BlazeClusterGroupConfiguration());
}
-
+
/**
* Construct a factory to use the passed Configuration
- * @param config
*/
- public BlazeClusterGroupChannelFactory(BlazeClusterGroupConfiguration config){
+ public BlazeClusterGroupChannelFactory(BlazeClusterGroupConfiguration config) {
super(config);
}
-
+
/**
* Create a GroupChannel
- * @param name
+ *
* @return the Channel
- * @throws Exception
*/
public BlazeClusterGroupChannel createChannel(String name) throws Exception {
BlazeClusterGroupChannelImpl result = new BlazeClusterGroupChannelImpl(name);
result.setConfiguration(getConfiguration().copy());
return result;
}
-
+
/**
* @return the configuration
*/