You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/25 15:27:24 UTC

svn commit: r720505 [1/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/coordinated/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/processor/ main/java/org/ap...

Author: rajdavies
Date: Tue Nov 25 06:27:23 2008
New Revision: 720505

URL: http://svn.apache.org/viewvc?rev=720505&view=rev
Log:
re-org some classes

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java
      - copied, changed from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java
      - copied, changed from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java
      - copied, changed from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java
    activemq/activemq-blaze/trunk/src/test/resources/
    activemq/activemq-blaze/trunk/src/test/resources/log4j.properties
Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Tue Nov 25 06:27:23 2008
@@ -20,7 +20,6 @@
 import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activeblaze.impl.destination.DestinationMatch;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -54,8 +53,6 @@
     private Processor broadcast;
     private BlazeConfiguration configuration = new BlazeConfiguration();
     private String id;
-    private LinkedBlockingQueue<BlazeMessage> broadcastQueue;
-    private Thread broadcastQueueThread;
     private Buffer managementURI;
     private InetSocketAddress toAddress;
 
@@ -77,7 +74,7 @@
     /**
      * @param destination
      * @param l
-     * @throws Exception 
+     * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeTopicListener)
      */
@@ -90,7 +87,7 @@
      * @param destination
      * @param l
      * @return
-     * @throws Exception 
+     * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#removeBlazeMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeTopicListener)
      */
@@ -102,9 +99,8 @@
     public boolean init() throws Exception {
         boolean result = super.init();
         if (result) {
-            this.broadcastQueue = new LinkedBlockingQueue<BlazeMessage>(getConfiguration().getMaxDispatchQueueSize());
             String broadcastURIStr = getConfiguration().getBroadcastURI();
-            broadcastURIStr=PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
+            broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
             URI broadcastURI = new URI(broadcastURIStr);
             this.toAddress = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort());
             this.managementURI = new Buffer(new URI(getConfiguration().getManagementURI()).toString());
@@ -123,7 +119,6 @@
     protected Processor configureProcess(BaseTransport transport) throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         configureTransport(transport);
-        
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
         result.setExceptionListener(this);
@@ -147,18 +142,6 @@
     public boolean start() throws Exception {
         boolean result = super.start();
         if (result) {
-            if (getConfiguration().isUseDispatchThread()) {
-                Runnable runable = new Runnable() {
-                    public void run() {
-                        while (isStarted()) {
-                            dequeueBroadcastMessages();
-                        }
-                    }
-                };
-                this.broadcastQueueThread = new Thread(runable, getId() + "-BroadcastQueue");
-                this.broadcastQueueThread.setDaemon(true);
-                this.broadcastQueueThread.start();
-            }
             this.broadcast.start();
         }
         return result;
@@ -167,13 +150,6 @@
     public boolean stop() throws Exception {
         boolean result = super.stop();
         if (result) {
-            if (this.broadcastQueueThread != null) {
-                this.broadcastQueueThread.interrupt();
-                try {
-                    this.broadcastQueueThread.join(1000);
-                } catch (InterruptedException e) {
-                }
-            }
             this.broadcast.stop();
         }
         return result;
@@ -241,7 +217,7 @@
 
     protected void doProcessBlazeData(PacketData data) throws Exception {
         BlazeMessage message = buildBlazeMessage(data);
-        processBlazeMessage(message);
+        dispatch(message);
     }
 
     protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
@@ -274,28 +250,7 @@
         return new BlazeMessage();
     }
 
-    protected void processBlazeMessage(BlazeMessage message) {
-        if (this.broadcastQueueThread == null) {
-            dispatch(message);
-        } else {
-            try {
-                this.broadcastQueue.put(message);
-            } catch (InterruptedException e) {
-                // ignore - we are stopping
-            }
-        }
-    }
-
-    protected void dequeueBroadcastMessages() {
-        BlazeMessage message = null;
-        try {
-            message = this.broadcastQueue.take();
-        } catch (InterruptedException e1) {
-        }
-        dispatch(message);
-    }
-
-    protected void dispatch(BlazeMessage message) {
+    protected final void dispatch(BlazeMessage message) {
         if (message != null) {
             Buffer destination = message.getContent().getDestination();
             for (Map.Entry<Buffer, BlazeTopicListener> entry : this.topicessageListenerMap.entrySet()) {
@@ -305,9 +260,10 @@
             }
         }
     }
-    
+
     /**
      * shutdown on gc
+     * 
      * @throws Throwable
      * @see java.lang.Object#finalize()
      */

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java Tue Nov 25 06:27:23 2008
@@ -33,8 +33,6 @@
     private String unicastURI = "udp://localhost:0";
     private String broadcastURI = "mcast://224.2.2.2:9999";
     private String managementURI = "mcast://224.2.2.2:8888";
-    // Channel internals
-    private boolean useDispatchThread = true;
     private int maxDispatchQueueSize = 10000;
     private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
     //reliability
@@ -84,22 +82,6 @@
     public void setBroadcastURI(String broadcastURL) {
         this.broadcastURI = broadcastURL;
     }
-
-    /**
-     * @return the useDispatchThread
-     */
-    public boolean isUseDispatchThread() {
-        return this.useDispatchThread;
-    }
-
-    /**
-     * @param useDispatchThread
-     *            the useDispatchThread to set
-     */
-    public void setUseDispatchThread(boolean useDispatchThread) {
-        this.useDispatchThread = useDispatchThread;
-    }
-
     /**
      * @return the maxDispatchQueueSize
      */

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java Tue Nov 25 06:27:23 2008
@@ -21,9 +21,6 @@
  *
  */
 public class BlazeException extends Exception  {
-    /**
-     * 
-     */
     private static final long serialVersionUID = 1064152356749288271L;
 
     /**

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Tue Nov 25 06:27:23 2008
@@ -16,6 +16,11 @@
  */
 package org.apache.activeblaze;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
 import java.security.Key;
 import java.util.Collection;
 import java.util.Collections;
@@ -23,8 +28,10 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
 import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activeblaze.wire.BoolType;
+import org.apache.activeblaze.wire.BufferType;
 import org.apache.activeblaze.wire.ByteType;
 import org.apache.activeblaze.wire.BytesType;
 import org.apache.activeblaze.wire.CharType;
@@ -36,6 +43,8 @@
 import org.apache.activeblaze.wire.ShortType;
 import org.apache.activeblaze.wire.StringType;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+import org.apache.activemq.protobuf.BufferOutputStream;
 
 
 /**
@@ -73,9 +82,10 @@
  * <code>NullPointerException</code>.
  * 
  */
-public class BlazeMessage implements Map<String, Object>{
+public class BlazeMessage implements Map<String, Object> {
     private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
     private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
+    private static final String DEFAULT_OBJECT_PAYLOAD = "DEFAULT_OBJECT_PAYLOAD";
     private transient Map<String, Object> map = new ConcurrentHashMap<String, Object>();
     private transient String destination;
     private transient String fromId;
@@ -83,88 +93,145 @@
     private transient String correlationId;
     private transient long timeStamp;
     private BlazeData content;
-    
+
     /**
      * Default Constructor
      */
     public BlazeMessage() {
     }
-    
+
     /**
      * Constructor - Utility to construct a message with a text <Code>String</Code> payload
+     * 
      * @param text
      */
     public BlazeMessage(String text) {
-        setString(DEFAULT_TEXT_PAYLOAD,text);
+        setString(DEFAULT_TEXT_PAYLOAD, text);
     }
-    
+
     /**
      * Constructor - Utility to construct a message with a byte[] array payload
+     * 
      * @param data
      */
     public BlazeMessage(byte[] data) {
-        setBytes(DEFAULT_BYTES_PAYLOAD,data);
+        setBytes(DEFAULT_BYTES_PAYLOAD, data);
     }
-    
+
+    /**
+     * Constructor - Utility to construct a message with an object payload
+     * 
+     * @param data
+     */
+    public BlazeMessage(Object data) {
+        setObject(data);
+    }
+
     /**
      * Utility method for setting a default <Code>String</Code> payload
+     * 
      * @param text
      */
     public void setText(String text) {
-        setString(DEFAULT_TEXT_PAYLOAD,text);
+        setString(DEFAULT_TEXT_PAYLOAD, text);
     }
-    
+
     /**
-     * Utility method used for when a BlazeMessage is only carrying a String
+     * Utility method used for when a BlazeMessage is only carrying a byte[] array
+     * 
      * @return text the default text
      * @throws Exception
      */
     public String getText() throws Exception {
         return getString(DEFAULT_TEXT_PAYLOAD);
     }
-    
+
     /**
-     * Utility method for setting a default <Code>String</Code> payload
-     * @param payload 
+     * Utility method for setting a default <Code>byte[]</Code> payload
+     * 
+     * @param payload
      */
     public void setBytes(byte[] payload) {
-        setBytes(DEFAULT_BYTES_PAYLOAD,payload);
+        setBytes(DEFAULT_BYTES_PAYLOAD, payload);
+    }
+
+    /**
+     * Utility method used for when a BlazeMessage is only carrying an Object
+     * 
+     * @return text the default text
+     * @throws Exception
+     */
+    public Object getObject()  throws Exception {
+        Object result = null;
+        Buffer buffer = getBuffer(DEFAULT_OBJECT_PAYLOAD);
+        InputStream is  = new BufferInputStream(buffer);
+        DataInputStream dataIn = new DataInputStream(is);
+        ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+        result = objIn.readObject();
+        return result;
     }
     
     /**
+     * Utility method for setting a default <Code>Object</Code> payload
+     * 
+     * @param payload
+     */
+   
+    public void setObject(Object payload) {
+        BufferOutputStream bufferOut = new BufferOutputStream(1024);
+        DataOutputStream dataOut = new DataOutputStream(bufferOut);
+        try {
+            ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
+            objOut.writeObject(payload);
+            objOut.flush();
+            objOut.reset();
+            objOut.close();
+        } catch (IOException e) {
+            throw new BlazeRuntimeException(e);
+        }
+        put(DEFAULT_OBJECT_PAYLOAD, bufferOut.toBuffer());
+    }
+    
+    
+
+    /**
      * Utility method used for when a BlazeMessage is only carrying a String
+     * 
      * @return text the default text
      * @throws Exception
      */
     public byte[] getBytes() throws Exception {
         return getBytes(DEFAULT_BYTES_PAYLOAD);
     }
-    
+
     /**
-     * @param copy2 
+     * @param copy2
      * @return a copy of this message
      * @throws BlazeException
      */
-    public BlazeMessage copy() throws BlazeException{
+    public BlazeMessage copy() throws BlazeException {
         BlazeMessage copy = new BlazeMessage();
         copy(copy);
         return copy;
     }
-    
+
     /**
      * clear the contents of this message
      */
-    public void clear(){
+    public void clear() {
         this.map.clear();
     }
+
     /**
      * Returns the <CODE>boolean</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>boolean</CODE>
+     * @param name
+     *            the name of the <CODE>boolean</CODE>
      * @return the <CODE>boolean</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public boolean getBoolean(String name) throws BlazeMessageFormatException{
+    public boolean getBoolean(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -179,14 +246,17 @@
             throw new BlazeMessageFormatException(" cannot read a boolean from " + value.getClass().getName());
         }
     }
+
     /**
      * Returns the <CODE>byte</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>byte</CODE>
+     * @param name
+     *            the name of the <CODE>byte</CODE>
      * @return the <CODE>byte</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public byte getByte(String name) throws BlazeMessageFormatException{
+    public byte getByte(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -201,14 +271,17 @@
             throw new BlazeMessageFormatException(" cannot read a byte from " + value.getClass().getName());
         }
     }
+
     /**
      * Returns the <CODE>short</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>short</CODE>
+     * @param name
+     *            the name of the <CODE>short</CODE>
      * @return the <CODE>short</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public short getShort(String name) throws BlazeMessageFormatException{
+    public short getShort(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -226,14 +299,17 @@
             throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
         }
     }
+
     /**
      * Returns the Unicode character value with the specified name.
      * 
-     * @param name the name of the Unicode character
+     * @param name
+     *            the name of the Unicode character
      * @return the Unicode character value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public char getChar(String name) throws BlazeMessageFormatException{
+    public char getChar(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -245,14 +321,17 @@
             throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
         }
     }
+
     /**
      * Returns the <CODE>int</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>int</CODE>
+     * @param name
+     *            the name of the <CODE>int</CODE>
      * @return the <CODE>int</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public int getInt(String name) throws BlazeMessageFormatException{
+    public int getInt(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -273,14 +352,17 @@
             throw new BlazeMessageFormatException(" cannot read an int from " + value.getClass().getName());
         }
     }
+
     /**
      * Returns the <CODE>long</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>long</CODE>
+     * @param name
+     *            the name of the <CODE>long</CODE>
      * @return the <CODE>long</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public long getLong(String name) throws BlazeMessageFormatException{
+    public long getLong(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -304,14 +386,17 @@
             throw new BlazeMessageFormatException(" cannot read a long from " + value.getClass().getName());
         }
     }
+
     /**
      * Returns the <CODE>float</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>float</CODE>
+     * @param name
+     *            the name of the <CODE>float</CODE>
      * @return the <CODE>float</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public float getFloat(String name) throws BlazeMessageFormatException{
+    public float getFloat(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -326,14 +411,17 @@
             throw new BlazeMessageFormatException(" cannot read a float from " + value.getClass().getName());
         }
     }
+
     /**
      * Returns the <CODE>double</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>double</CODE>
+     * @param name
+     *            the name of the <CODE>double</CODE>
      * @return the <CODE>double</CODE> value with the specified name
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public double getDouble(String name) throws BlazeMessageFormatException{
+    public double getDouble(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -351,15 +439,18 @@
             throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName());
         }
     }
+
     /**
      * Returns the <CODE>String</CODE> value with the specified name.
      * 
-     * @param name the name of the <CODE>String</CODE>
+     * @param name
+     *            the name of the <CODE>String</CODE>
      * @return the <CODE>String</CODE> value with the specified name; if there is no item by this name, a null value
      *         is returned
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public String getString(String name) throws BlazeMessageFormatException{
+    public String getString(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -371,14 +462,17 @@
             return value.toString();
         }
     }
+
     /**
      * Returns the byte array value with the specified name.
      * 
-     * @param name the name of the byte array
+     * @param name
+     *            the name of the byte array
      * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
-     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
      */
-    public byte[] getBytes(String name) throws BlazeMessageFormatException{
+    public byte[] getBytes(String name) throws BlazeMessageFormatException {
         initializeReading();
         Object value = this.map.get(name);
         if (value instanceof byte[]) {
@@ -387,6 +481,28 @@
             throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
         }
     }
+    
+    /**
+     * Returns a Buffer with the specified name.
+     * 
+     * @param name
+     *            the name of the byte array
+     * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
+     * @throws BlazeMessageFormatException
+     *             if this type conversion is invalid.
+     */
+    public Buffer getBuffer(String name) throws BlazeMessageFormatException {
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value instanceof Buffer) {
+            return (Buffer) value;
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName());
+        }
+    }
+    
+    
+
     /**
      * Returns the value of the object with the specified name.
      * <P>
@@ -396,33 +512,37 @@
      * <P>
      * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
      * 
-     * @param name the name of the Java object
+     * @param name
+     *            the name of the Java object
      * @return a copy of the Java object value with the specified name, in objectified format (for example, if the
      *         object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
      *         this name, a null value is returned
      */
-    public Object getObject(String name){
+    public Object getObject(String name) {
         initializeReading();
         return this.map.get(name);
     }
+
     /**
      * Returns an <CODE>Enumeration</CODE> of all the names in the <CODE>BlazeMessage</CODE> object.
      * 
      * @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
      */
-    public Enumeration<String> getMapNames(){
+    public Enumeration<String> getMapNames() {
         initializeReading();
         return Collections.enumeration(this.map.keySet());
     }
-    
+
     /**
      * put a key,value pair into the message
-     * @param name 
-     * @param value must be a supported primitive, or map of supported primitives
+     * 
+     * @param name
+     * @param value
+     *            must be a supported primitive, or map of supported primitives
      * @return the previous value associated with the key
      */
-    public Object put(String name,Object value){
-        initializeWriting();  
+    public Object put(String name, Object value) {
+        initializeWriting();
         if (name == null) {
             throw new IllegalArgumentException("The name of the property cannot be null.");
         }
@@ -432,106 +552,155 @@
         checkValidObject(value);
         return this.map.put(name, value);
     }
+
     /**
      * Sets a <CODE>boolean</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>boolean</CODE>
-     * @param value the <CODE>boolean</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>boolean</CODE>
+     * @param value
+     *            the <CODE>boolean</CODE> value to set in the Map
      */
-    public void setBoolean(String name,boolean value){
+    public void setBoolean(String name, boolean value) {
         initializeWriting();
         put(name, value ? Boolean.TRUE : Boolean.FALSE);
     }
-    
+
     /**
      * Sets a <CODE>byte</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>byte</CODE>
-     * @param value the <CODE>byte</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>byte</CODE>
+     * @param value
+     *            the <CODE>byte</CODE> value to set in the Map
      */
-    public void setByte(String name,byte value){
+    public void setByte(String name, byte value) {
         initializeWriting();
         put(name, Byte.valueOf(value));
     }
+
     /**
      * Sets a <CODE>short</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>short</CODE>
-     * @param value the <CODE>short</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>short</CODE>
+     * @param value
+     *            the <CODE>short</CODE> value to set in the Map
      */
-    public void setShort(String name,short value){
+    public void setShort(String name, short value) {
         initializeWriting();
         put(name, Short.valueOf(value));
     }
+
     /**
      * Sets a Unicode character value with the specified name into the Map.
      * 
-     * @param name the name of the Unicode character
-     * @param value the Unicode character value to set in the Map
+     * @param name
+     *            the name of the Unicode character
+     * @param value
+     *            the Unicode character value to set in the Map
      */
-    public void setChar(String name,char value){
+    public void setChar(String name, char value) {
         initializeWriting();
         put(name, Character.valueOf(value));
     }
+
     /**
      * Sets an <CODE>int</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>int</CODE>
-     * @param value the <CODE>int</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>int</CODE>
+     * @param value
+     *            the <CODE>int</CODE> value to set in the Map
      */
-    public void setInt(String name,int value){
+    public void setInt(String name, int value) {
         initializeWriting();
         put(name, Integer.valueOf(value));
     }
+
     /**
      * Sets a <CODE>long</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>long</CODE>
-     * @param value the <CODE>long</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>long</CODE>
+     * @param value
+     *            the <CODE>long</CODE> value to set in the Map
      */
-    public void setLong(String name,long value){
+    public void setLong(String name, long value) {
         initializeWriting();
         put(name, Long.valueOf(value));
     }
+
     /**
      * Sets a <CODE>float</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>float</CODE>
-     * @param value the <CODE>float</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>float</CODE>
+     * @param value
+     *            the <CODE>float</CODE> value to set in the Map
      */
-    public void setFloat(String name,float value){
+    public void setFloat(String name, float value) {
         initializeWriting();
         put(name, new Float(value));
     }
+
     /**
      * Sets a <CODE>double</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>double</CODE>
-     * @param value the <CODE>double</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>double</CODE>
+     * @param value
+     *            the <CODE>double</CODE> value to set in the Map
      */
-    public void setDouble(String name,double value){
+    public void setDouble(String name, double value) {
         initializeWriting();
         put(name, new Double(value));
     }
+
     /**
      * Sets a <CODE>String</CODE> value with the specified name into the Map.
      * 
-     * @param name the name of the <CODE>String</CODE>
-     * @param value the <CODE>String</CODE> value to set in the Map
+     * @param name
+     *            the name of the <CODE>String</CODE>
+     * @param value
+     *            the <CODE>String</CODE> value to set in the Map
      */
-    public void setString(String name,String value){
+    public void setString(String name, String value) {
         initializeWriting();
         put(name, value);
     }
+
     /**
      * Sets a byte array value with the specified name into the Map.
      * 
-     * @param name the name of the byte array
-     * @param value the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
+     * @param name
+     *            the name of the byte array
+     * @param value
+     *            the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
      *            will not be altered by future modifications
-     * @throws NullPointerException if the name is null, or if the name is an empty string.
+     * @throws NullPointerException
+     *             if the name is null, or if the name is an empty string.
+     */
+    public void setBytes(String name, byte[] value) {
+        initializeWriting();
+        if (value != null) {
+            put(name, value);
+        } else {
+            this.map.remove(name);
+        }
+    }
+    
+    /**
+     * Sets a Buffer value with the specified name into the Map.
+     * 
+     * @param name
+     *            the name of the byte array
+     * @param value
+     *            the Buffer value to set in the Map
+     * @throws NullPointerException
+     *             if the name is null, or if the name is an empty string.
      */
-    public void setBytes(String name,byte[] value){
+    public void setBuffer(String name, Buffer value) {
         initializeWriting();
         if (value != null) {
             put(name, value);
@@ -539,163 +708,173 @@
             this.map.remove(name);
         }
     }
+
     /**
      * Sets a portion of the byte array value with the specified name into the Map.
      * 
-     * @param name the name of the byte array
-     * @param value the byte array value to set in the Map
-     * @param offset the initial offset within the byte array
-     * @param length the number of bytes to use
+     * @param name
+     *            the name of the byte array
+     * @param value
+     *            the byte array value to set in the Map
+     * @param offset
+     *            the initial offset within the byte array
+     * @param length
+     *            the number of bytes to use
      */
-    public void setBytes(String name,byte[] value,int offset,int length){
+    public void setBytes(String name, byte[] value, int offset, int length) {
         initializeWriting();
         byte[] data = new byte[length];
         System.arraycopy(value, offset, data, 0, length);
         put(name, data);
     }
-    
+
+   
+
     /**
-     * Find out if the message contains a key
-     * This isn't recursive
-     * @param key 
+     * Find out if the message contains a key This isn't recursive
+     * 
+     * @param key
      * @return true if the message contains the key
      * 
-     */    
-    public boolean containsKey(Object key){
+     */
+    public boolean containsKey(Object key) {
         initializeReading();
         return this.map.containsKey(key.toString());
     }
-    
+
     /**
      * Find out if the message contains a value
-     * @param value 
+     * 
+     * @param value
      * @return true if the value exists
      * 
      */
-    public boolean containsValue(Object value){
+    public boolean containsValue(Object value) {
         initializeReading();
         return this.map.containsValue(value);
     }
-    
+
     /**
      * @return a set of Map.Entry values
      * 
      */
-    public Set<java.util.Map.Entry<String, Object>> entrySet(){
+    public Set<java.util.Map.Entry<String, Object>> entrySet() {
         initializeReading();
         return this.map.entrySet();
     }
-    
+
     /**
      * Retrieve the object associated with the key
-     * @param key 
+     * 
+     * @param key
      * @return the object
      */
-    public Object get(Object key){
+    public Object get(Object key) {
         initializeReading();
         return getObject(key.toString());
     }
-    
+
     /**
      * @return true if the message is empty
      * 
      */
-    public boolean isEmpty(){
+    public boolean isEmpty() {
         initializeReading();
         return this.map.isEmpty();
     }
-    
+
     /**
      * @return a Set of all the keys
      */
-    public Set<String> keySet(){
+    public Set<String> keySet() {
         initializeReading();
         return this.map.keySet();
     }
-    
+
     /**
      * Add all entries in a Map to the message
-     * @param t the map
+     * 
+     * @param t
+     *            the map
      * 
      */
-    public void putAll(Map<? extends String, ? extends Object> t){
+    public void putAll(Map<? extends String, ? extends Object> t) {
         for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
             put(entry.getKey(), entry.getValue());
         }
-        
     }
-    
+
     /**
      * Remove a key/value pair from the message
-     * @param key 
+     * 
+     * @param key
      * @return the value removed or null
      * 
      */
-    public Object remove(Object key){
+    public Object remove(Object key) {
         setContent(null);
         return this.map.remove(key.toString());
     }
-    
+
     /**
      * @return the number of entries in the message
      */
-    public int size(){
+    public int size() {
         initializeReading();
         return this.map.size();
     }
-    
+
     /**
      * @return a Collection of the values in the message
      */
-    public Collection<Object> values(){
+    public Collection<Object> values() {
         initializeReading();
         return this.map.values();
     }
-    
-    private void initializeReading(){
+
+    private void initializeReading() {
         loadContent();
     }
-    
-    private void initializeWriting(){
+
+    private void initializeWriting() {
         setContent(null);
     }
-    
-    protected void checkValidObject(Object value) throws IllegalArgumentException{
+
+    protected void checkValidObject(Object value) throws IllegalArgumentException {
         boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short
                 || value instanceof Integer || value instanceof Long;
         valid = valid || value instanceof Float || value instanceof Double || value instanceof Character
                 || value instanceof String || value == null || value instanceof byte[];
         if (value instanceof Map) {
-            Map map =  (Map) value;
-            for(Object v:map.values()) {
+            Map map = (Map) value;
+            for (Object v : map.values()) {
                 checkValidObject(v);
             }
             valid = true;
         }
         if (!valid) {
-            throw new IllegalArgumentException("Not a valid message value: "+value);
+            throw new IllegalArgumentException("Not a valid message value: " + value);
         }
     }
-    
-    public String toString(){
+
+    public String toString() {
         return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
     }
-    
-    protected void copy(BlazeMessage copy) throws BlazeException{
+
+    protected void copy(BlazeMessage copy) throws BlazeException {
         storeContent();
         copy.content = this.content;
     }
-    
-    
-    public BlazeData getContent(){
+
+    public BlazeData getContent() {
         return this.content;
     }
-    
-    public void setContent(BlazeData content){
+
+    public void setContent(BlazeData content) {
         this.content = content;
     }
-    
-    private void marshallMap(MapData mapData,String name,Object value) throws BlazeMessageFormatException{
+
+    private void marshallMap(MapData mapData, String name, Object value) throws BlazeMessageFormatException {
         if (value != null) {
             if (value.getClass() == Boolean.class) {
                 BoolType type = new BoolType();
@@ -747,6 +926,10 @@
                 type.setName(name);
                 type.setValue(value.toString());
                 mapData.addStringType(type);
+            } else if (value.getClass() == Buffer.class) {
+                BufferType type = new BufferType();
+                type.setName(name);
+                type.setValue((Buffer) value);
             } else if (value instanceof Map) {
                 Map<String, Key> subMap = (Map<String, Key>) value;
                 for (Map.Entry<String, Key> entry : subMap.entrySet()) {
@@ -760,8 +943,8 @@
             }
         }
     }
-    
-    Map<String, Object> unmarshall(MapData mapData){
+
+    Map<String, Object> unmarshall(MapData mapData) {
         Map<String, Object> result = new ConcurrentHashMap<String, Object>();
         if (mapData.hasBoolType()) {
             for (BoolType type : mapData.getBoolTypeList()) {
@@ -813,6 +996,11 @@
                 result.put(type.getName(), type.getValue().toByteArray());
             }
         }
+        if (mapData.hasBufferType()) {
+            for (BufferType type : mapData.getBufferTypeList()) {
+                result.put(type.getName(), type.getValue());
+            }
+        }
         if (mapData.hasMapType()) {
             for (MapData type : mapData.getMapTypeList()) {
                 Map<String, Object> map = unmarshall(type);
@@ -821,8 +1009,8 @@
         }
         return result;
     }
-    
-    public void storeContent() throws BlazeMessageFormatException{
+
+    public void storeContent() throws BlazeMessageFormatException {
         if (getContent() == null && !this.map.isEmpty()) {
             BlazeData bd = new BlazeData();
             MapData mapData = new MapData();
@@ -833,12 +1021,12 @@
             this.content = bd;
         }
     }
-    
+
     /**
      * Builds the message body from data
      * 
      */
-    void loadContent() throws BlazeRuntimeException{
+    void loadContent() throws BlazeRuntimeException {
         BlazeData data = getContent();
         if (data != null && this.map.isEmpty()) {
             this.map = unmarshall(data.getMapData());
@@ -853,7 +1041,8 @@
     }
 
     /**
-     * @param destination the destination to set
+     * @param destination
+     *            the destination to set
      */
     public void setDestination(String destination) {
         this.destination = destination;
@@ -861,6 +1050,7 @@
 
     /**
      * The id of the channel that sent the message
+     * 
      * @return the fromId
      */
     public String getFromId() {
@@ -868,7 +1058,8 @@
     }
 
     /**
-     * @param fromId the fromId to set
+     * @param fromId
+     *            the fromId to set
      */
     public void setFromId(String fromId) {
         this.fromId = fromId;
@@ -882,7 +1073,8 @@
     }
 
     /**
-     * @param messageId the messageId to set
+     * @param messageId
+     *            the messageId to set
      */
     public void setMessageId(String messageId) {
         this.messageId = messageId;
@@ -896,7 +1088,8 @@
     }
 
     /**
-     * @param correlationId the correlationId to set
+     * @param correlationId
+     *            the correlationId to set
      */
     public void setCorrelationId(String correlationId) {
         this.correlationId = correlationId;
@@ -910,9 +1103,12 @@
     }
 
     /**
-     * @param timeStamp the timeStamp to set
+     * @param timeStamp
+     *            the timeStamp to set
      */
     public void setTimeStamp(long timeStamp) {
         this.timeStamp = timeStamp;
     }
+    
+    
 }
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java Tue Nov 25 06:27:23 2008
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Blaze BlazeNoRouteException
+ * 
+ */
+public class BlazeNoRouteException extends BlazeException {
+    private static final long serialVersionUID = 3951297225484077839L;
+
+    /**
+     * Constructs a new exception with <code>null</code> as its detail message. The cause is not initialized, and may
+     * subsequently be initialized by a call to {@link #initCause}.
+     */
+    public BlazeNoRouteException() {
+        super();
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message. The cause is not initialized, and may subsequently
+     * be initialized by a call to {@link #initCause}.
+     * 
+     * @param message
+     *            the detail message. The detail message is saved for later retrieval by the {@link #getMessage()}
+     *            method.
+     */
+    public BlazeNoRouteException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     * <p>
+     * Note that the detail message associated with <code>cause</code> is <i>not</i> automatically incorporated in
+     * this exception's detail message.
+     * 
+     * @param message
+     *            the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
+     * @param cause
+     *            the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+     *            value is permitted, and indicates that the cause is nonexistent or unknown.)
+     */
+    public BlazeNoRouteException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause and a detail message of
+     * <tt>(cause==null ? null : cause.toString())</tt> (which typically contains the class and detail message of
+     * <tt>cause</tt>). This constructor is useful for exceptions that are little more than wrappers for other
+     * throwables (for example, {@link java.security.PrivilegedActionException}).
+     * 
+     * @param cause
+     *            the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+     *            value is permitted, and indicates that the cause is nonexistent or unknown.)
+     */
+    public BlazeNoRouteException(Throwable cause) {
+        super(cause);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java Tue Nov 25 06:27:23 2008
@@ -21,11 +21,8 @@
  *
  */
 public class BlazeRuntimeException extends RuntimeException {
-
-	/**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -239755000850890447L;
+    
 
     /**
      * Constructs a new exception with <code>null</code> as its detail message.

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java Tue Nov 25 06:27:23 2008
@@ -26,10 +26,10 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activeblaze.group.AsyncGroupRequest;
 import org.apache.activeblaze.group.Group;
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.util.AsyncGroupRequest;
 import org.apache.activeblaze.wire.ElectionMessage;
 import org.apache.activeblaze.wire.ElectionType;
 import org.apache.activeblaze.wire.MemberData;
@@ -242,25 +242,27 @@
 
     boolean waitForElection(int timeout) throws Exception {
         long deadline = 0;
+        long waitTime = timeout;
         if (timeout > 0) {
             deadline = System.currentTimeMillis() + timeout;
         }
         synchronized (this.electionFinished) {
-            while (isStarted() && !this.electionFinished.get()) {
+            while (isStarted() && !this.electionFinished.get() && (timeout == 0 || waitTime > 0)) {
                 try {
-                    this.electionFinished.wait(timeout);
+                    this.electionFinished.wait(waitTime);
                 } catch (InterruptedException e) {
                     LOG.warn("Interrupted in waitForElection");
                     stop();
                 }
                 if (timeout > 0) {
-                    timeout = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
+                    waitTime = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
                 }
             }
         }
         return !isStopped() && this.electionFinished.get();
     }
 
+
     protected static List<MemberImpl> sortMemberList(List<MemberImpl> list) {
         Collections.sort(list, new Comparator<Member>() {
             public int compare(Member m1, Member m2) {

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java Tue Nov 25 06:27:23 2008
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <P>
+ * A <CODE>GroupState</CODE> is a distributed collaboration implementation that is
+ * used to shared state and process messages amongst a distributed group of
+ * other <CODE>Group</CODE> instances. Membership of a group is handled
+ * automatically using discovery.
+ * <P>
+ * The underlying transport is JMS and there are some optimizations that occur
+ * for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
+ * with any JMS implementation.
+ * 
+ * <P>
+ * Updates to the group shared map are controlled by a coordinator. The
+ * coordinator is elected by the member with the lowest lexicographical id -
+ * based on the bully algorithm [Silberschatz et al. 1993]
+ * <P>
+ * The {@link #selectCordinator(Collection<Member> members)} method may be
+ * overridden to implement a custom mechanism for choosing how the coordinator
+ * is elected for the map.
+ * <P>
+ * New <CODE>Group</CODE> instances have their state updated by the
+ * coordinator, and coordinator failure is handled automatically within the
+ * group.
+ * <P>
+ * All map updates are totally ordered through the coordinator, whilst read
+ * operations happen locally.
+ * <P>
+ * A <CODE>Group</CODE> supports the concept of owner only updates(write
+ * locks), shared updates, entry expiration times and removal on owner exit -
+ * all of which are optional. In addition, you can grab and release locks for
+ * values in the map, independently of who created them.
+ * <P>
+ * In addition, members of a group can broadcast messages and implement
+ * request/response with other <CODE>Group</CODE> instances.
+ * 
+ * <P>
+ * @param <String> 
+ * @param <V> 
+ * 
+ */
+
+public class  GroupState<String,V> implements Map<String,V>{
+    
+    private final BlazeCoordinatedGroupChannelImpl channel;
+    protected GroupState(BlazeCoordinatedGroupChannelImpl channel) {
+        this.channel=channel;
+    }
+    /**
+     * 
+     * @see java.util.Map#clear()
+     */
+    public void clear() {
+        // TODO Auto-generated method stub
+        
+    }
+    /**
+     * @param key
+     * @return
+     * @see java.util.Map#containsKey(java.lang.Object)
+     */
+    public boolean containsKey(Object key) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+    /**
+     * @param value
+     * @return
+     * @see java.util.Map#containsValue(java.lang.Object)
+     */
+    public boolean containsValue(Object value) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+    /**
+     * @return
+     * @see java.util.Map#entrySet()
+     */
+    public Set<java.util.Map.Entry<String, V>> entrySet() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    /**
+     * @param key
+     * @return
+     * @see java.util.Map#get(java.lang.Object)
+     */
+    public V get(Object key) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    /**
+     * @return
+     * @see java.util.Map#isEmpty()
+     */
+    public boolean isEmpty() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+    /**
+     * @return
+     * @see java.util.Map#keySet()
+     */
+    public Set<String> keySet() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    /**
+     * @param key
+     * @param value
+     * @return
+     * @see java.util.Map#put(java.lang.Object, java.lang.Object)
+     */
+    public V put(String key, V value) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    /**
+     * @param t
+     * @see java.util.Map#putAll(java.util.Map)
+     */
+    public void putAll(Map<? extends String, ? extends V> t) {
+        // TODO Auto-generated method stub
+        
+    }
+    /**
+     * @param key
+     * @return
+     * @see java.util.Map#remove(java.lang.Object)
+     */
+    public V remove(Object key) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    /**
+     * @return
+     * @see java.util.Map#size()
+     */
+    public int size() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+    /**
+     * @return
+     * @see java.util.Map#values()
+     */
+    public Collection<V> values() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Tue Nov 25 06:27:23 2008
@@ -25,6 +25,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.activeblaze.BlazeChannelImpl;
 import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.BlazeNoRouteException;
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.BlazeTopicListener;
 import org.apache.activeblaze.Processor;
@@ -32,7 +33,10 @@
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.AsyncGroupRequest;
+import org.apache.activeblaze.util.LRUCache;
 import org.apache.activeblaze.util.PropertyUtil;
+import org.apache.activeblaze.util.SendRequest;
 import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activeblaze.wire.DestinationData;
 import org.apache.activeblaze.wire.MemberData;
@@ -56,7 +60,7 @@
     private InetSocketAddress toManagementAddress;
     private MemberImpl local;
     private BlazeQueueListener inboxListener;
-    private Map<Buffer, SendRequest> messageRequests = new HashMap<Buffer, SendRequest>();
+    private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(1000);
     private Map<Buffer, BlazeQueueListener> queueMessageListenerMap = new ConcurrentHashMap<Buffer, BlazeQueueListener>();
     private Group group;
     private Buffer inboxURI;
@@ -81,7 +85,7 @@
         boolean result = super.init();
         if (result) {
             String unicastURIStr = getConfiguration().getUnicastURI();
-            unicastURIStr=PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
+            unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
             URI unicastURI = new URI(unicastURIStr);
             this.inboxURI = new Buffer(unicastURIStr);
             BaseTransport transport = TransportFactory.get(unicastURI);
@@ -91,10 +95,9 @@
             // if using a port of zero - the port will be assigned automatically,
             // so need to get the potentially new value
             unicastURI = transport.getLocalURI();
-            //append configuration properties
-            
+            // append configuration properties
             String groupManagementURIStr = getGroupConfiguration().getGroupManagementURI();
-            groupManagementURIStr=PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
+            groupManagementURIStr = PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
             URI groupManagementURI = new URI(groupManagementURIStr);
             this.toManagementAddress = new InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
             this.groupManagementTransport = TransportFactory.get(groupManagementURI);
@@ -265,9 +268,17 @@
      */
     public void send(String destination, BlazeMessage message) throws Exception {
         Buffer key = new Buffer(destination);
-        MemberImpl member = getQueueDestination(key);
-        if (member != null) {
-            send(member, key, message);
+        while (true) {
+            MemberImpl member = getQueueDestination(key);
+            if (member != null) {
+                try {
+                    send(member, key, message);
+                    return;
+                } catch (BlazeNoRouteException e) {
+                }
+            } else {
+                return;
+            }
         }
     }
 
@@ -317,9 +328,7 @@
      *      org.apache.activeblaze.BlazeMessage)
      */
     public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
-        Buffer key = new Buffer(destination);
-        MemberImpl member = getQueueDestination(key);
-        return sendRequest(member, key, message, 0);
+        return sendRequest(destination, message, 0);
     }
 
     /**
@@ -333,8 +342,30 @@
      */
     public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
         Buffer key = new Buffer(destination);
-        MemberImpl member = getQueueDestination(key);
-        return sendRequest(member, key, message, timeout);
+        long deadline = 0;
+        long waitTime = timeout;
+        if (timeout > 0) {
+            deadline = System.currentTimeMillis() + timeout;
+        }
+        while (!isStopped() && (timeout == 0 || waitTime > 0)) {
+            MemberImpl member = getQueueDestination(key);
+            if (member != null) {
+                try {
+                    BlazeMessage result = sendRequest(member, key, message, (int) waitTime);
+                    if (result != null) {
+                        return result;
+                    }
+                } catch (BlazeNoRouteException e) {
+                } finally {
+                    if (timeout > 0) {
+                        waitTime = (int) Math.max(deadline - System.currentTimeMillis(), 0);
+                    }
+                }
+            }else {
+                this.group.waitForNewMember((int) waitTime);
+            }
+        }
+        return null;
     }
 
     protected synchronized BlazeMessage sendRequest(MemberImpl member, Buffer destination, BlazeMessage message,
@@ -379,7 +410,6 @@
         Packet packet = new Packet(data);
         packet.setTo(((MemberImpl) to).getAddress());
         this.unicast.downStream(packet);
-        
     }
 
     protected void send(MemberImpl member, Buffer destination, BlazeMessage message) throws Exception {
@@ -392,6 +422,8 @@
         blazeData.setTopic(false);
         blazeData.setDestination(destination);
         PacketData data = getPacketData(MessageType.BLAZE_DATA, blazeData);
+        data.setReliable(true);
+        data.setResponseRequired(true);
         data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);
         packet.setTo(member.getAddress());
@@ -485,7 +517,7 @@
     protected void doProcessBlazeData(PacketData data) throws Exception {
         BlazeMessage message = (BlazeMessage) buildBlazeMessage(data);
         if (message.getContent().getTopic()) {
-            super.processBlazeMessage(message);
+            dispatch(message);
         } else {
             Buffer destination = message.getContent().getDestination();
             if (this.inboxListener != null && this.producerId.equals(destination)) {
@@ -571,13 +603,13 @@
     }
 
     /**
-     * @param to 
+     * @param to
      * @param messageType
      * @param message
      * @param correlationId
      * @throws Exception
      */
-    public synchronized void sendReply(MemberImpl to,MessageType messageType, Message<?> message, String correlationId)
+    public synchronized void sendReply(MemberImpl to, MessageType messageType, Message<?> message, String correlationId)
             throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Tue Nov 25 06:27:23 2008
@@ -48,6 +48,7 @@
     private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
     private final Map<Buffer, List<MemberImpl>> queueMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
     private final Map<Buffer, List<MemberImpl>> topicMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
+    private final Object memberMutex = new Object();
 
     /**
      * Constructor
@@ -69,7 +70,7 @@
     public MemberImpl getLocalMember() throws Exception {
         return this.channel.getLocalMember();
     }
-    
+
     void updateLocal(MemberImpl member) {
         this.members.put(member.getId(), member);
     }
@@ -80,7 +81,7 @@
     public String getId() {
         return this.channel.getId();
     }
-    
+
     /**
      * @return the name of the local channel
      */
@@ -143,13 +144,14 @@
         }
         return null;
     }
-    
+
     /**
      * Will wait for a member to advertise itself if not available
+     * 
      * @param name
      * @param timeout
      * @return the member or null
-     * @throws InterruptedException 
+     * @throws InterruptedException
      */
     public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
         Member result = null;
@@ -160,13 +162,13 @@
         while (true) {
             result = getMemberByName(name);
             if (result == null) {
-                synchronized(this.members) {
+                synchronized (this.members) {
                     this.members.wait(timeout);
                 }
                 if (timeout > 0) {
                     timeout = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
                 }
-            }else {
+            } else {
                 break;
             }
         }
@@ -218,7 +220,7 @@
                     try {
                         broadcastHeartBeat(getLocalMember());
                     } catch (Exception e) {
-                        LOG.error("Failed to send heartbeat",e);
+                        LOG.error("Failed to send heartbeat", e);
                     }
                 }
             };
@@ -297,6 +299,9 @@
     }
 
     private void fireMemberStarted(Member member) {
+        synchronized (this.memberMutex) {
+            this.memberMutex.notifyAll();
+        }
         LOG.debug(this.channel.getName() + " Member started " + member);
         for (MemberChangedListener l : this.membershipListeners) {
             l.memberStarted(member);
@@ -304,6 +309,9 @@
     }
 
     private void fireMemberStopped(Member member) {
+        synchronized (this.memberMutex) {
+            this.memberMutex.notifyAll();
+        }
         LOG.debug(this.channel.getName() + " Member stopped " + member);
         for (MemberChangedListener l : this.membershipListeners) {
             l.memberStopped(member);
@@ -315,7 +323,7 @@
             long checkTime = System.currentTimeMillis() - this.configuration.getHeartBeatInterval();
             for (MemberImpl member : this.members.values()) {
                 if (!member.getId().equals(getId()) && member.getTimeStamp() < checkTime) {
-                    LOG.debug(getId() +" Member timestamp expired " + member);
+                    LOG.debug(getId() + " Member timestamp expired " + member);
                     this.members.remove(member.getId());
                     processMemberStopped(member);
                 }
@@ -326,7 +334,7 @@
     protected void processMemberStarted(MemberImpl member) throws Exception {
         processDestinationsForStarted(member);
         fireMemberStarted(member);
-        synchronized(this.members) {
+        synchronized (this.members) {
             this.members.notifyAll();
         }
     }
@@ -393,10 +401,33 @@
     protected Map<Buffer, List<MemberImpl>> getTopicMap() {
         return this.topicMap;
     }
-    
+
     protected void broadcastHeartBeat(MemberImpl local) throws Exception {
         if (isStarted()) {
             Group.this.channel.broadcastMessage(MessageType.MEMBER_DATA, local.getData());
         }
     }
+
+    protected boolean waitForNewMember(int timeout) throws Exception {
+        int memberCount = this.members.size();
+        long deadline = 0;
+        long waitTime = timeout;
+        if (timeout > 0) {
+            deadline = System.currentTimeMillis() + timeout;
+        }
+        synchronized (this.memberMutex) {
+            while (isStarted() && memberCount >= this.members.size() && (timeout == 0 || waitTime > 0)) {
+                try {
+                    this.memberMutex.wait(waitTime);
+                } catch (InterruptedException e) {
+                    LOG.warn("Interrupted in waitForMember");
+                    stop();
+                }
+                if (timeout > 0) {
+                    waitTime = Math.max(deadline - System.currentTimeMillis(), 0l);
+                }
+            }
+        }
+        return !isStopped() && memberCount < this.members.size();
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java Tue Nov 25 06:27:23 2008
@@ -178,4 +178,15 @@
             LOG.error("No exception listener - caught exception ", e);
         }
     }
+    
+    /**
+     * calls stop - but catches exceptions
+     */
+    protected void stopInternal() {
+        try {
+            stop();
+        } catch (Throwable e) {
+           LOG.error("Caught an exception stopping",e);
+        }
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Tue Nov 25 06:27:23 2008
@@ -27,7 +27,7 @@
  */
 public final class Packet {
     final private SocketAddress from;
-    private InetSocketAddress to;
+    private SocketAddress to;
     private String id;
     final private PacketData packetData;
 
@@ -138,14 +138,14 @@
     /**
      * @return the to
      */
-    public InetSocketAddress getTo() {
+    public SocketAddress getTo() {
         return this.to;
     }
 
     /**
      * @param to the to to set
      */
-    public void setTo(InetSocketAddress to) {
+    public void setTo(SocketAddress to) {
         this.to = to;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Tue Nov 25 06:27:23 2008
@@ -17,17 +17,26 @@
 package org.apache.activeblaze.impl.transport;
 
 import java.net.URI;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.activeblaze.BlazeConfiguration;
+import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.processor.PacketAudit;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Base Class for transports
- *
+ * 
  */
-public abstract class BaseTransport extends ThreadChainedProcessor{
+public abstract class BaseTransport extends ThreadChainedProcessor {
+    private static final Log LOG = LogFactory.getLog(BaseTransport.class);
     static final int DEFAULT_MAX_PACKET_SIZE = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
     static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
     private URI localURI;
+    private Buffer bufferOfLocalURI;
     private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
     private int bufferSize = DEFAULT_BUFFER_SIZE;
     private int soTimeout = 2000;
@@ -36,39 +45,59 @@
     protected final PacketAudit audit = new PacketAudit();
     private boolean broadcast = false;
     private boolean enableAudit = false;
-    
-        
+    private int maxDispatchQueueSize = 10000;
+    private LinkedBlockingQueue<Packet> dispatchQueue;
+    private Thread dispatchQueueThread;
+
     public boolean init() throws Exception {
         boolean result = super.init();
-        if(result) {
+        if (result) {
             this.audit.init();
+            if (this.localURI != null) {
+                this.bufferOfLocalURI = new Buffer(this.localURI.toString());
+            }
+            this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
         }
         return result;
     }
 
-    
     public boolean shutDown() throws Exception {
         boolean result = super.shutDown();
-        if(result) {
+        if (result) {
             this.audit.shutDown();
         }
         return result;
     }
 
-    
     public boolean start() throws Exception {
         boolean result = super.start();
-        if(result) {
+        if (result) {
             this.audit.start();
+            Runnable runable = new Runnable() {
+                public void run() {
+                    while (isStarted()) {
+                        dequeuePackets();
+                    }
+                }
+            };
+            this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
+            this.dispatchQueueThread.setDaemon(true);
+            this.dispatchQueueThread.start();
         }
-        return result; 
+        return result;
     }
 
-    
     public boolean stop() throws Exception {
         boolean result = super.stop();
-        if(result) {
+        if (result) {
             this.audit.stop();
+            if (this.dispatchQueueThread != null) {
+                this.dispatchQueueThread.interrupt();
+                try {
+                    this.dispatchQueueThread.join(100);
+                } catch (InterruptedException e) {
+                }
+            }
         }
         return result;
     }
@@ -76,94 +105,103 @@
     /**
      * @return the localURI
      */
-    public URI getLocalURI(){
+    public URI getLocalURI() {
         return this.localURI;
     }
 
     /**
-     * @param localURI the localURI to set
+     * @param localURI
+     *            the localURI to set
      */
-    public void setLocalURI(URI localURI){
+    public void setLocalURI(URI localURI) {
         this.localURI = localURI;
+        if (this.localURI != null) {
+            this.bufferOfLocalURI = new Buffer(this.localURI.toString());
+        }
     }
 
     /**
      * @return the maxPacketSize
      */
-    public int getMaxPacketSize(){
+    public int getMaxPacketSize() {
         return this.maxPacketSize;
     }
 
     /**
-     * @param maxPacketSize the maxPacketSize to set
+     * @param maxPacketSize
+     *            the maxPacketSize to set
      */
-    public void setMaxPacketSize(int maxPacketSize){
+    public void setMaxPacketSize(int maxPacketSize) {
         this.maxPacketSize = maxPacketSize;
     }
 
     /**
      * @return the bufferSize
      */
-    public int getBufferSize(){
+    public int getBufferSize() {
         return this.bufferSize;
     }
 
     /**
-     * @param bufferSize the bufferSize to set
+     * @param bufferSize
+     *            the bufferSize to set
      */
-    public void setBufferSize(int bufferSize){
+    public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
     }
 
     /**
      * @return the soTimeout
      */
-    public int getSoTimeout(){
+    public int getSoTimeout() {
         return this.soTimeout;
     }
 
     /**
-     * @param soTimeout the soTimeout to set
+     * @param soTimeout
+     *            the soTimeout to set
      */
-    public void setSoTimeout(int soTimeout){
+    public void setSoTimeout(int soTimeout) {
         this.soTimeout = soTimeout;
     }
-    
+
     /**
      * @return the timeToLive
      */
-    public int getTimeToLive(){
+    public int getTimeToLive() {
         return this.timeToLive;
     }
 
     /**
-     * @param timeToLive the timeToLive to set
+     * @param timeToLive
+     *            the timeToLive to set
      */
-    public void setTimeToLive(int timeToLive){
+    public void setTimeToLive(int timeToLive) {
         this.timeToLive = timeToLive;
     }
 
     /**
      * @return the loopBack
      */
-    public boolean isLoopBack(){
+    public boolean isLoopBack() {
         return this.loopBack;
     }
 
     /**
-     * @param loopBack the loopBack to set
+     * @param loopBack
+     *            the loopBack to set
      */
-    public void setLoopBack(boolean loopBack){
+    public void setLoopBack(boolean loopBack) {
         this.loopBack = loopBack;
     }
-    
+
     /**
      * @return the audit
      */
-    protected PacketAudit getAudit(){
+    protected PacketAudit getAudit() {
         return this.audit;
     }
-    
+
     /**
      * @return the broadcast
      */
@@ -179,7 +217,6 @@
         this.broadcast = broadcast;
     }
 
-
     /**
      * @return the enableAudit
      */
@@ -187,11 +224,58 @@
         return this.enableAudit;
     }
 
-
     /**
-     * @param enableAudit the enableAudit to set
+     * @param enableAudit
+     *            the enableAudit to set
      */
     public void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
     }
+
+    /**
+     * @return the bufferOfLocalURI
+     */
+    public Buffer getBufferOfLocalURI() {
+        return this.bufferOfLocalURI;
+    }
+
+    public String toString() {
+        return this.localURI != null ? this.localURI.toString() : " Uninitialized Transport";
+    }
+
+    /**
+     * @return the maxDispatchQueueSize
+     */
+    public int getMaxDispatchQueueSize() {
+        return this.maxDispatchQueueSize;
+    }
+
+    /**
+     * @param maxDispatchQueueSize
+     *            the maxDispatchQueueSize to set
+     */
+    public void setMaxDispatchQueueSize(int maxDispatchQueueSize) {
+        this.maxDispatchQueueSize = maxDispatchQueueSize;
+    }
+    
+    public void upStream(Packet packet) throws Exception {
+        if (!isStopped()) {
+            this.dispatchQueue.put(packet);
+        }
+    }
+
+    protected void dequeuePackets() {
+        Packet packet = null;
+        try {
+            packet = this.dispatchQueue.take();
+            if (packet != null) {
+                super.upStream(packet);
+            }
+        } catch (InterruptedException e1) {
+            // we've stopped
+        } catch (Exception e) {
+            LOG.error("Caught an exception processing a packet: " + packet, e);
+            stopInternal();
+        }
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Tue Nov 25 06:27:23 2008
@@ -33,13 +33,11 @@
 public class MulticastTransport extends BaseTransport {
     private MulticastSocket socket;
     private String networkInterface;
-    private byte[] receiveData;
     private InetSocketAddress socketAddress;
 
     public boolean init() throws Exception {
         boolean result = super.init();
         if (result) {
-            this.receiveData = new byte[getMaxPacketSize()];
             this.socket = new MulticastSocket(getLocalURI().getPort());
             this.socket.setTimeToLive(getTimeToLive());
             this.socket.setLoopbackMode(isLoopBack());
@@ -76,7 +74,8 @@
 
     protected void doProcess() throws Exception {
         if (isInitialized()) {
-            DatagramPacket dp = new DatagramPacket(this.receiveData, this.receiveData.length);
+            byte[] receiveData = new byte[getMaxPacketSize()];
+            DatagramPacket dp = new DatagramPacket(receiveData, receiveData.length);
             this.socket.receive(dp);
             if (dp.getLength() > 0) {
                 PacketData data = PacketData.parseFramed(dp.getData());
@@ -96,7 +95,7 @@
                 this.audit.isDuplicate(packet);
             }
             byte[] data = packet.getPacketData().toFramedByteArray();
-            InetSocketAddress to = packet.getTo();
+            SocketAddress to = packet.getTo();
             DatagramPacket dp = new DatagramPacket(data, data.length, to);
             this.socket.send(dp);
         } else {