You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/03/13 10:04:24 UTC
svn commit: r753178 - in
/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze: ./
group/ impl/reliable/simple/ jms/ jms/message/
Author: rajdavies
Date: Fri Mar 13 09:04:23 2009
New Revision: 753178
URL: http://svn.apache.org/viewvc?rev=753178&view=rev
Log:
remove depreciated code
Removed:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Fri Mar 13 09:04:23 2009
@@ -19,7 +19,6 @@
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-
import org.apache.activeblaze.impl.network.Network;
import org.apache.activeblaze.impl.network.NetworkFactory;
import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -38,6 +37,7 @@
import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.MessageBuffer;
+import org.apache.activemq.protobuf.UTF8Buffer;
/**
* <P>
* A <CODE>BlazeChannel</CODE> handles all client communication, either unicast,
@@ -185,14 +185,7 @@
}
public synchronized void broadcast(Destination destination, BlazeMessage msg) throws Exception {
- msg.setDestination(destination);
- msg.storeContent();
- BlazeDataBuffer blazeData = msg.getContent().freeze();
- PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
- packetData.setReliable(true);
- packetData.setDestinationData(destination.getData());
- packetData.setPayloadType(msg.getType());
- Packet packet = new Packet(packetData.freeze());
+ Packet packet = buildPacket(destination, msg);
this.broadcast.downStream(packet);
}
@@ -229,19 +222,19 @@
public void setConfiguration(BlazeConfiguration configuration) {
this.configuration = configuration;
}
-
/**
* @return the blazeMessageProcessor
*/
- public BlazeMessageProcessor getBlazeMessageProcessor(){
+ public BlazeMessageProcessor getBlazeMessageProcessor() {
return this.blazeMessageProcessor;
}
/**
- * @param blazeMessageProcessor the blazeMessageProcessor to set
+ * @param blazeMessageProcessor
+ * the blazeMessageProcessor to set
*/
- public void setBlazeMessageProcessor(BlazeMessageProcessor blazeMessageProcessor){
+ public void setBlazeMessageProcessor(BlazeMessageProcessor blazeMessageProcessor) {
this.blazeMessageProcessor = blazeMessageProcessor;
}
@@ -261,29 +254,28 @@
BlazeMessage result = null;
if (this.blazeMessageProcessor != null) {
result = this.blazeMessageProcessor.processBlazeMessage(data);
- }else {
-
- if (data != null) {
- DestinationData destination = data.getDestinationData();
- Buffer payload = data.getPayload();
- BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
- String fromId = null;
- if (data.hasProducerId()) {
- fromId = data.getProducerId().toStringUtf8();
- }
- result = createMessage(fromId);
- result.setDestination(destination);
- result.setFromId(fromId);
- if (data.hasMessageId()) {
- result.setMessageId(data.getMessageId().toStringUtf8());
- }
- if (data.hasCorrelationId()) {
- result.setCorrelationId(data.getCorrelationId().toStringUtf8());
+ } else {
+ if (data != null) {
+ DestinationData destination = data.getDestinationData();
+ Buffer payload = data.getPayload();
+ BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
+ String fromId = null;
+ if (data.hasProducerId()) {
+ fromId = data.getProducerId().toStringUtf8();
+ }
+ result = createMessage(fromId);
+ result.setDestination(destination);
+ result.setFromId(fromId);
+ if (data.hasMessageId()) {
+ result.setMessageId(data.getMessageId().toStringUtf8());
+ }
+ if (data.hasCorrelationId()) {
+ result.setCorrelationId(data.getCorrelationId().toStringUtf8());
+ }
+ result.setTimeStamp(blazeData.getTimestamp());
+ result.setType(data.getPayloadType());
+ result.setContent(blazeData);
}
- result.setTimeStamp(blazeData.getTimestamp());
- result.setType(data.getPayloadType());
- result.setContent(blazeData);
- }
}
return result;
}
@@ -306,4 +298,33 @@
}
}
}
+
+ protected final Packet buildPacket(Destination destination, BlazeMessage message) {
+ return buildPacket(destination, message, false,null);
+ }
+
+ protected final Packet buildPacket(Destination destination, BlazeMessage message,String correlationId) {
+ return buildPacket(destination, message, false,correlationId);
+ }
+
+ protected final Packet buildPacket(Destination destination, BlazeMessage message,boolean responseRequired) {
+ return buildPacket(destination, message, responseRequired,null);
+ }
+
+ protected final Packet buildPacket(Destination destination, BlazeMessage message, boolean responseRequired,String correlationId) {
+ message.setDestination(destination);
+ message.storeContent();
+
+ BlazeDataBuffer blazeData = message.getContent().freeze();
+ PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
+ packetData.setReliable(true);
+ packetData.setResponseRequired(responseRequired);
+ if (correlationId != null && correlationId.length() > 0){
+ packetData.setCorrelationId(new UTF8Buffer(correlationId));
+ }
+ packetData.setDestinationData(destination.getData());
+ packetData.setPayloadType(message.getType());
+ Packet packet = new Packet(packetData.freeze());
+ return packet;
+ }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Fri Mar 13 09:04:23 2009
@@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.activeblaze.util.IOUtils;
import org.apache.activeblaze.wire.BlazeData;
import org.apache.activeblaze.wire.BoolType;
@@ -53,26 +52,33 @@
import org.apache.activeblaze.wire.ShortType.ShortTypeBean;
import org.apache.activeblaze.wire.StringType.StringTypeBean;
import org.apache.activemq.protobuf.Buffer;
-
+import org.apache.activemq.protobuf.UTF8Buffer;
/**
- * A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs. The names are <CODE>String</CODE>
- * objects, and the values are primitive data types in the Java programming language. The names must have a value that
- * is not null, and not an empty string. The entries can be accessed sequentially or randomly by name. The order of the
- * entries is undefined. <CODE>BlazeMessage</CODE> inherits from the <CODE>Message</CODE> interface and adds a
- * message body that contains a Map.
+ * A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs.
+ * The names are <CODE>String</CODE> objects, and the values are primitive data
+ * types in the Java programming language. The names must have a value that is
+ * not null, and not an empty string. The entries can be accessed sequentially
+ * or randomly by name. The order of the entries is undefined.
+ * <CODE>BlazeMessage</CODE> inherits from the <CODE>Message</CODE> interface
+ * and adds a message body that contains a Map.
* <P>
- * The primitive types can be read or written explicitly using methods for each type. They may also be read or written
- * generically as objects. For instance, a call to <CODE>BlazeMessage.setInt("foo", 6)</CODE> is equivalent to
- * <CODE> BlazeMessage.setObject("foo", new Integer(6))</CODE>. Both forms are provided, because the explicit form is
- * convenient for static programming, and the object form is needed when types are not known at compile time.
+ * The primitive types can be read or written explicitly using methods for each
+ * type. They may also be read or written generically as objects. For instance,
+ * a call to <CODE>BlazeMessage.setInt("foo", 6)</CODE> is equivalent to
+ * <CODE> BlazeMessage.setObject("foo", new Integer(6))</CODE>. Both forms are
+ * provided, because the explicit form is convenient for static programming, and
+ * the object form is needed when types are not known at compile time.
* <P>
* <P>
- * <CODE>BlazeMessage</CODE> objects support the following conversion table. The marked cases must be supported. The
- * unmarked cases must throw a <CODE>JMSException</CODE>. The <CODE>String</CODE> -to-primitive conversions may
- * throw a runtime exception if the primitive's <CODE>valueOf()</CODE> method does not accept it as a valid
- * <CODE> String</CODE> representation of the primitive.
+ * <CODE>BlazeMessage</CODE> objects support the following conversion table. The
+ * marked cases must be supported. The unmarked cases must throw a
+ * <CODE>JMSException</CODE>. The <CODE>String</CODE> -to-primitive conversions
+ * may throw a runtime exception if the primitive's <CODE>valueOf()</CODE>
+ * method does not accept it as a valid <CODE> String</CODE> representation of
+ * the primitive.
* <P>
- * A value written as the row type can be read as the column type. <p/>
+ * A value written as the row type can be read as the column type.
+ * <p/>
*
* <PRE>
* | | boolean byte short char int long float double String byte[] |----------------------------------------------------------------------
@@ -83,13 +89,14 @@
*
* <p/>
* <P>
- * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding
- * <code>valueOf(String)</code> conversion method with a null value. Since <code>char</code> does not support a
- * <code>String</code> conversion, attempting to read a null value as a <code>char</code> must throw a
- * <code>NullPointerException</code>.
+ * Attempting to read a null value as a primitive type must be treated as
+ * calling the primitive's corresponding <code>valueOf(String)</code> conversion
+ * method with a null value. Since <code>char</code> does not support a
+ * <code>String</code> conversion, attempting to read a null value as a
+ * <code>char</code> must throw a <code>NullPointerException</code>.
*
*/
-public class BlazeMessage implements Map<String, Object>{
+public class BlazeMessage implements Map<String, Object> {
private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
private static final String DEFAULT_OBJECT_PAYLOAD = "DEFAULT_OBJECT_PAYLOAD";
@@ -107,22 +114,24 @@
private transient boolean persistent;
private transient int type;
private BlazeData content;
-
+ private transient boolean loaded;
+
/**
* Default Constructor
*/
public BlazeMessage() {
}
-
+
/**
- * Constructor - Utility to construct a message with a text <Code>String</Code> payload
+ * Constructor - Utility to construct a message with a text
+ * <Code>String</Code> payload
*
* @param text
*/
public BlazeMessage(String text) {
setStringValue(DEFAULT_TEXT_PAYLOAD, text);
}
-
+
/**
* Constructor - Utility to construct a message with a byte[] array payload
*
@@ -131,7 +140,7 @@
public BlazeMessage(byte[] data) {
setBytesValue(DEFAULT_BYTES_PAYLOAD, data);
}
-
+
/**
* Constructor - Utility to construct a message with an object payload
*
@@ -140,269 +149,282 @@
public BlazeMessage(Object data) {
setObject(data);
}
-
+
/**
* Utility method for setting a default <Code>String</Code> payload
*
* @param text
*/
- public void setText(String text){
+ public void setText(String text) {
setStringValue(DEFAULT_TEXT_PAYLOAD, text);
}
-
+
/**
- * Utility method used for when a BlazeMessage is only carrying a byte[] array
+ * Utility method used for when a BlazeMessage is only carrying a byte[]
+ * array
*
* @return text the default text
* @throws Exception
*/
- public String getText() throws Exception{
+ public String getText() throws Exception {
return getStringValue(DEFAULT_TEXT_PAYLOAD);
}
-
+
/**
* Utility method for setting a default <Code>byte[]</Code> payload
*
* @param payload
*/
- public void setBytes(byte[] payload){
+ public void setBytes(byte[] payload) {
setBytesValue(DEFAULT_BYTES_PAYLOAD, payload);
}
-
+
/**
* Utility method used for when a BlazeMessage is only carrying an Object
*
* @return text the default text
* @throws Exception
*/
- public Object getObject() throws Exception{
+ public Object getObject() throws Exception {
Buffer buffer = getBufferValue(DEFAULT_OBJECT_PAYLOAD);
return IOUtils.getObject(buffer);
}
-
+
/**
* Utility method for setting a default <Code>Object</Code> payload
*
* @param payload
*/
- public void setObject(Object payload){
+ public void setObject(Object payload) {
try {
put(DEFAULT_OBJECT_PAYLOAD, IOUtils.getBuffer(payload));
} catch (Exception e) {
throw new BlazeRuntimeException(e);
}
}
-
+
/**
* Utility method used for when a BlazeMessage is only carrying a String
*
* @return text the default text
* @throws Exception
*/
- public byte[] getBytes() throws Exception{
+ public byte[] getBytes() throws Exception {
return getBytesValue(DEFAULT_BYTES_PAYLOAD);
}
-
+
/**
* @return the destination
*/
- public Destination getDestination(){
+ public Destination getDestination() {
initializeReading();
return this.destination;
}
-
+
/**
- * @param destination the destination to set
+ * @param destination
+ * the destination to set
*/
- public void setDestination(Destination destination){
+ public void setDestination(Destination destination) {
this.destination = destination;
}
-
+
/**
* @param destination
*/
- public void setDestination(DestinationData destinationData){
+ public void setDestination(DestinationData destinationData) {
if (destinationData != null) {
this.destination = new Destination(destinationData);
}
}
-
+
/**
* The id of the channel that sent the message
*
* @return the fromId
*/
- public String getFromId(){
+ public String getFromId() {
initializeReading();
return this.fromId;
}
-
+
/**
- * @param fromId the fromId to set
+ * @param fromId
+ * the fromId to set
*/
- public void setFromId(String fromId){
+ public void setFromId(String fromId) {
this.fromId = fromId;
}
-
+
/**
* @return the messageId
*/
- public String getMessageId(){
+ public String getMessageId() {
initializeReading();
return this.messageId;
}
-
+
/**
- * @param messageId the messageId to set
+ * @param messageId
+ * the messageId to set
*/
- public void setMessageId(String messageId){
+ public void setMessageId(String messageId) {
this.messageId = messageId;
}
-
+
/**
* @return the correlationId
*/
- public String getCorrelationId(){
+ public String getCorrelationId() {
initializeReading();
return this.correlationId;
}
-
+
/**
- * @param correlationId the correlationId to set
+ * @param correlationId
+ * the correlationId to set
*/
- public void setCorrelationId(String correlationId){
+ public void setCorrelationId(String correlationId) {
this.correlationId = correlationId;
}
-
+
/**
* @return the timeStamp
*/
- public long getTimeStamp(){
+ public long getTimeStamp() {
initializeReading();
return this.timeStamp;
}
-
+
/**
- * @param timeStamp the timeStamp to set
+ * @param timeStamp
+ * the timeStamp to set
*/
- public void setTimeStamp(long timeStamp){
+ public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
-
+
/**
* @return the replyTo
*/
- public Destination getReplyTo(){
+ public Destination getReplyTo() {
initializeReading();
return this.replyTo;
}
-
+
/**
- * @param replyTo the replyTo to set
+ * @param replyTo
+ * the replyTo to set
*/
- public void setReplyTo(Destination replyTo){
+ public void setReplyTo(Destination replyTo) {
this.replyTo = replyTo;
}
-
+
/**
- * @param replyTo the replyTo to set
+ * @param replyTo
+ * the replyTo to set
*/
- public void setReplyTo(DestinationData replyTo){
+ public void setReplyTo(DestinationData replyTo) {
this.replyTo = new Destination(replyTo);
}
-
+
/**
* @return the expiration
*/
- public long getExpiration(){
+ public long getExpiration() {
initializeReading();
return this.expiration;
}
-
+
/**
- * @param expiration the expiration to set
+ * @param expiration
+ * the expiration to set
*/
- public void setExpiration(long expiration){
+ public void setExpiration(long expiration) {
this.expiration = expiration;
}
-
+
/**
* @return the redeliveryCounter
*/
- public int getRedeliveryCounter(){
+ public int getRedeliveryCounter() {
initializeReading();
return this.redeliveryCounter;
}
-
+
/**
- * @param redeliveryCounter the redeliveryCounter to set
+ * @param redeliveryCounter
+ * the redeliveryCounter to set
*/
- public void setRedeliveryCounter(int redeliveryCounter){
+ public void setRedeliveryCounter(int redeliveryCounter) {
this.redeliveryCounter = redeliveryCounter;
}
-
+
/**
* @return the priority
*/
- public int getPriority(){
+ public int getPriority() {
initializeReading();
return this.priority;
}
-
+
/**
- * @param priority the priority to set
+ * @param priority
+ * the priority to set
*/
- public void setPriority(int priority){
+ public void setPriority(int priority) {
this.priority = priority;
}
-
+
/**
* @return the persistent
*/
- public boolean isPersistent(){
+ public boolean isPersistent() {
initializeReading();
return this.persistent;
}
-
+
/**
- * @param persistent the persistent to set
+ * @param persistent
+ * the persistent to set
*/
- public void setPersistent(boolean persistent){
+ public void setPersistent(boolean persistent) {
this.persistent = persistent;
}
-
+
/**
* @return the type
*/
- public String getMessageType(){
+ public String getMessageType() {
initializeReading();
return this.messageType;
}
-
+
/**
- * @param type the type to set
+ * @param type
+ * the type to set
*/
- public void setMessageType(String type){
+ public void setMessageType(String type) {
this.messageType = type;
}
-
+
/**
* Get the type
*
* @return the type
*/
- public int getType(){
+ public int getType() {
return this.type;
}
-
- public void setType(int type){
+
+ public void setType(int type) {
this.type = type;
}
-
+
/**
* @return a copy of this message
*/
- public BlazeMessage clone(){
+ public BlazeMessage clone() {
BlazeMessage copy = new BlazeMessage();
try {
copy(copy);
@@ -411,22 +433,24 @@
}
return copy;
}
-
+
/**
* clear the contents of this message
*/
- public void clear(){
+ public void clear() {
this.map.clear();
}
-
+
/**
* Returns the <CODE>boolean</CODE> value with the specified name.
*
- * @param name the name of the <CODE>boolean</CODE>
+ * @param name
+ * the name of the <CODE>boolean</CODE>
* @return the <CODE>boolean</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public boolean getBooleanValue(String name) throws BlazeMessageFormatException{
+ public boolean getBooleanValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -440,15 +464,17 @@
}
throw new BlazeMessageFormatException(" cannot read a boolean from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>byte</CODE> value with the specified name.
*
- * @param name the name of the <CODE>byte</CODE>
+ * @param name
+ * the name of the <CODE>byte</CODE>
* @return the <CODE>byte</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public byte getByteValue(String name) throws BlazeMessageFormatException{
+ public byte getByteValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -462,15 +488,17 @@
}
throw new BlazeMessageFormatException(" cannot read a byte from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>short</CODE> value with the specified name.
*
- * @param name the name of the <CODE>short</CODE>
+ * @param name
+ * the name of the <CODE>short</CODE>
* @return the <CODE>short</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public short getShortValue(String name) throws BlazeMessageFormatException{
+ public short getShortValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -487,15 +515,17 @@
}
throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
}
-
+
/**
* Returns the Unicode character value with the specified name.
*
- * @param name the name of the Unicode character
+ * @param name
+ * the name of the Unicode character
* @return the Unicode character value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public char getCharValue(String name) throws BlazeMessageFormatException{
+ public char getCharValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -506,15 +536,17 @@
}
throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>int</CODE> value with the specified name.
*
- * @param name the name of the <CODE>int</CODE>
+ * @param name
+ * the name of the <CODE>int</CODE>
* @return the <CODE>int</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public int getIntValue(String name) throws BlazeMessageFormatException{
+ public int getIntValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -534,15 +566,17 @@
}
throw new BlazeMessageFormatException(" cannot read an int from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>long</CODE> value with the specified name.
*
- * @param name the name of the <CODE>long</CODE>
+ * @param name
+ * the name of the <CODE>long</CODE>
* @return the <CODE>long</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public long getLongValue(String name) throws BlazeMessageFormatException{
+ public long getLongValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -565,15 +599,17 @@
}
throw new BlazeMessageFormatException(" cannot read a long from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>float</CODE> value with the specified name.
*
- * @param name the name of the <CODE>float</CODE>
+ * @param name
+ * the name of the <CODE>float</CODE>
* @return the <CODE>float</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public float getFloatValue(String name) throws BlazeMessageFormatException{
+ public float getFloatValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -587,15 +623,17 @@
}
throw new BlazeMessageFormatException(" cannot read a float from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>double</CODE> value with the specified name.
*
- * @param name the name of the <CODE>double</CODE>
+ * @param name
+ * the name of the <CODE>double</CODE>
* @return the <CODE>double</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public double getDoubleValue(String name) throws BlazeMessageFormatException{
+ public double getDoubleValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -612,16 +650,18 @@
}
throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName());
}
-
+
/**
* Returns the <CODE>String</CODE> value with the specified name.
*
- * @param name the name of the <CODE>String</CODE>
- * @return the <CODE>String</CODE> value with the specified name; if there is no item by this name, a null value
- * is returned
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @param name
+ * the name of the <CODE>String</CODE>
+ * @return the <CODE>String</CODE> value with the specified name; if there
+ * is no item by this name, a null value is returned
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public String getStringValue(String name) throws BlazeMessageFormatException{
+ public String getStringValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -632,15 +672,18 @@
}
return value.toString();
}
-
+
/**
* Returns the byte array value with the specified name.
*
- * @param name the name of the byte array
- * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @param name
+ * the name of the byte array
+ * @return the byte array value with the specified name; if there is no item
+ * by this name, a null value is returned.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public byte[] getBytesValue(String name) throws BlazeMessageFormatException{
+ public byte[] getBytesValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value instanceof byte[]) {
@@ -648,15 +691,18 @@
}
throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
}
-
+
/**
* Returns a Buffer with the specified name.
*
- * @param name the name of the byte array
- * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @param name
+ * the name of the byte array
+ * @return the byte array value with the specified name; if there is no item
+ * by this name, a null value is returned.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public Buffer getBufferValue(String name) throws BlazeMessageFormatException{
+ public Buffer getBufferValue(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value instanceof Buffer) {
@@ -664,44 +710,50 @@
}
throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName());
}
-
+
/**
* Returns the value of the object with the specified name.
* <P>
- * This method can be used to return, in objectified format, an object in the Java programming language ("Java
- * object") that had been stored in the Map with the equivalent <CODE>setObject</CODE> method call, or its
+ * This method can be used to return, in objectified format, an object in
+ * the Java programming language ("Java object") that had been stored in the
+ * Map with the equivalent <CODE>setObject</CODE> method call, or its
* equivalent primitive <CODE>set <I>type </I></CODE> method.
* <P>
- * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
+ * Note that byte values are returned as <CODE>byte[]</CODE>, not
+ * <CODE>Byte[]</CODE>.
*
- * @param name the name of the Java object
- * @return a copy of the Java object value with the specified name, in objectified format (for example, if the
- * object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
- * this name, a null value is returned
+ * @param name
+ * the name of the Java object
+ * @return a copy of the Java object value with the specified name, in
+ * objectified format (for example, if the object was set as an
+ * <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there
+ * is no item by this name, a null value is returned
*/
- public Object getObjectValue(String name){
+ public Object getObjectValue(String name) {
initializeReading();
return this.map.get(name);
}
-
+
/**
- * Returns an <CODE>Enumeration</CODE> of all the names in the <CODE>BlazeMessage</CODE> object.
+ * Returns an <CODE>Enumeration</CODE> of all the names in the
+ * <CODE>BlazeMessage</CODE> object.
*
* @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
*/
- public Enumeration<String> getNames(){
+ public Enumeration<String> getNames() {
initializeReading();
return Collections.enumeration(this.map.keySet());
}
-
+
/**
* put a key,value pair into the message
*
* @param name
- * @param value must be a supported primitive, or map of supported primitives
+ * @param value
+ * must be a supported primitive, or map of supported primitives
* @return the previous value associated with the key
*/
- public Object put(String name,Object value){
+ public Object put(String name, Object value) {
initializeWriting();
if (name == null) {
throw new IllegalArgumentException("The name of the property cannot be null.");
@@ -712,115 +764,137 @@
checkValidObject(value);
return this.map.put(name, value);
}
-
+
/**
* Sets a <CODE>boolean</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>boolean</CODE>
- * @param value the <CODE>boolean</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>boolean</CODE>
+ * @param value
+ * the <CODE>boolean</CODE> value to set in the Map
*/
- public void setBooleanValue(String name,boolean value){
+ public void setBooleanValue(String name, boolean value) {
initializeWriting();
put(name, value ? Boolean.TRUE : Boolean.FALSE);
}
-
+
/**
* Sets a <CODE>byte</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>byte</CODE>
- * @param value the <CODE>byte</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>byte</CODE>
+ * @param value
+ * the <CODE>byte</CODE> value to set in the Map
*/
- public void setByteValue(String name,byte value){
+ public void setByteValue(String name, byte value) {
initializeWriting();
put(name, Byte.valueOf(value));
}
-
+
/**
* Sets a <CODE>short</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>short</CODE>
- * @param value the <CODE>short</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>short</CODE>
+ * @param value
+ * the <CODE>short</CODE> value to set in the Map
*/
- public void setShortValue(String name,short value){
+ public void setShortValue(String name, short value) {
initializeWriting();
put(name, Short.valueOf(value));
}
-
+
/**
* Sets a Unicode character value with the specified name into the Map.
*
- * @param name the name of the Unicode character
- * @param value the Unicode character value to set in the Map
+ * @param name
+ * the name of the Unicode character
+ * @param value
+ * the Unicode character value to set in the Map
*/
- public void setCharValue(String name,char value){
+ public void setCharValue(String name, char value) {
initializeWriting();
put(name, Character.valueOf(value));
}
-
+
/**
* Sets an <CODE>int</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>int</CODE>
- * @param value the <CODE>int</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>int</CODE>
+ * @param value
+ * the <CODE>int</CODE> value to set in the Map
*/
- public void setIntValue(String name,int value){
+ public void setIntValue(String name, int value) {
initializeWriting();
put(name, Integer.valueOf(value));
}
-
+
/**
* Sets a <CODE>long</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>long</CODE>
- * @param value the <CODE>long</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>long</CODE>
+ * @param value
+ * the <CODE>long</CODE> value to set in the Map
*/
- public void setLongValue(String name,long value){
+ public void setLongValue(String name, long value) {
initializeWriting();
put(name, Long.valueOf(value));
}
-
+
/**
* Sets a <CODE>float</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>float</CODE>
- * @param value the <CODE>float</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>float</CODE>
+ * @param value
+ * the <CODE>float</CODE> value to set in the Map
*/
- public void setFloatValue(String name,float value){
+ public void setFloatValue(String name, float value) {
initializeWriting();
put(name, new Float(value));
}
-
+
/**
* Sets a <CODE>double</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>double</CODE>
- * @param value the <CODE>double</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>double</CODE>
+ * @param value
+ * the <CODE>double</CODE> value to set in the Map
*/
- public void setDoubleValue(String name,double value){
+ public void setDoubleValue(String name, double value) {
initializeWriting();
put(name, new Double(value));
}
-
+
/**
* Sets a <CODE>String</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>String</CODE>
- * @param value the <CODE>String</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>String</CODE>
+ * @param value
+ * the <CODE>String</CODE> value to set in the Map
*/
- public void setStringValue(String name,String value){
+ public void setStringValue(String name, String value) {
initializeWriting();
put(name, value);
}
-
+
/**
* Sets a byte array value with the specified name into the Map.
*
- * @param name the name of the byte array
- * @param value the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
- * will not be altered by future modifications
- * @throws NullPointerException if the name is null, or if the name is an empty string.
+ * @param name
+ * the name of the byte array
+ * @param value
+ * the byte array value to set in the Map; the array is copied so
+ * that the value for <CODE>name </CODE> will not be altered by
+ * future modifications
+ * @throws NullPointerException
+ * if the name is null, or if the name is an empty string.
*/
- public void setBytesValue(String name,byte[] value){
+ public void setBytesValue(String name, byte[] value) {
initializeWriting();
if (value != null) {
put(name, value);
@@ -828,15 +902,18 @@
this.map.remove(name);
}
}
-
+
/**
* Sets a Buffer value with the specified name into the Map.
*
- * @param name the name of the byte array
- * @param value the Buffer value to set in the Map
- * @throws NullPointerException if the name is null, or if the name is an empty string.
+ * @param name
+ * the name of the byte array
+ * @param value
+ * the Buffer value to set in the Map
+ * @throws NullPointerException
+ * if the name is null, or if the name is an empty string.
*/
- public void setBufferValue(String name,Buffer value){
+ public void setBufferValue(String name, Buffer value) {
initializeWriting();
if (value != null) {
put(name, value);
@@ -844,22 +921,27 @@
this.map.remove(name);
}
}
-
+
/**
- * Sets a portion of the byte array value with the specified name into the Map.
+ * Sets a portion of the byte array value with the specified name into the
+ * Map.
*
- * @param name the name of the byte array
- * @param value the byte array value to set in the Map
- * @param offset the initial offset within the byte array
- * @param length the number of bytes to use
+ * @param name
+ * the name of the byte array
+ * @param value
+ * the byte array value to set in the Map
+ * @param offset
+ * the initial offset within the byte array
+ * @param length
+ * the number of bytes to use
*/
- public void setBytesValue(String name,byte[] value,int offset,int length){
+ public void setBytesValue(String name, byte[] value, int offset, int length) {
initializeWriting();
byte[] data = new byte[length];
System.arraycopy(value, offset, data, 0, length);
put(name, data);
}
-
+
/**
* Find out if the message contains a key This isn't recursive
*
@@ -867,11 +949,11 @@
* @return true if the message contains the key
*
*/
- public boolean containsKey(Object key){
+ public boolean containsKey(Object key) {
initializeReading();
return this.map.containsKey(key.toString());
}
-
+
/**
* Find out if the message contains a value
*
@@ -879,60 +961,61 @@
* @return true if the value exists
*
*/
- public boolean containsValue(Object value){
+ public boolean containsValue(Object value) {
initializeReading();
return this.map.containsValue(value);
}
-
+
/**
* @return a set of Map.Entry values
*
*/
- public Set<java.util.Map.Entry<String, Object>> entrySet(){
+ public Set<java.util.Map.Entry<String, Object>> entrySet() {
initializeReading();
return this.map.entrySet();
}
-
+
/**
* Retrieve the object associated with the key
*
* @param key
* @return the object
*/
- public Object get(Object key){
+ public Object get(Object key) {
initializeReading();
return getObjectValue(key.toString());
}
-
+
/**
* @return true if the message is empty
*
*/
- public boolean isEmpty(){
+ public boolean isEmpty() {
initializeReading();
return this.map.isEmpty();
}
-
+
/**
* @return a Set of all the keys
*/
- public Set<String> keySet(){
+ public Set<String> keySet() {
initializeReading();
return this.map.keySet();
}
-
+
/**
* Add all entries in a Map to the message
*
- * @param t the map
+ * @param t
+ * the map
*
*/
- public void putAll(Map<? extends String, ? extends Object> t){
+ public void putAll(Map<? extends String, ? extends Object> t) {
for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
put(entry.getKey(), entry.getValue());
}
}
-
+
/**
* Remove a key/value pair from the message
*
@@ -940,46 +1023,46 @@
* @return the value removed or null
*
*/
- public Object remove(Object key){
+ public Object remove(Object key) {
setContent(null);
return this.map.remove(key.toString());
}
-
+
/**
* @return the number of entries in the message
*/
- public int size(){
+ public int size() {
initializeReading();
return this.map.size();
}
-
+
/**
* @return a Collection of the values in the message
*/
- public Collection<Object> values(){
+ public Collection<Object> values() {
initializeReading();
return this.map.values();
}
-
+
/**
* check if a named value exists in the message
*
* @param name
* @return true if value exits
*/
- public boolean valueExists(String name){
+ public boolean valueExists(String name) {
return this.map.containsKey(name);
}
-
- protected void initializeReading(){
+
+ protected void initializeReading() {
loadContent();
}
-
- protected void initializeWriting(){
+
+ protected void initializeWriting() {
setContent(null);
}
-
- protected void checkValidObject(Object value) throws IllegalArgumentException{
+
+ protected void checkValidObject(Object value) throws IllegalArgumentException {
boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short
|| value instanceof Integer || value instanceof Long;
valid = valid || value instanceof Float || value instanceof Double || value instanceof Character
@@ -995,37 +1078,37 @@
throw new IllegalArgumentException("Not a valid message value: " + value);
}
}
-
+
/**
* @return pretty print
* @see java.lang.Object#toString()
*/
- public String toString(){
+ public String toString() {
return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
}
-
- protected void copy(BlazeMessage copy) throws BlazeException{
+
+ protected void copy(BlazeMessage copy) throws BlazeException {
storeContent();
copy.content = this.content;
}
-
+
/**
* @return the content data
*/
- public BlazeData getContent(){
+ public BlazeData getContent() {
return this.content;
}
-
+
/**
* Set the content data
*
* @param content
*/
- public void setContent(BlazeData content){
+ public void setContent(BlazeData content) {
this.content = content;
}
-
- protected void marshallMap(MapDataBean mapData,String name,Object value) throws BlazeRuntimeException{
+
+ protected void marshallMap(MapDataBean mapData, String name, Object value) throws BlazeRuntimeException {
if (value != null) {
if (value.getClass() == Boolean.class) {
BoolTypeBean type = new BoolTypeBean();
@@ -1094,8 +1177,8 @@
}
}
}
-
- protected Map<String, Object> unmarshall(MapData mapData){
+
+ protected Map<String, Object> unmarshall(MapData mapData) {
Map<String, Object> result = new ConcurrentHashMap<String, Object>();
if (mapData.hasBoolType()) {
for (BoolType type : mapData.getBoolTypeList()) {
@@ -1160,71 +1243,85 @@
}
return result;
}
-
+
/**
* Store content into a BlazeData object for serialization
*/
- public void storeContent(){
+ public void storeContent() {
if (getContent() == null) {
BlazeDataBean bd = new BlazeDataBean();
- MapDataBean mapData = new MapDataBean();
- for (Map.Entry<String, Object> entry : this.map.entrySet()) {
- marshallMap(mapData, entry.getKey().toString(), entry.getValue());
+ if (!this.map.isEmpty()) {
+ MapDataBean mapData = new MapDataBean();
+ for (Map.Entry<String, Object> entry : this.map.entrySet()) {
+ marshallMap(mapData, entry.getKey().toString(), entry.getValue());
+ }
+ bd.setMapData(mapData);
}
- bd.setMapData(mapData);
if (this.replyTo != null) {
bd.setReplyToData(this.replyTo.getData());
}
if (this.messageId != null) {
- bd.setMessageId(new Buffer(this.messageId));
+ bd.setMessageId(new UTF8Buffer(this.messageId));
}
if (this.correlationId != null) {
- bd.setCorrelationId(new Buffer(this.correlationId));
+ bd.setCorrelationId(new UTF8Buffer(this.correlationId));
}
if (this.fromId != null) {
- bd.setFromId(new Buffer(this.fromId));
+ bd.setFromId(new UTF8Buffer(this.fromId));
}
if (this.messageType != null) {
- bd.setMessageType(new Buffer(this.messageType));
+ bd.setMessageType(new UTF8Buffer(this.messageType));
+ }
+ if (this.timeStamp > 0) {
+ bd.setTimestamp(this.timeStamp);
+ }
+ if (this.expiration > 0) {
+ bd.setExpiration(this.expiration);
+ }
+ if (this.redeliveryCounter > 0) {
+ bd.setRedeliveryCounter(this.redeliveryCounter);
+ }
+ if (this.priority > 0) {
+ bd.setPriority(this.priority);
+ }
+ if (this.persistent) {
+ bd.setPersistent(this.persistent);
}
- bd.setTimestamp(this.timeStamp);
- bd.setExpiration(this.expiration);
- bd.setRedeliveryCounter(this.redeliveryCounter);
- bd.setPriority(this.priority);
- bd.setPersistent(this.persistent);
this.content = bd;
}
}
-
+
/**
* Builds the message body from data
*
*/
- protected void loadContent() throws BlazeRuntimeException{
- BlazeData data = getContent();
- if (data != null && this.map.isEmpty()) {
- this.map = unmarshall(data.getMapData());
- if (data.hasReplyToData()) {
- this.replyTo = new Destination(data.getReplyToData());
- }
- if (data.hasFromId()) {
- this.fromId = data.getFromId().toStringUtf8();
- }
- if (data.hasMessageId()) {
- this.messageId = data.getMessageId().toStringUtf8();
- }
- if (data.hasCorrelationId()) {
- this.correlationId = data.getCorrelationId().toStringUtf8();
- }
- if (data.hasMessageType()) {
- this.messageType = data.getMessageType().toStringUtf8();
+ protected void loadContent() throws BlazeRuntimeException {
+ if (!this.loaded) {
+ this.loaded = true;
+ BlazeData data = getContent();
+ if (data != null && this.map.isEmpty()) {
+ this.map = unmarshall(data.getMapData());
+ if (data.hasReplyToData()) {
+ this.replyTo = new Destination(data.getReplyToData());
+ }
+ if (data.hasFromId()) {
+ this.fromId = data.getFromId().toStringUtf8();
+ }
+ if (data.hasMessageId()) {
+ this.messageId = data.getMessageId().toStringUtf8();
+ }
+ if (data.hasCorrelationId()) {
+ this.correlationId = data.getCorrelationId().toStringUtf8();
+ }
+ if (data.hasMessageType()) {
+ this.messageType = data.getMessageType().toStringUtf8();
+ }
+ this.timeStamp = data.getTimestamp();
+ this.expiration = data.getExpiration();
+ this.redeliveryCounter = data.getRedeliveryCounter();
+ this.priority = data.getPriority();
+ this.persistent = data.getPersistent();
}
- this.timeStamp = data.getTimestamp();
- this.expiration = data.getExpiration();
- this.redeliveryCounter = data.getRedeliveryCounter();
- this.priority = data.getPriority();
- this.persistent = data.getPersistent();
}
}
-
}
\ No newline at end of file
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java Fri Mar 13 09:04:23 2009
@@ -28,7 +28,8 @@
* Process a PacketData of that is a BlazeMessage type
* @param data
* @return the built BlazeMessage
+ * @throws Exception
*/
- BlazeMessage processBlazeMessage(PacketData data);
+ BlazeMessage processBlazeMessage(PacketData data) throws Exception;
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Fri Mar 13 09:04:23 2009
@@ -459,16 +459,11 @@
if (member != null) {
SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
Destination dest = new Destination(destinationName, false);
- message.setDestination(dest);
- message.storeContent();
- BlazeDataBuffer blazeData = message.getContent().freeze();
- PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
- packetData.setDestinationData(dest.getData());
- packetData.setPayloadType(message.getType());
+ Packet packet = buildPacket(dest, message);
synchronized (this.messageRequests) {
- this.messageRequests.put(packetData.getMessageId(), request);
+ this.messageRequests.put(packet.getPacketData().getMessageId(), request);
}
- Packet packet = new Packet(packetData.freeze());
+
packet.setTo((member).getAddress());
this.unicast.downStream(packet);
PacketDataBuffer response = request.get(timeout);
@@ -486,32 +481,15 @@
* org.apache.activeblaze.BlazeMessage, java.lang.String)
*/
public void sendReply(Member to,BlazeMessage response,String correlationId) throws Exception{
- response.storeContent();
- Destination dest = response.getDestination();
- BlazeDataBuffer blazeData = response.getContent().freeze();
- PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, blazeData);
- data.setCorrelationId(new Buffer(correlationId));
- if (dest != null) {
- data.setDestinationData(dest.getData());
- }
- data.setPayloadType(response.getType());
- data.setReliable(true);
- Packet packet = new Packet(data.freeze());
+ Destination dest = new Destination(to.getInBoxDestination(), false);
+ Packet packet = buildPacket(dest,response,correlationId);
packet.setTo(((MemberImpl) to).getAddress());
this.unicast.downStream(packet);
}
protected void send(MemberImpl member,Buffer destinationName,BlazeMessage message) throws Exception{
Destination dest = new Destination(destinationName, false);
- message.setDestination(dest);
- message.storeContent();
-
- PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, message.getContent().freeze());
- data.setReliable(true);
- data.setResponseRequired(true);
- data.setDestinationData(dest.getData());
- data.setPayloadType(message.getType());
- Packet packet = new Packet(data.freeze());
+ Packet packet = buildPacket(dest, message,true);
packet.setTo(member.getAddress());
this.unicast.downStream(packet);
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Fri Mar 13 09:04:23 2009
@@ -17,21 +17,63 @@
package org.apache.activeblaze.impl.reliable.simple;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-
+import org.apache.activeblaze.impl.processor.Packet;
/**
* Very basic (none) reliability
- *
+ *
*/
-public class SimpleReliableProcessor extends DefaultChainedProcessor{
-
- private SimpleFlow simpleFlow;
-
- /**
+public class SimpleReliableProcessor extends DefaultChainedProcessor {
+ int maxWindowSize = 64 * 1024;
+ int windowSize = 0;
+ int pauseTime = 0;
+
+ /**
* Constructor
*/
public SimpleReliableProcessor() {
- this.simpleFlow=new SimpleFlow();
- //setEnd(this.simpleFlow);
- }
-
+ }
+
+ /**
+ * @param p
+ * @throws Exception
+ * @see org.apache.activeblaze.impl.processor.DefaultChainedProcessor#downStream(org.apache.activeblaze.impl.processor.Packet)
+ */
+ public void downStream(Packet p) throws Exception {
+ this.windowSize += p.getPacketData().serializedSizeFramed();
+ if (this.windowSize >= this.maxWindowSize) {
+ Thread.sleep(this.pauseTime);
+ this.windowSize = 0;
+ }
+ super.downStream(p);
+ }
+
+ /**
+ * @return the maxWindowSize
+ */
+ public int getMaxWindowSize() {
+ return this.maxWindowSize;
+ }
+
+ /**
+ * @param maxWindowSize
+ * the maxWindowSize to set
+ */
+ public void setMaxWindowSize(int maxWindowSize) {
+ this.maxWindowSize = maxWindowSize;
+ }
+
+ /**
+ * @return the pauseTime
+ */
+ public int getPauseTime() {
+ return this.pauseTime;
+ }
+
+ /**
+ * @param pauseTime
+ * the pauseTime to set
+ */
+ public void setPauseTime(int pauseTime) {
+ this.pauseTime = pauseTime;
+ }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Fri Mar 13 09:04:23 2009
@@ -18,7 +18,6 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -34,20 +33,24 @@
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
-
import org.apache.activeblaze.BlazeMessageListener;
import org.apache.activeblaze.BlazeMessageProcessor;
import org.apache.activeblaze.Subscription;
import org.apache.activeblaze.group.BlazeGroupChannel;
import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation;
import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.wire.DestinationData;
import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
/**
* Implementation of a JMS Connection
*
*/
public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection,
- org.apache.activeblaze.ExceptionListener,BlazeMessageProcessor{
+ org.apache.activeblaze.ExceptionListener, BlazeMessageProcessor {
protected final BlazeGroupChannel channel;
protected final IdGenerator tempDestinationGenerator = new IdGenerator("");
private String clientId;
@@ -60,7 +63,7 @@
protected BlazeJmsConnection(BlazeGroupChannel channel) {
this.channel = channel;
this.channel.setExceptionListener(this);
- this.clientId = channel.getName();
+ this.clientId = channel.getName();
this.channel.setBlazeMessageProcessor(this);
}
@@ -333,9 +336,9 @@
protected void removeMesssageDispatcher(BlazeMessageListener consumer, Subscription s) throws JMSException {
try {
if (s.isTopic()) {
- this.channel.removeBlazeTopicMessageListener(s,consumer);
+ this.channel.removeBlazeTopicMessageListener(s, consumer);
} else {
- this.channel.removeBlazeQueueMessageListener(s,consumer);
+ this.channel.removeBlazeQueueMessageListener(s, consumer);
}
} catch (Exception e) {
throw BlazeJmsExceptionSupport.create(e);
@@ -362,28 +365,34 @@
}
/**
- * @param data
+ * @param data
* @return a BlazeMessage
+ * @throws Exception
*
*/
- public BlazeJmsMessage processBlazeMessage(PacketData data){
+ public BlazeJmsMessage processBlazeMessage(PacketData data) throws Exception {
BlazeJmsMessage result = null;
- /*
- int type = message.getType();
- if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) {
- result = new BlazeJmsBytesMessage();
- } else if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) {
- result = new BlazeJmsMapMessage();
- } else if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) {
- result = new BlazeJmsObjectMessage();
- } else if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) {
- result = new BlazeJmsStreamMessage();
- } else if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) {
- result = new BlazeJmsTextMessage();
- } else {
- result = new BlazeJmsMessage();
+ if (data != null) {
+ DestinationData destination = data.getDestinationData();
+ Buffer payload = data.getPayload();
+ BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
+ String fromId = null;
+ if (data.hasProducerId()) {
+ fromId = data.getProducerId().toStringUtf8();
+ }
+ result = BlazeJmsMessageTransformation.createMessage(data.getPayloadType());
+ result.setDestination(destination);
+ result.setFromId(fromId);
+ if (data.hasMessageId()) {
+ result.setMessageId(data.getMessageId().toStringUtf8());
+ }
+ if (data.hasCorrelationId()) {
+ result.setCorrelationId(data.getCorrelationId().toStringUtf8());
+ }
+ result.setTimeStamp(blazeData.getTimestamp());
+ result.setType(data.getPayloadType());
+ result.setContent(blazeData);
}
- */
return result;
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java?rev=753178&r1=753177&r2=753178&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java Fri Mar 13 09:04:23 2009
@@ -17,7 +17,6 @@
package org.apache.activeblaze.jms.message;
import java.util.Enumeration;
-
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -27,34 +26,33 @@
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-
import org.apache.activeblaze.BlazeMessage;
import org.apache.activeblaze.jms.BlazeJmsDestination;
-
/**
- * A helper class for converting normal JMS interfaces into ActiveMQ specific ones.
+ * A helper class for converting normal JMS interfaces into ActiveMQ specific
+ * ones.
*
* @version $Revision: 1.1 $
*/
-public final class BlazeJmsMessageTransformation{
+public final class BlazeJmsMessageTransformation {
private BlazeJmsMessageTransformation() {
}
-
+
/**
* @param dest
* @return a BlazeJmsDestination
* @throws JMSException
*/
- private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException{
+ private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException {
return BlazeJmsDestination.transform(dest);
}
-
+
/**
* @param message
* @return a BlazeJmsMessage
* @throws JMSException
*/
- public static BlazeJmsMessage transformMessage(BlazeMessage message) throws JMSException{
+ public static BlazeJmsMessage transformMessage(BlazeMessage message) throws JMSException {
BlazeJmsMessage result = null;
if (message instanceof BlazeJmsMessage) {
result = (BlazeJmsMessage) message;
@@ -77,13 +75,13 @@
}
return result;
}
-
+
/**
* @param message
* @return a BlazeJmsDestination
* @throws JMSException
*/
- public static BlazeJmsMessage transformMessage(Message message) throws JMSException{
+ public static BlazeJmsMessage transformMessage(Message message) throws JMSException {
if (message instanceof BlazeJmsMessage) {
return (BlazeJmsMessage) message;
}
@@ -143,15 +141,18 @@
copyProperties(message, transformedMessage);
return transformedMessage;
}
-
+
/**
- * Copies the standard JMS and user defined properties from the givem message to the specified message
+ * Copies the standard JMS and user defined properties from the givem
+ * message to the specified message
*
- * @param fromMessage the message to take the properties from
- * @param toMessage the message to add the properties to
+ * @param fromMessage
+ * the message to take the properties from
+ * @param toMessage
+ * the message to add the properties to
* @throws JMSException
*/
- public static void copyProperties(Message fromMessage,Message toMessage) throws JMSException{
+ public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
@@ -169,4 +170,27 @@
toMessage.setObjectProperty(name, obj);
}
}
+
+ /**
+ * @param type
+ * @return a BlazeJmsMessage
+ */
+ public static BlazeJmsMessage createMessage(int type) {
+ if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) {
+ return new BlazeJmsBytesMessage();
+ }
+ if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) {
+ return new BlazeJmsMapMessage();
+ }
+ if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) {
+ return new BlazeJmsObjectMessage();
+ }
+ if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) {
+ return new BlazeJmsStreamMessage();
+ }
+ if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) {
+ return new BlazeJmsTextMessage();
+ }
+ return new BlazeJmsMessage();
+ }
}