You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/02/13 12:10:14 UTC

svn commit: r744065 - in /activemq/activemq-blaze/trunk/src/main: java/org/apache/activeblaze/ java/org/apache/activeblaze/group/ java/org/apache/activeblaze/impl/reliable/simple/ java/org/apache/activeblaze/jms/ proto/

Author: rajdavies
Date: Fri Feb 13 11:10:13 2009
New Revision: 744065

URL: http://svn.apache.org/viewvc?rev=744065&view=rev
Log:
Pass through JMS messages to MessageConsumers directly - instead of being dispatched by the Session

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java   (with props)
Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java Fri Feb 13 11:10:13 2009
@@ -67,18 +67,18 @@
     /**
      * Remove a listener for messages
      * @param destination
-     * @return the removed listener
+     * @param l 
      * @throws Exception 
      */
-    public BlazeMessageListener removeBlazeTopicMessageListener(String destination) throws Exception;
+    public void removeBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception;
     
     /**
      * Remove the subscription
      * @param suscription
-     * @return the original subscription
+     * @param l 
      * @throws Exception
      */
-    public BlazeMessageListener removeBlazeTopicMessageListener(Subscription suscription) throws Exception;
+    public void removeBlazeTopicMessageListener(Subscription suscription, BlazeMessageListener l) throws Exception;
     
     /**
      * Set an exception listener for async exceptions that can be generated

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Fri Feb 13 11:10:13 2009
@@ -16,9 +16,6 @@
  */
 package org.apache.activeblaze;
 
-import java.net.URI;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.activeblaze.impl.network.Network;
 import org.apache.activeblaze.impl.network.NetworkFactory;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -34,7 +31,9 @@
 import org.apache.activeblaze.wire.PacketData;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.Message;
-
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 /**
  * <P>
  * A <CODE>BlazeChannel</CODE> handles all client communication, either unicast,
@@ -42,18 +41,12 @@
  * 
  * 
  */
-public class BlazeChannelImpl extends DefaultChainedProcessor implements
-        BlazeChannel, ExceptionListener {
-    protected Map<Subscription, BlazeMessageListener> topicessageListenerMap = new ConcurrentHashMap<Subscription, BlazeMessageListener>();
-
+public class BlazeChannelImpl extends DefaultChainedProcessor implements BlazeChannel, ExceptionListener {
+    protected List<SubscriptionHolder> topicMessageListeners = new CopyOnWriteArrayList<SubscriptionHolder>();
     protected final IdGenerator idGenerator = new IdGenerator();
-
     protected Buffer producerId;
-
     protected Processor broadcast;
-
     protected BlazeConfiguration configuration = new BlazeConfiguration();
-
     private String id;
 
     /**
@@ -89,10 +82,9 @@
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(String destination,
-            BlazeMessageListener l) throws Exception {
-        Subscription sub = new Subscription(destination);
-        this.topicessageListenerMap.put(sub, l);
+    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
+        SubscriptionHolder key = new SubscriptionHolder(destination, l);
+        this.topicMessageListeners.add(key);
     }
 
     /**
@@ -102,58 +94,52 @@
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(Subscription subscription,
-            BlazeMessageListener l) throws Exception {
-        this.topicessageListenerMap.put(subscription, l);
+    public void addBlazeTopicMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
+        SubscriptionHolder key = new SubscriptionHolder(subscription, l);
+        this.topicMessageListeners.add(key);
     }
 
     /**
      * @param destination
      * 
-     * @return the TopicListener
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String
      *      destination)
      */
-    public BlazeMessageListener removeBlazeTopicMessageListener(
-            String destination) throws Exception {
-        Subscription key = new Subscription(destination);
-        return this.topicessageListenerMap.remove(key);
+    public void removeBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
+        SubscriptionHolder key = new SubscriptionHolder(destination, l);
+        this.topicMessageListeners.remove(key);
     }
 
     /**
-     * @param key
-     * 
-     * @return the TopicListener
+     * @param subs
+     * @param l
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String
      *      destination)
      */
-    public BlazeMessageListener removeBlazeTopicMessageListener(Subscription key)
-            throws Exception {
-        return this.topicessageListenerMap.remove(key);
+    public void removeBlazeTopicMessageListener(Subscription subs, BlazeMessageListener l) throws Exception {
+        SubscriptionHolder key = new SubscriptionHolder(subs, l);
+        this.topicMessageListeners.remove(key);
     }
 
     public void doInit() throws Exception {
         super.doInit();
         String broadcastURIStr = getConfiguration().getBroadcastURI();
-        broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(
-                broadcastURIStr, getConfiguration());
+        broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
         URI broadcastURI = new URI(broadcastURIStr);
         URI managementURI = null;
         String managementURIStr = getConfiguration().getManagementURI();
         if (managementURIStr != null && managementURIStr.length() > 0) {
             managementURI = new URI(getConfiguration().getManagementURI());
         }
-        Network transport = NetworkFactory.get(broadcastURI, managementURI,
-                getConfiguration().getReliableBroadcast());
+        Network transport = NetworkFactory.get(broadcastURI, managementURI, getConfiguration().getReliableBroadcast());
         transport.setName(getName());
         this.broadcast = configureProcess(transport);
         this.broadcast.init();
     }
 
-    protected final Processor configureProcess(ChainedProcessor transport)
-            throws Exception {
+    protected final Processor configureProcess(ChainedProcessor transport) throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
@@ -166,8 +152,7 @@
         return result;
     }
 
-    protected ChainedProcessor getReliability(String reliability)
-            throws Exception {
+    protected ChainedProcessor getReliability(String reliability) throws Exception {
         DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
         return reliable;
     }
@@ -190,13 +175,11 @@
         this.broadcast.stop();
     }
 
-    public void broadcast(String destination, BlazeMessage msg)
-            throws Exception {
+    public void broadcast(String destination, BlazeMessage msg) throws Exception {
         broadcast(new Destination(destination), msg);
     }
 
-    public synchronized void broadcast(Destination destination, BlazeMessage msg)
-            throws Exception {
+    public synchronized void broadcast(Destination destination, BlazeMessage msg) throws Exception {
         msg.setDestination(destination);
         msg.storeContent();
         BlazeData blazeData = msg.getContent();
@@ -206,8 +189,7 @@
         this.broadcast.downStream(packet);
     }
 
-    protected final synchronized PacketData getPacketData(MessageType type,
-            Message<?> message) {
+    protected final synchronized PacketData getPacketData(MessageType type, Message<?> message) {
         PacketData packetData = new PacketData();
         packetData.setType(type.getNumber());
         packetData.setProducerId(this.producerId);
@@ -221,8 +203,7 @@
         processData(packet.getId(), data.getCorrelationId(), data);
     }
 
-    protected void processData(String id, Buffer correlationId, PacketData data)
-            throws Exception {
+    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
         MessageType type = MessageType.valueOf(data.getType());
         if (type == MessageType.BLAZE_DATA) {
             doProcessBlazeData(data);
@@ -255,8 +236,7 @@
         dispatch(message);
     }
 
-    protected final BlazeMessage buildBlazeMessage(PacketData data)
-            throws Exception {
+    protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
         BlazeMessage message = null;
         if (data != null) {
             MessageType type = MessageType.BLAZE_DATA;
@@ -274,9 +254,7 @@
                 message.setMessageId(data.getMessageId().toStringUtf8());
             }
             if (data.hasCorrelationId()) {
-                message
-                        .setCorrelationId(data.getCorrelationId()
-                                .toStringUtf8());
+                message.setCorrelationId(data.getCorrelationId().toStringUtf8());
             }
             message.setTimeStamp(blazeData.getTimestamp());
             message.setContent(blazeData);
@@ -290,12 +268,10 @@
 
     protected final void dispatch(BlazeMessage message) {
         if (message != null) {
-            Buffer destination = message.getContent().getDestinationData()
-                    .getName();
-            for (Map.Entry<Subscription, BlazeMessageListener> entry : this.topicessageListenerMap
-                    .entrySet()) {
-                if (entry.getKey().matches(destination)) {
-                    entry.getValue().onMessage(message);
+            Buffer destination = message.getContent().getDestinationData().getName();
+            for (SubscriptionHolder entry : this.topicMessageListeners) {
+                if (entry.getSubscription().matches(destination)) {
+                    entry.getListener().onMessage(message);
                 }
             }
         }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java Fri Feb 13 11:10:13 2009
@@ -248,10 +248,19 @@
         this.data.setNoLocal(noLocal);
     }
 
+    /** 
+     * @return hash code
+     * @see java.lang.Object#hashCode()
+     */
     public int hashCode() {
         return this.data.getDestinationData().getName().hashCode();
     }
     
+    /** 
+     * @param obj
+     * @return true if equals <Code>this</Code>
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
     public boolean equals(Object obj) {
         if( obj==this )
            return true;

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java?rev=744065&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java Fri Feb 13 11:10:13 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * A SubscriptionHolder pairs a Subscription with the BlazeMessageListener registered for it
+ * 
+ */
+public class SubscriptionHolder {
+    private final BlazeMessageListener listener;
+    private final Subscription subscription;
+
+    /**
+     * Constructor
+     * 
+     * @param destination
+     * @param listener
+     */
+    public SubscriptionHolder(String destination, BlazeMessageListener listener) {
+        this.listener = listener;
+        this.subscription = new Subscription(destination);
+    }
+    
+    /**
+     * Constructor
+     * 
+     * @param destination
+     * @param topic 
+     * @param listener
+     */
+    public SubscriptionHolder(String destination, boolean topic,BlazeMessageListener listener) {
+        this.listener = listener;
+        this.subscription = new Subscription(destination,topic);
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param subscription
+     * @param listener
+     */
+    public SubscriptionHolder(Subscription subscription, BlazeMessageListener listener) {
+        this.listener = listener;
+        this.subscription = subscription;
+    }
+
+    /**
+     * Get the listener
+     * 
+     * @return the listener
+     */
+    public BlazeMessageListener getListener() {
+        return this.listener;
+    }
+
+    /**
+     * Get the subscription
+     * 
+     * @return the subscription
+     */
+    public Subscription getSubscription() {
+        return this.subscription;
+    }
+
+    /**
+     * @return hash code
+     * @see java.lang.Object#hashCode()
+     */
+    public int hashCode() {
+        return this.listener.hashCode() ^ this.subscription.hashCode();
+    }
+
+    /**
+     * @param obj
+     * @return true if equals
+     */
+    public boolean equals(Object obj) {
+        if (obj instanceof SubscriptionHolder) {
+            SubscriptionHolder other = (SubscriptionHolder) obj;
+            return this.listener.equals(other.listener) && this.subscription.equals(other.subscription);
+        }
+        return false;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/SubscriptionHolder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Fri Feb 13 11:10:13 2009
@@ -245,20 +245,20 @@
      * Remove a listener for messages
      * 
      * @param destination
-     * @return the removed listener
+     * @param l 
      * @throws Exception
      */
-    public BlazeMessageListener removeBlazeQueueMessageListener(String destination) throws Exception;
+    public void removeBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception;
 
     /**
      * Remove a listener for messages
      * 
      * @param subscription
+     * @param l 
      * 
-     * @return the removed listener
      * @throws Exception
      */
-    public BlazeMessageListener removeBlazeQueueMessageListener(Subscription subscription) throws Exception;
+    public void removeBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception;
 
     /**
      * Add member to a group

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Fri Feb 13 11:10:13 2009
@@ -16,12 +16,6 @@
  */
 package org.apache.activeblaze.group;
 
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.activeblaze.BlazeChannelImpl;
 import org.apache.activeblaze.BlazeMessage;
 import org.apache.activeblaze.BlazeMessageListener;
@@ -30,6 +24,7 @@
 import org.apache.activeblaze.Destination;
 import org.apache.activeblaze.Processor;
 import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.SubscriptionHolder;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.CompressionProcessor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
@@ -50,37 +45,29 @@
 import org.apache.activemq.protobuf.Message;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 /**
  * <P>
  * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point
  * communication
  * 
  */
-public class BlazeGroupChannelImpl extends BlazeChannelImpl implements
-        BlazeGroupChannel {
-    private static final Log LOG = LogFactory
-            .getLog(BlazeGroupChannelImpl.class);
-
+public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
+    private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
     private String name;
-
     protected Processor unicast;
-
     private MemberImpl local;
-
     private BlazeMessageListener inboxListener;
-
-    protected Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(
-            10000);
-
-    private Map<Subscription, BlazeMessageListener> queueMessageListenerMap = new ConcurrentHashMap<Subscription, BlazeMessageListener>();
-
+    protected Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(10000);
+    private final List<SubscriptionHolder> queueMessageListeners = new CopyOnWriteArrayList<SubscriptionHolder>();
     private Group group;
-
     protected Buffer inboxAddress;
-
     protected int inBoxPort;
-
     protected final Object localMutex = new Object();
 
     /**
@@ -100,27 +87,23 @@
     public void doInit() throws Exception {
         super.doInit();
         String unicastURIStr = getConfiguration().getUnicastURI();
-        unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr,
-                getConfiguration());
+        unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
         URI unicastURI = new URI(unicastURIStr);
         BaseTransport transport = TransportFactory.get(unicastURI);
         transport.setName(getId() + "-Unicast");
-        this.unicast = configureProcess(transport, getConfiguration()
-                .getReliableUnicast());
+        this.unicast = configureProcess(transport, getConfiguration().getReliableUnicast());
         this.unicast.init();
         // if using a port of zero - the port will be assigned automatically,
         // so need to get the potentially new value
         unicastURI = transport.getLocalURI();
-        InetSocketAddress addr = new InetSocketAddress(unicastURI.getHost(),
-                unicastURI.getPort());
+        InetSocketAddress addr = new InetSocketAddress(unicastURI.getHost(), unicastURI.getPort());
         this.inboxAddress = new Buffer(addr.getAddress().getAddress());
         this.inBoxPort = addr.getPort();
         this.local = createLocal(unicastURI);
         this.group = createGroup();
     }
 
-    protected final Processor configureProcess(ChainedProcessor transport,
-            String reliability) throws Exception {
+    protected final Processor configureProcess(ChainedProcessor transport, String reliability) throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
@@ -135,8 +118,7 @@
         return result;
     }
 
-    protected ChainedProcessor getReliability(String reliability)
-            throws Exception {
+    protected ChainedProcessor getReliability(String reliability) throws Exception {
         DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
         return reliable;
     }
@@ -251,8 +233,7 @@
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void addMemberChangedListener(MemberChangedListener l)
-            throws Exception {
+    public void addMemberChangedListener(MemberChangedListener l) throws Exception {
         init();
         this.group.addMemberChangedListener(l);
     }
@@ -262,8 +243,7 @@
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void removeMemberChangedListener(MemberChangedListener l)
-            throws Exception {
+    public void removeMemberChangedListener(MemberChangedListener l) throws Exception {
         init();
         this.group.removeMemberChangedListener(l);
     }
@@ -298,8 +278,7 @@
      * @return the member or null
      * @throws Exception
      */
-    public Member getAndWaitForMemberByName(String name, int timeout)
-            throws Exception {
+    public Member getAndWaitForMemberByName(String name, int timeout) throws Exception {
         init();
         return this.group.getAndWaitForMemberByName(name, timeout);
     }
@@ -336,8 +315,7 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public void send(Destination destination, BlazeMessage message)
-            throws Exception {
+    public void send(Destination destination, BlazeMessage message) throws Exception {
         while (true) {
             MemberImpl member = getQueueDestination(destination.getName());
             if (member != null) {
@@ -361,8 +339,7 @@
      *      org.apache.activeblaze.BlazeMessage)
      */
     public void send(Member member, BlazeMessage message) throws Exception {
-        send((MemberImpl) member, new Buffer(member.getInBoxDestination()),
-                message);
+        send((MemberImpl) member, new Buffer(member.getInBoxDestination()), message);
     }
 
     /**
@@ -373,8 +350,7 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(Member member, BlazeMessage message)
-            throws Exception {
+    public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception {
         return sendRequest(member, message, 0);
     }
 
@@ -387,10 +363,8 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(Member member, BlazeMessage message,
-            int timeout) throws Exception {
-        return sendRequest((MemberImpl) member, new Buffer(member
-                .getInBoxDestination()), message, timeout);
+    public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception {
+        return sendRequest((MemberImpl) member, new Buffer(member.getInBoxDestination()), message, timeout);
     }
 
     /**
@@ -401,8 +375,7 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(String destination, BlazeMessage message)
-            throws Exception {
+    public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
         return sendRequest(new Destination(destination, false), message, 0);
     }
 
@@ -414,8 +387,7 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(Destination destination,
-            BlazeMessage message) throws Exception {
+    public BlazeMessage sendRequest(Destination destination, BlazeMessage message) throws Exception {
         return sendRequest(destination, message, 0);
     }
 
@@ -428,10 +400,8 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(String destination, BlazeMessage message,
-            int timeout) throws Exception {
-        return sendRequest(new Destination(destination, false), message,
-                timeout);
+    public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
+        return sendRequest(new Destination(destination, false), message, timeout);
     }
 
     /**
@@ -443,8 +413,7 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(Destination destination,
-            BlazeMessage message, int timeout) throws Exception {
+    public BlazeMessage sendRequest(Destination destination, BlazeMessage message, int timeout) throws Exception {
         Buffer key = destination.getName();
         long deadline = 0;
         long waitTime = timeout;
@@ -455,16 +424,14 @@
             MemberImpl member = getQueueDestination(key);
             if (member != null) {
                 try {
-                    BlazeMessage result = sendRequest(member, key, message,
-                            (int) waitTime);
+                    BlazeMessage result = sendRequest(member, key, message, (int) waitTime);
                     if (result != null) {
                         return result;
                     }
                 } catch (BlazeNoRouteException e) {
                 } finally {
                     if (timeout > 0) {
-                        waitTime = (int) Math.max(deadline
-                                - System.currentTimeMillis(), 0);
+                        waitTime = (int) Math.max(deadline - System.currentTimeMillis(), 0);
                     }
                 }
             } else {
@@ -484,8 +451,8 @@
      * @return the response
      * @throws Exception
      */
-    public BlazeMessage sendRequest(MemberImpl member, Buffer destinationName,
-            BlazeMessage message, int timeout) throws Exception {
+    public BlazeMessage sendRequest(MemberImpl member, Buffer destinationName, BlazeMessage message, int timeout)
+            throws Exception {
         BlazeMessage result = null;
         if (member != null) {
             SendRequest request = new SendRequest();
@@ -514,8 +481,7 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, java.lang.String)
      */
-    public void sendReply(Member to, BlazeMessage response, String correlationId)
-            throws Exception {
+    public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
         response.storeContent();
         BlazeData blazeData = response.getContent();
         PacketData data = getPacketData(blazeData.type(), blazeData);
@@ -526,15 +492,13 @@
         this.unicast.downStream(packet);
     }
 
-    protected void send(MemberImpl member, Buffer destination,
-            BlazeMessage message) throws Exception {
+    protected void send(MemberImpl member, Buffer destination, BlazeMessage message) throws Exception {
         message.storeContent();
         BlazeData blazeData = message.getContent();
         send(member, destination, blazeData);
     }
 
-    protected void send(MemberImpl member, Buffer destinationName,
-            BlazeData blazeData) throws Exception {
+    protected void send(MemberImpl member, Buffer destinationName, BlazeData blazeData) throws Exception {
         Destination dest = new Destination(destinationName, false);
         blazeData.clearDestinationData();
         blazeData.setDestinationData(dest.getData());
@@ -553,55 +517,61 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
      *      org.apache.activeblaze.group.BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(String destination,
-            BlazeMessageListener l) throws Exception {
+    public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
         init();
-        Subscription key = new Subscription(destination, false);
-        this.queueMessageListenerMap.put(key, l);
+        SubscriptionHolder key = new SubscriptionHolder(destination, false,l);
+        
+            this.queueMessageListeners.add(key);
+       
         buildLocal();
     }
 
     /**
-     * @param key
+     * @param subscription 
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
      *      org.apache.activeblaze.group.BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(Subscription key,
-            BlazeMessageListener l) throws Exception {
+    public void addBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
         init();
-        this.queueMessageListenerMap.put(key, l);
+        SubscriptionHolder key = new SubscriptionHolder(subscription, l);
+            this.queueMessageListeners.add(key);
+       
         buildLocal();
     }
 
     /**
      * @param destination
-     * @return the removed <Code>BlazeMessageListener</Code>
+     * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
      */
-    public BlazeMessageListener removeBlazeQueueMessageListener(
-            String destination) throws Exception {
+    public void removeBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception {
         init();
-        Subscription key = new Subscription(destination, false);
-        BlazeMessageListener result = this.queueMessageListenerMap.remove(key);
+        SubscriptionHolder key = new SubscriptionHolder(destination, false,l);
+        
+       
+             this.queueMessageListeners.remove(key);
+       
         buildLocal();
-        return result;
+       
     }
 
     /**
-     * @param key
-     * @return the removed <Code>BlazeMessageListener</Code>
+     * @param subscription
+     * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
      */
-    public BlazeMessageListener removeBlazeQueueMessageListener(Subscription key)
-            throws Exception {
+    public void removeBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception {
         init();
-        BlazeMessageListener result = this.queueMessageListenerMap.remove(key);
+       
+        SubscriptionHolder key = new SubscriptionHolder(subscription, l);
+            this.queueMessageListeners.remove(key);
+       
         buildLocal();
-        return result;
+       
     }
 
     /**
@@ -611,8 +581,7 @@
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(String destination,
-            BlazeMessageListener l) throws Exception {
+    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
         init();
         super.addBlazeTopicMessageListener(destination, l);
         buildLocal();
@@ -624,13 +593,11 @@
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
      */
-    public BlazeMessageListener removeBlazeTopicMessageListener(
-            String destination) throws Exception {
+    public void removeBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception {
         init();
-        BlazeMessageListener result = super
-                .removeBlazeTopicMessageListener(destination);
+         super.removeBlazeTopicMessageListener(destination,l);
         buildLocal();
-        return result;
+       
     }
 
     /**
@@ -663,8 +630,7 @@
         return this.local.getGroups();
     }
 
-    protected void processData(String id, Buffer correlationId, PacketData data)
-            throws Exception {
+    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
         if (isStarted()) {
             if (!processRequest(correlationId, data)) {
                 MessageType type = MessageType.valueOf(data.getType());
@@ -697,19 +663,23 @@
         if (message.getContent().getDestinationData().getTopic()) {
             dispatch(message);
         } else {
-            Buffer destinationName = message.getContent().getDestinationData()
-                    .getName();
-            if (this.inboxListener != null
-                    && this.producerId.equals(destinationName)) {
+            Buffer destinationName = message.getContent().getDestinationData().getName();
+            if (this.inboxListener != null && this.producerId.equals(destinationName)) {
                 this.inboxListener.onMessage(message);
             } else {
-                for (Map.Entry<Subscription, BlazeMessageListener> entry : this.queueMessageListenerMap
-                        .entrySet()) {
-                    if (entry.getKey().matches(destinationName)) {
-                        entry.getValue().onMessage(message);
-                        break;
+                
+                    int index=0;
+                    for (SubscriptionHolder entry : this.queueMessageListeners) {
+                        if (entry.getSubscription().matches(destinationName)) {
+                           entry.getListener().onMessage(message);
+                           this.queueMessageListeners.remove(index);
+                           this.queueMessageListeners.add(entry);
+                        }
+                        index++;
                     }
-                }
+                   
+               
+                
             }
         }
     }
@@ -737,8 +707,7 @@
      * @param message
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, Message<?> message)
-            throws Exception {
+    public void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
         Packet packet = new Packet(data);
@@ -754,8 +723,8 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member,
-            MessageType messageType, Message<?> message) throws Exception {
+    public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member, MessageType messageType,
+            Message<?> message) throws Exception {
         SendRequest request = new SendRequest();
         PacketData data = getPacketData(messageType, message);
         asyncRequest.add(data.getMessageId(), request);
@@ -776,8 +745,7 @@
      * @param correlationId
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, Message<?> message,
-            String correlationId) throws Exception {
+    public void broadcastMessage(MessageType messageType, Message<?> message, String correlationId) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(true);
@@ -791,8 +759,7 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessage(InetSocketAddress to, MessageType messageType,
-            Message<?> message) throws Exception {
+    public void sendMessage(InetSocketAddress to, MessageType messageType, Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
         Packet packet = new Packet(data);
@@ -807,8 +774,8 @@
      * @param correlationId
      * @throws Exception
      */
-    public void sendReply(MemberImpl to, MessageType messageType,
-            Message<?> message, String correlationId) throws Exception {
+    public void sendReply(MemberImpl to, MessageType messageType, Message<?> message, String correlationId)
+            throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(false);
@@ -844,17 +811,18 @@
         if (isInitialized()) {
             try {
                 synchronized (this.localMutex) {
-                    MemberImpl result = new MemberImpl(getLocalMember()
-                            .getData().clone());
+                    MemberImpl result = new MemberImpl(getLocalMember().getData().clone());
                     result.getData().clearSubscriptionData();
                     // add topic destinations
-                    for (Subscription s : this.topicessageListenerMap.keySet()) {
-                        result.getData().addSubscriptionData(s.getData());
+                    for (SubscriptionHolder s : this.topicMessageListeners) {
+                        result.getData().addSubscriptionData(s.getSubscription().getData());
                     }
                     // add Queue Destinations
-                    for (Subscription s : this.queueMessageListenerMap.keySet()) {
-                        result.getData().addSubscriptionData(s.getData());
+                    
+                    for (SubscriptionHolder s : this.queueMessageListeners) {
+                        result.getData().addSubscriptionData(s.getSubscription().getData());
                     }
+                   
                     this.group.processMemberUpdate(this.local, result);
                     result.getData().setSubscriptionsChanged(true);
                     this.group.broadcastHeartBeat(result);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Fri Feb 13 11:10:13 2009
@@ -23,7 +23,7 @@
  * 
  */
 public class BlazeGroupConfiguration extends BlazeConfiguration {
-    private int heartBeatInterval = 800;
+    private int heartBeatInterval = 1000;
     private String unicastURI = "udp://localhost:0";
     //reliability
     private String reliableUnicast ="swp";

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java Fri Feb 13 11:10:13 2009
@@ -24,7 +24,7 @@
  * 
  */
 public class SimpleFlow extends DefaultChainedProcessor {
-    int maxWindowSize = 4 * 1024;
+    int maxWindowSize = 32 * 1024;
     int windowSize = 0;
     int pauseTime = 2;
 
@@ -34,6 +34,7 @@
             Thread.sleep(this.pauseTime);
             this.windowSize = 0;
         }
+        System.err.println("SIMPLE FLOW");
         super.downStream(p);
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Fri Feb 13 11:10:13 2009
@@ -33,10 +33,10 @@
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
+import org.apache.activeblaze.BlazeMessageListener;
 import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.group.BlazeGroupChannel;
 import org.apache.activeblaze.util.IdGenerator;
-
 /**
  * Implementation of a JMS Connection
  * 
@@ -49,17 +49,13 @@
     private boolean clientIdSet;
     private ExceptionListener exceptionListener;
     private List<Session> sessions = new CopyOnWriteArrayList<Session>();
-    private final BlazeMessageDispatcher queueDispatcher;
-    private final BlazeMessageDispatcher topicDispatcher;
     private boolean closed;
-    private int consumerMaxDispatchQueueDepth=10000;
+    private int consumerMaxDispatchQueueDepth = 10000;
 
     protected BlazeJmsConnection(BlazeGroupChannel channel) {
         this.channel = channel;
         this.channel.setExceptionListener(this);
         this.clientId = channel.getName();
-        this.queueDispatcher = new BlazeQueueMessageDispatcher(this);
-        this.topicDispatcher = new BlazeTopicMessageDispatcher(this);
     }
 
     /**
@@ -86,8 +82,8 @@
      * @param maxMessages
      * @return ConnectionConsumer
      * @throws JMSException
-     * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination, java.lang.String,
-     *      javax.jms.ServerSessionPool, int)
+     * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination,
+     *      java.lang.String, javax.jms.ServerSessionPool, int)
      */
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
             ServerSessionPool sessionPool, int maxMessages) throws JMSException {
@@ -103,8 +99,9 @@
      * @param maxMessages
      * @return ConnectionConsumer
      * @throws JMSException
-     * @see javax.jms.Connection#createDurableConnectionConsumer(javax.jms.Topic, java.lang.String, java.lang.String,
-     *      javax.jms.ServerSessionPool, int)
+     * @see javax.jms.Connection#createDurableConnectionConsumer(javax.jms.Topic,
+     *      java.lang.String, java.lang.String, javax.jms.ServerSessionPool,
+     *      int)
      */
     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
             String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
@@ -209,8 +206,8 @@
      * @param maxMessages
      * @return ConnectionConsumer
      * @throws JMSException
-     * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic, java.lang.String,
-     *      javax.jms.ServerSessionPool, int)
+     * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic,
+     *      java.lang.String, javax.jms.ServerSessionPool, int)
      */
     public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
             ServerSessionPool sessionPool, int maxMessages) throws JMSException {
@@ -240,8 +237,8 @@
      * @param maxMessages
      * @return ConnectionConsumer
      * @throws JMSException
-     * @see javax.jms.QueueConnection#createConnectionConsumer(javax.jms.Queue, java.lang.String,
-     *      javax.jms.ServerSessionPool, int)
+     * @see javax.jms.QueueConnection#createConnectionConsumer(javax.jms.Queue,
+     *      java.lang.String, javax.jms.ServerSessionPool, int)
      */
     public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
             ServerSessionPool sessionPool, int maxMessages) throws JMSException {
@@ -268,11 +265,10 @@
      * @param ex
      * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
      */
-    
     public void onException(Exception ex) {
-       onException(BlazeJmsExceptionSupport.create(ex));
+        onException(BlazeJmsExceptionSupport.create(ex));
     }
-    
+
     /**
      * @param ex
      * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
@@ -316,27 +312,44 @@
         }
     }
 
-    protected void addMesssageDispatcher(BlazeJmsConsumer consumer, Subscription s) throws JMSException {
-        BlazeMessageDispatcher dispatcher = s.isTopic() ? this.topicDispatcher : this.queueDispatcher;
-        dispatcher.add(consumer, s);
+    protected void addMesssageDispatcher(BlazeMessageListener consumer, Subscription s) throws JMSException {
+        try {
+            if (s.isTopic()) {
+                this.channel.addBlazeTopicMessageListener(s, consumer);
+            } else {
+                this.channel.addBlazeQueueMessageListener(s, consumer);
+            }
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
     }
 
-    protected void removeMesssageDispatcher(BlazeJmsConsumer consumer, Subscription s) throws JMSException {
-        BlazeMessageDispatcher dispatcher = s.isTopic() ? this.topicDispatcher : this.queueDispatcher;
-        dispatcher.remove(consumer);
+    protected void removeMesssageDispatcher(BlazeMessageListener consumer, Subscription s) throws JMSException {
+        try {
+            if (s.isTopic()) {
+                this.channel.removeBlazeTopicMessageListener(s,consumer);
+            } else {
+                this.channel.removeBlazeQueueMessageListener(s,consumer);
+            }
+        } catch (Exception e) {
+            throw BlazeJmsExceptionSupport.create(e);
+        }
     }
 
     /**
      * Get the consumerMaxDispatchQueueDepth
+     * 
      * @return the consumerMaxDispatchQueueDepth
      */
     public int getConsumerMaxDispatchQueueDepth() {
-        return consumerMaxDispatchQueueDepth;
+        return this.consumerMaxDispatchQueueDepth;
     }
 
     /**
      * Set the consumerMaxDispatchQueueDepth
-     * @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set
+     * 
+     * @param consumerMaxDispatchQueueDepth
+     *            the consumerMaxDispatchQueueDepth to set
      */
     public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {
         this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java Fri Feb 13 11:10:13 2009
@@ -16,8 +16,11 @@
  */
 package org.apache.activeblaze.jms;
 
+import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.BlazeMessageListener;
 import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -29,12 +32,11 @@
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
-
 /**
  * implementation of a Jms Message Consumer
  * 
  */
-public class BlazeJmsMessageConsumer implements MessageConsumer, BlazeJmsConsumer {
+public class BlazeJmsMessageConsumer implements MessageConsumer, BlazeMessageListener {
     protected final BlazeJmsSession session;
     protected final BlazeJmsDestination destination;
     protected final Subscription subscription = new Subscription();
@@ -44,21 +46,20 @@
     private final Lock lock = new ReentrantLock();
     private LinkedBlockingQueue<BlazeJmsMessage> dispatchQueue;
 
-    protected BlazeJmsMessageConsumer(BlazeJmsSession s, BlazeJmsDestination destination,int queueDepth) {
+    protected BlazeJmsMessageConsumer(BlazeJmsSession s, BlazeJmsDestination destination, int queueDepth) {
         this.session = s;
         this.destination = destination;
         this.subscription.setDestination(this.destination.getDestination().getData());
-        this.dispatchQueue= new LinkedBlockingQueue<BlazeJmsMessage>(queueDepth);
+        this.dispatchQueue = new LinkedBlockingQueue<BlazeJmsMessage>(queueDepth);
     }
 
     /**
-     * @throws JMSException 
+     * @throws JMSException
      * @see javax.jms.MessageConsumer#close()
      */
     public void close() throws JMSException {
         this.closed = true;
         this.session.remove(this);
-       
     }
 
     protected Subscription getSubscription() {
@@ -104,7 +105,7 @@
     public Message receive(long timeout) throws JMSException {
         checkClosed();
         try {
-            return this.dispatchQueue.poll(timeout,TimeUnit.MILLISECONDS);
+            return this.dispatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             throw BlazeJmsExceptionSupport.create(e);
         }
@@ -117,8 +118,8 @@
      */
     public Message receiveNoWait() throws JMSException {
         checkClosed();
-        Message result =  this.dispatchQueue.peek();
-        if (result != null){
+        Message result = this.dispatchQueue.peek();
+        if (result != null) {
             this.dispatchQueue.remove(result);
         }
         return result;
@@ -132,17 +133,17 @@
     public void setMessageListener(MessageListener listener) throws JMSException {
         checkClosed();
         this.lock.lock();
-        try{
-        this.messageListener = listener;
-        if (!this.dispatchQueue.isEmpty() && this.messageListener != null){
-            List<BlazeJmsMessage> drain = new ArrayList<BlazeJmsMessage>(this.dispatchQueue.size());
-            this.dispatchQueue.drainTo(drain);
-            for (BlazeJmsMessage m:drain){
-                this.messageListener.onMessage(m);
+        try {
+            this.messageListener = listener;
+            if (!this.dispatchQueue.isEmpty() && this.messageListener != null) {
+                List<BlazeJmsMessage> drain = new ArrayList<BlazeJmsMessage>(this.dispatchQueue.size());
+                this.dispatchQueue.drainTo(drain);
+                for (BlazeJmsMessage m : drain) {
+                    this.messageListener.onMessage(m);
+                }
+                drain.clear();
             }
-            drain.clear();
-        }
-        }finally{
+        } finally {
             this.lock.unlock();
         }
     }
@@ -169,14 +170,30 @@
      */
     public void onMessage(BlazeJmsMessage message) {
         this.lock.lock();
-        try{
-        if (this.messageListener != null) {
-            this.messageListener.onMessage(message);
-        }else{
-            this.dispatchQueue.add(message);
-        }
-        }finally{
+        try {
+            if (this.messageListener != null) {
+                this.messageListener.onMessage(message);
+            } else {
+                this.dispatchQueue.add(message);
+            }
+        } finally {
             lock.unlock();
         }
     }
+
+    public void onMessage(BlazeMessage message) {
+        this.lock.lock();
+        try {
+            BlazeJmsMessage result = BlazeJmsMessageTransformation.transformMessage(message);
+            if (this.messageListener != null) {
+                this.messageListener.onMessage(result);
+            } else {
+                this.dispatchQueue.add(result);
+            }
+        } catch (JMSException e) {
+            this.session.onException(e);
+        } finally {
+            this.lock.unlock();
+        }
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java Fri Feb 13 11:10:13 2009
@@ -21,6 +21,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
@@ -529,6 +530,18 @@
     protected void remove(MessageProducer producer) {
         this.producers.remove(producer);
     }
+    
+    protected void onException(Exception ex) {
+        this.connection.onException(ex);
+     }
+     
+     /**
+      * @param ex
+      * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+      */
+     protected void onException(JMSException ex) {
+         this.connection.onException(ex);
+     }
 
     protected void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive)
             throws JMSException {

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=744065&r1=744064&r2=744065&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Fri Feb 13 11:10:13 2009
@@ -52,6 +52,27 @@
 	  
     }
     
+     message BlazeData {
+      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+    //| option java_type_method = "MessageType";
+     optional bool persistent = 1;
+     optional int32 priority = 2;
+     optional int32 redeliveryCounter = 3;
+     optional int32 type =4;
+     optional int64 timestamp = 5;
+     optional int64 expiration = 6;
+     optional bytes messageId = 7;
+     optional bytes correlationId = 8;
+     optional bytes fromId =9;
+     optional bytes messageType = 10;
+     optional bytes payload = 11;
+     optional DestinationData destinationData = 12;  
+     optional DestinationData replyToData = 13;  
+     optional MapData mapData = 14;
+     optional bytes payload = 15;
+      
+    }
+    
     message AckData {
      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
        //| option java_type_method = "MessageType";
@@ -234,28 +255,4 @@
       
     }
     
-    
-    
-    message BlazeData {
-      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
-    //| option java_type_method = "MessageType";
-     optional bool persistent = 1;
-     optional int32 priority = 2;
-     optional int32 redeliveryCounter = 3;
-     optional int32 type =4;
-     optional int64 timestamp = 5;
-     optional int64 expiration = 6;
-     optional bytes messageId = 7;
-     optional bytes correlationId = 8;
-     optional bytes fromId =9;
-     optional bytes messageType = 10;
-     optional bytes payload = 11;
-     optional DestinationData destinationData = 12;  
-     optional DestinationData replyToData = 13;  
-     optional MapData mapData = 14;
-     optional bytes payload = 15;
-      
-    }
-
-	
-        
\ No newline at end of file
+       
\ No newline at end of file