You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC

svn commit: r1504961 [4/11] - in /activemq/activemq-blaze/trunk: ./ src/main/java/org/apache/activeblaze/ src/main/java/org/apache/activeblaze/cluster/ src/main/java/org/apache/activeblaze/group/ src/main/java/org/apache/activeblaze/impl/destination/ s...

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Fri Jul 19 18:44:21 2013
@@ -33,10 +33,7 @@ import org.apache.activeblaze.Processor;
 import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.SubscriptionHolder;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
-import org.apache.activeblaze.impl.processor.CompressionProcessor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.FragmentationProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
@@ -44,52 +41,42 @@ import org.apache.activeblaze.util.Async
 import org.apache.activeblaze.util.LRUCache;
 import org.apache.activeblaze.util.PropertyUtil;
 import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
-import org.apache.activeblaze.wire.MemberData.MemberDataBean;
-import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.MessageBuffer;
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.MemberImpl;
+import org.apache.activeblaze.wire.Packet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * <P>
- * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
- * 
+ * <p/>
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point
+ * communication
  */
-public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel{
+public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
     private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
     private String name;
     protected Processor unicast;
     private MemberImpl local;
     private BlazeMessageListener inboxListener;
-    protected Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(
-            10000);
+    protected Map<String, SendRequest> messageRequests = new LRUCache<String, SendRequest>(10000);
     private final List<SubscriptionHolder> queueMessageListeners = new CopyOnWriteArrayList<SubscriptionHolder>();
     private Group group;
     protected Buffer inboxAddress;
     protected int inBoxPort;
     protected final Object localMutex = new Object();
-    
+
     /**
      * Constructor
-     * 
-     * @param name
      */
     protected BlazeGroupChannelImpl(String name) {
         super();
         this.name = name;
     }
-    
+
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
-    public void doInit() throws Exception{
+    public void doInit() throws Exception {
         super.doInit();
         String unicastURIStr = getConfiguration().getUnicastURI();
         unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
@@ -107,40 +94,34 @@ public class BlazeGroupChannelImpl exten
         this.local = createLocal(unicastURI);
         this.group = createGroup();
     }
-    
-    protected final Processor configureProcess(ChainedProcessor transport,String reliability) throws Exception{
+
+    protected final ChainedProcessor configureProcess(ChainedProcessor transport, String reliability) throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
-        CompressionProcessor result = new CompressionProcessor();
+        ChainedProcessor result = getReliability(reliability);
         result.setPrev(this);
         result.setExceptionListener(this);
         result.setMaxPacketSize(maxPacketSize);
-        FragmentationProcessor fp = new FragmentationProcessor();
-        fp.setMaxPacketSize(maxPacketSize);
-        result.setEnd(fp);
-        ChainedProcessor reliable = getReliability(reliability);
-        result.setEnd(reliable);
         result.setEnd(transport);
         return result;
     }
-    
-    protected ChainedProcessor getReliability(String reliability) throws Exception{
+
+    protected ChainedProcessor getReliability(String reliability) throws Exception {
         DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
         return reliable;
     }
-    
-    protected MemberImpl createLocal(URI uri) throws Exception{
+
+    protected MemberImpl createLocal(URI uri) throws Exception {
         return new MemberImpl(getId(), getName(), 0, 0, uri);
     }
-    
-    protected Group createGroup(){
+
+    protected Group createGroup() {
         return new Group(this);
     }
-    
+
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
-    public void doShutDown() throws Exception{
+    public void doShutDown() throws Exception {
         super.doShutDown();
         if (this.group != null) {
             this.group.shutDown();
@@ -149,43 +130,40 @@ public class BlazeGroupChannelImpl exten
             this.unicast.shutDown();
         }
     }
-    
+
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
-    public void doStart() throws Exception{
+    public void doStart() throws Exception {
         super.doStart();
         this.unicast.start();
         this.group.start();
     }
-    
+
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
-    public void doStop() throws Exception{
+    public void doStop() throws Exception {
         super.doStop();
         this.group.stop();
         this.unicast.stop();
     }
-    
+
     /**
      * @return the name
      */
-    public String getName(){
+    public String getName() {
         synchronized (this.localMutex) {
             return this.name;
         }
     }
-    
+
     /**
      * set the name
-     * 
-     * @param name
+     *
      * @see org.apache.activeblaze.group.BlazeGroupChannel#setName(java.lang.String)
      */
-    public void setName(String name){
+    public void setName(String name) {
         synchronized (this.localMutex) {
             this.name = name;
             if (this.local != null) {
@@ -193,131 +171,115 @@ public class BlazeGroupChannelImpl exten
             }
         }
     }
-    
+
     /**
      * @return the inboxListener
      */
-    public BlazeMessageListener getInboxListener(){
+    public BlazeMessageListener getInboxListener() {
         return this.inboxListener;
     }
-    
+
     /**
      * @param inboxListener the inboxListener to set
      */
-    public void setInboxListener(BlazeMessageListener inboxListener){
+    public void setInboxListener(BlazeMessageListener inboxListener) {
         this.inboxListener = inboxListener;
     }
-    
+
     /**
      * @return this channel's configuration
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getConfiguration()
      */
-    public BlazeGroupConfiguration getConfiguration(){
+    public BlazeGroupConfiguration getConfiguration() {
         return (BlazeGroupConfiguration) this.configuration;
     }
-    
+
     /**
      * @return the member for this channel
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getLocalMember()
      */
-    public MemberImpl getLocalMember(){
+    public MemberImpl getLocalMember() {
         synchronized (this.localMutex) {
             return this.local;
         }
     }
-    
-    protected void setLocalMember(MemberImpl local){
+
+    protected void setLocalMember(MemberImpl local) {
         synchronized (this.localMutex) {
             this.local = local;
         }
     }
-    
+
     /**
-     * @param l
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void addMemberChangedListener(MemberChangedListener l) throws Exception{
+    public void addMemberChangedListener(MemberChangedListener l) throws Exception {
         init();
         this.group.addMemberChangedListener(l);
     }
-    
+
     /**
-     * @param l
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void removeMemberChangedListener(MemberChangedListener l) throws Exception{
+    public void removeMemberChangedListener(MemberChangedListener l) throws Exception {
         init();
         this.group.removeMemberChangedListener(l);
     }
-    
+
     /**
-     * @param id
      * @return the Member
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberById(java.lang.String)
      */
-    public Member getMemberById(String id) throws Exception{
+    public Member getMemberById(String id) throws Exception {
         init();
         return this.group.getMemberById(id);
     }
-    
+
     /**
-     * @param name
      * @return the Member
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberByName(java.lang.String)
      */
-    public Member getMemberByName(String name) throws Exception{
+    public Member getMemberByName(String memberName) throws Exception {
         init();
-        return this.group.getMemberByName(name);
+        return this.group.getMemberByName(memberName);
     }
-    
+
     /**
      * Will wait for a member to advertise itself if not available
-     * 
-     * @param name
-     * @param timeout
+     *
      * @return the member or null
-     * @throws Exception
      */
-    public Member getAndWaitForMemberByName(String name,int timeout) throws Exception{
+    public Member getAndWaitForMemberByName(String memberName, int timeout) throws Exception {
         init();
-        return this.group.getAndWaitForMemberByName(name, timeout);
+        return this.group.getAndWaitForMemberByName(memberName, timeout);
     }
-    
+
     /**
      * @return the members
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMembers()
      */
-    public Set<Member> getMembers() throws Exception{
+    public Set<Member> getMembers() throws Exception {
         init();
         return this.group.getMembers();
     }
-    
+
     /**
      * Send a message to a member of the group - in a round-robin fashion
-     * 
-     * @param destination
-     * @param message
-     * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
+     *
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessage)
      */
-    public void send(String destination,BlazeMessage message) throws Exception{
-        send(new Destination(destination, false), message);
+    public void send(String destination, BlazeMessage message) throws Exception {
+        send(new Destination(new Buffer(destination), false), message);
     }
-    
+
     /**
      * Send a message to a member of the group - in a round-robin fashion
-     * 
-     * @param destination
-     * @param message
-     * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
+     *
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessage)
      */
-    public void send(Destination destination,BlazeMessage message) throws Exception{
+    public void send(Destination destination, BlazeMessage message) throws Exception {
         while (true) {
             MemberImpl member = getQueueDestination(destination.getName());
             if (member != null) {
@@ -332,90 +294,66 @@ public class BlazeGroupChannelImpl exten
             }
         }
     }
-    
+
     /**
-     * @param member
-     * @param message
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public void send(Member member,BlazeMessage message) throws Exception{
-        send((MemberImpl) member, new Buffer(member.getInBoxDestination()), message);
+    public void send(Member member, BlazeMessage message) throws Exception {
+        send((MemberImpl) member, member.getInBoxDestination(), message);
     }
-    
+
     /**
-     * @param member
-     * @param message
      * @return the response
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(Member member,BlazeMessage message) throws Exception{
+    public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception {
         return sendRequest(member, message, 0);
     }
-    
+
     /**
-     * @param member
-     * @param message
-     * @param timeout
      * @return the response
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(Member member,BlazeMessage message,int timeout) throws Exception{
-        return sendRequest((MemberImpl) member, new Buffer(member.getInBoxDestination()), message, timeout);
+    public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception {
+        return sendRequest((MemberImpl) member, member.getInBoxDestination(), message, timeout);
     }
-    
+
     /**
-     * @param destination
-     * @param message
      * @return the response
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(String destination,BlazeMessage message) throws Exception{
+    public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
         return sendRequest(new Destination(destination, false), message, 0);
     }
-    
+
     /**
-     * @param destination
-     * @param message
      * @return the response
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(Destination destination,BlazeMessage message) throws Exception{
+    public BlazeMessage sendRequest(Destination destination, BlazeMessage message) throws Exception {
         return sendRequest(destination, message, 0);
     }
-    
+
     /**
-     * @param destination
-     * @param message
-     * @param timeout
      * @return the request
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(String destination,BlazeMessage message,int timeout) throws Exception{
+    public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
         return sendRequest(new Destination(destination, false), message, timeout);
     }
-    
+
     /**
-     * @param destination
-     * @param message
-     * @param timeout
      * @return the response
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(Destination destination,BlazeMessage message,int timeout) throws Exception{
+    public BlazeMessage sendRequest(Destination destination, BlazeMessage message, int timeout) throws Exception {
         Buffer key = destination.getName();
         long deadline = 0;
         long waitTime = timeout;
@@ -426,7 +364,7 @@ public class BlazeGroupChannelImpl exten
             MemberImpl member = getQueueDestination(key);
             if (member != null) {
                 try {
-                    BlazeMessage result = sendRequest(member, key, message, (int) waitTime);
+                    BlazeMessage result = sendRequest(member, destination, message, (int) waitTime);
                     if (result != null) {
                         return result;
                     }
@@ -442,334 +380,275 @@ public class BlazeGroupChannelImpl exten
         }
         return null;
     }
-    
+
     /**
      * send Request
-     * 
-     * @param member
-     * @param destinationName
-     * @param message
-     * @param timeout
+     *
      * @return the response
-     * @throws Exception
      */
-    public BlazeMessage sendRequest(MemberImpl member,Buffer destinationName,BlazeMessage message,int timeout)
-            throws Exception{
+    public BlazeMessage sendRequest(MemberImpl member, String destinationName, BlazeMessage message, int timeout)
+            throws Exception {
+        return sendRequest(member, new Destination(destinationName, false), message, timeout);
+    }
+
+    /**
+     * send Request
+     *
+     * @return the response
+     */
+    public BlazeMessage sendRequest(MemberImpl member, Destination dest, BlazeMessage message, int timeout)
+            throws Exception {
         BlazeMessage result = null;
         if (member != null) {
-            SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
-            Destination dest = new Destination(destinationName, false);
-            Packet packet = buildPacket(dest, message);
+            SendRequest request = new SendRequest();
+            preProcessMessage(((MemberImpl) member).getAddress(), dest, message);
             synchronized (this.messageRequests) {
-                this.messageRequests.put(packet.getPacketData().getMessageId(), request);
+                this.messageRequests.put(message.getId(), request);
             }
-           
-            packet.setTo((member).getAddress());
-            this.unicast.downStream(packet);
-            PacketDataBuffer response = request.get(timeout);
-            result = buildBlazeMessage(response);
+            this.unicast.downStream(message);
+            result = (BlazeMessage) request.get(timeout);
         }
         return result;
     }
-    
+
     /**
-     * @param to
-     * @param response
-     * @param correlationId
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, java.lang.String)
      */
-    public void sendReply(Member to,BlazeMessage response,String correlationId) throws Exception{
+    public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
         Destination dest = new Destination(to.getInBoxDestination(), false);
-        Packet packet = buildPacket(dest,response,correlationId);
-        packet.setTo(((MemberImpl) to).getAddress());
-        this.unicast.downStream(packet);
+        preProcessMessage(((MemberImpl) to).getAddress(), dest, response);
+        response.setCorrelationId(correlationId);
+        this.unicast.downStream(response);
     }
-    
-    protected void send(MemberImpl member,Buffer destinationName,BlazeMessage message) throws Exception{
+
+    protected void send(MemberImpl member, String destinationName, BlazeMessage message) throws Exception {
+        send(member, new Buffer(destinationName), message);
+    }
+
+    protected void send(MemberImpl member, Buffer destinationName, BlazeMessage message) throws Exception {
         Destination dest = new Destination(destinationName, false);
-        Packet packet = buildPacket(dest, message,true);
-        packet.setTo(member.getAddress());
-        this.unicast.downStream(packet);
+        preProcessMessage(member.getAddress(), dest, message);
+        this.unicast.downStream(message);
+    }
+
+    protected void preProcessMessage(InetSocketAddress to, Destination destination, BlazeMessage message) {
+        preProcessMessage(destination, message);
+        message.setTo(to);
     }
-    
+
     /**
-     * @param destination
-     * @param l
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
-     *      org.apache.activeblaze.group.BlazeMessageListener)
+     *      BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception{
+    public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
         init();
         SubscriptionHolder key = new SubscriptionHolder(destination, false, l);
-        
         this.queueMessageListeners.add(key);
-        
         buildLocal();
     }
-    
+
     /**
-     * @param subscription
-     * @param l
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
-     *      org.apache.activeblaze.group.BlazeMessageListener)
+     *      BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception{
+    public void addBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
         init();
         SubscriptionHolder key = new SubscriptionHolder(subscription, l);
         this.queueMessageListeners.add(key);
-        
         buildLocal();
     }
-    
+
     /**
-     * @param destination
-     * @param l
-     * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String, BlazeMessageListener)
      */
-    public void removeBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception{
+    public void removeBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
         init();
         SubscriptionHolder key = new SubscriptionHolder(destination, false, l);
-        
         this.queueMessageListeners.remove(key);
-        
         buildLocal();
-        
     }
-    
+
     /**
-     * @param subscription
-     * @param l
-     * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String, BlazeMessageListener)
      */
-    public void removeBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception{
+    public void removeBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
         init();
-        
         SubscriptionHolder key = new SubscriptionHolder(subscription, l);
         this.queueMessageListeners.remove(key);
-        
         buildLocal();
-        
     }
-    
+
     /**
-     * @param destination
-     * @param l
-     * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception{
+    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
         init();
         super.addBlazeTopicMessageListener(destination, l);
         buildLocal();
     }
-    
+
     /**
-     * @param destination
-     * @return the removed <Code>BlazeTopicListener</Code>
-     * @throws Exception
-     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
+     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String, BlazeMessageListener)
      */
-    public void removeBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception{
+    public void removeBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
         init();
         super.removeBlazeTopicMessageListener(destination, l);
         buildLocal();
-        
     }
-    
+
     /**
-     * @param groupName
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addToGroup(java.lang.String)
      */
-    public void addToGroup(String groupName) throws Exception{
+    public void addToGroup(String groupName) throws Exception {
         init();
         this.local.addToGroup(groupName);
     }
-    
+
     /**
-     * @param groupName
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeFromGroup(java.lang.String)
      */
-    public void removeFromGroup(String groupName) throws Exception{
+    public void removeFromGroup(String groupName) throws Exception {
         init();
         this.local.removeFromGroup(groupName);
     }
-    
+
     /**
      * @return the groups
-     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getGroups()
      */
-    public List<String> getGroups() throws Exception{
+    public List<String> getGroups() throws Exception {
         init();
         return this.local.getGroups();
     }
-    
-    protected void processData(String id,Buffer correlationId,PacketDataBuffer data) throws Exception{
+
+    protected void processPacket(Packet packet) throws Exception {
         if (isStarted()) {
-            if (!processRequest(correlationId, data)) {
-                MessageType type = data.getMessageType();
-                if (type == MessageType.BLAZE_DATA) {
-                    doProcessBlazeData(data);
-                } else if (type == MessageType.MEMBER_DATA) {
-                    doProcessMemberData(data);
+            if (!processRequest(packet)) {
+                if (packet instanceof BlazeMessage) {
+                    doProcessBlazeMessage((BlazeMessage) packet);
+                } else if (packet instanceof Member) {
+                    doProcessMember((MemberImpl) packet);
                 }
             }
         }
     }
-    
-    protected boolean processRequest(Buffer correlationId,PacketDataBuffer value){
+
+    protected boolean processRequest(Packet packet) {
         boolean result = false;
-        if (correlationId != null) {
-            SendRequest<PacketDataBuffer> request = null;
+        String correlationId = packet.getCorrelationId();
+        if (correlationId != null && correlationId.length() > 0) {
+            SendRequest request = null;
             synchronized (this.messageRequests) {
                 request = this.messageRequests.remove(correlationId);
             }
             if (request != null) {
-                request.put(correlationId, value);
+                request.put(correlationId, packet);
                 result = true;
             }
         }
         return result;
     }
-    
-    protected void doProcessBlazeData(PacketData data) throws Exception{
-        
-        if (data.hasDestinationData()&&data.getDestinationData().getTopic()) {
-            dispatch(data);
-        } else {
-            
-            Buffer destinationName = data.getDestinationData().getName();
-            BlazeMessage message = buildBlazeMessage(data);
-            if (this.inboxListener != null && this.producerId.equals(destinationName)) {
-                this.inboxListener.onMessage(message);
+
+    protected void doProcessBlazeMessage(BlazeMessage message) throws Exception {
+        Destination destination = message.getDestination();
+        if (destination != null) {
+            if (destination.isTopic()) {
+                dispatch(message);
             } else {
-                
-                int index = 0;
-                for (SubscriptionHolder entry : this.queueMessageListeners) {
-                    if (entry.getSubscription().matches(destinationName)) {
-                        entry.getListener().onMessage(message);
-                        this.queueMessageListeners.remove(index);
-                        this.queueMessageListeners.add(entry);
+                if (this.inboxListener != null && this.producerId.equals(destination.getName())) {
+                    this.inboxListener.onMessage(message);
+                } else {
+                    int index = 0;
+                    for (SubscriptionHolder entry : this.queueMessageListeners) {
+                        if (entry.getSubscription().matches(destination.getName())) {
+                            entry.getListener().onMessage(message);
+                            this.queueMessageListeners.remove(index);
+                            this.queueMessageListeners.add(entry);
+                        }
+                        index++;
                     }
-                    index++;
                 }
-                
             }
         }
     }
-    
-    protected Group getGroup(){
+
+    protected Group getGroup() {
         return this.group;
     }
-    
-    protected BlazeMessage createMessage(String fromId){
+
+    protected BlazeMessage createMessage(String fromId) {
         Member member = this.group.getMemberById(fromId);
         BlazeMessage message = new BlazeGroupMessage(member);
         return message;
     }
-    
-    protected final void doProcessMemberData(PacketData data) throws Exception{
-        Buffer payload = data.getPayload();
-        MemberDataBuffer memberData = MemberDataBuffer.parseUnframed(payload);
-        this.group.processMember(memberData);
+
+    protected final void doProcessMember(MemberImpl member) throws Exception {
+        this.group.processMember(member);
     }
-    
+
     /**
-     * @param messageType
-     * @param message
+     *
+     * @param packet
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType,MessageBuffer message) throws Exception{
-        PacketDataBean data = getPacketData(messageType, message);
-        data.setReliable(false);
-        Packet packet = new Packet(data.freeze());
+    public void broadcastManagementMessage(Packet packet) throws Exception {
         this.broadcast.downStreamManagement(packet);
     }
-    
+
     /**
      * send a message
-     * 
-     * @param asyncRequest
-     * @param member
-     * @param messageType
-     * @param message
-     * @throws Exception
      */
-    public void sendMessage(AsyncGroupRequest asyncRequest,MemberImpl member,MessageType messageType,
-            MessageBuffer message) throws Exception{
-        SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
-        PacketDataBean data = getPacketData(messageType, message);
-        asyncRequest.add(data.getMessageId(), request);
+    public void sendGroupRequest(AsyncGroupRequest asyncRequest, MemberImpl member, Packet packet) throws Exception {
+        SendRequest request = new SendRequest();
+        asyncRequest.add(packet.getId(), request);
         synchronized (this.messageRequests) {
-            this.messageRequests.put(data.getMessageId(), request);
+            this.messageRequests.put(packet.getId(), request);
         }
-        data.setReliable(false);
-        Packet packet = new Packet(data.freeze());
+        packet.setReliable(true);
         packet.setTo(member.getAddress());
         this.unicast.downStream(packet);
     }
-    
+
     /**
      * broadcast a general message
-     * 
-     * @param messageType
-     * @param message
-     * @param correlationId
-     * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType,MessageBuffer message,String correlationId) throws Exception{
-        PacketDataBean data = getPacketData(messageType, message);
-        data.setCorrelationId(new Buffer(correlationId));
-        data.setReliable(true);
-        Packet packet = new Packet(data.freeze());
+    public void broadcastManagementMessage(Packet packet, String correlationId) throws Exception {
+        packet.setCorrelationId(correlationId);
+        packet.setReliable(true);
         this.broadcast.downStreamManagement(packet);
     }
-    
+
     /**
      * @param to
-     * @param messageType
-     * @param message
+     * @param packet
      * @throws Exception
      */
-    public void sendMessage(InetSocketAddress to,MessageType messageType,MessageBuffer message) throws Exception{
-        PacketDataBean data = getPacketData(messageType, message);
-        data.setReliable(false);
-        Packet packet = new Packet(data.freeze());
+    public void sendManagementMessage(InetSocketAddress to, Packet packet) throws Exception {
+        packet.setReliable(false);
         packet.setTo(to);
         this.unicast.downStream(packet);
     }
-    
+
     /**
      * @param to
-     * @param messageType
-     * @param message
+     * @param packet
      * @param correlationId
      * @throws Exception
      */
-    public void sendReply(MemberImpl to,MessageType messageType,MessageBuffer message,String correlationId)
-            throws Exception{
-        PacketDataBean data = getPacketData(messageType, message);
-        data.setCorrelationId(new Buffer(correlationId));
-        data.setReliable(false);
-        Packet packet = new Packet(data.freeze());
+    public void sendReply(MemberImpl to, Packet packet, String correlationId) throws Exception {
+        packet.setReliable(false);
         packet.setTo(to.getAddress());
+        packet.setCorrelationId(correlationId);
         this.unicast.downStream(packet);
     }
-    
-    protected MemberImpl getQueueDestination(Buffer destination){
+
+    protected MemberImpl getQueueDestination(Buffer destination) {
         // choose a member
         MemberImpl result = null;
         Map<Subscription, List<MemberImpl>> map = this.group.getQueueMap();
-        Subscription key = new Subscription(destination);
+        Subscription key = new Subscription(destination, false);
         List<MemberImpl> list = map.get(key);
         if (list == null) {
             // search through wildcard matches
@@ -787,37 +666,23 @@ public class BlazeGroupChannelImpl exten
         }
         return result;
     }
-    
-    protected void buildLocal(){
+
+    protected void buildLocal() {
         if (isInitialized()) {
             try {
                 synchronized (this.localMutex) {
-                    
-                    MemberDataBean bean = getLocalMember().getData().copy();
-                    bean.clearSubscriptionData();
+                    MemberImpl result = getLocalMember().clone();
+                    result.clearSubscriptions();
                     // add topic destinations
                     for (SubscriptionHolder s : this.topicMessageListeners) {
-                        bean.addSubscriptionData(s.getSubscription().getData());
+                        result.addSubscription(s.getSubscription());
                     }
                     // add Queue Destinations
-                    
                     for (SubscriptionHolder s : this.queueMessageListeners) {
-                        bean.addSubscriptionData(s.getSubscription().getData());
+                        result.addSubscription(s.getSubscription());
                     }
-                    
-                    MemberImpl result = new MemberImpl(bean.freeze());
-                    
                     this.group.processMemberUpdate(this.local, result);
-                    
-                    bean = bean.copy();
-                    bean.setSubscriptionsChanged(true);
-                    result = new MemberImpl(bean.freeze());
                     this.group.broadcastHeartBeat(result);
-                    
-                    bean = bean.copy();
-                    bean.clearSubscriptionsChanged();
-                    result = new MemberImpl(bean.freeze());
-                    
                     this.local = result;
                     this.group.updateLocal(this.local);
                 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Fri Jul 19 18:44:21 2013
@@ -20,13 +20,12 @@ import org.apache.activeblaze.BlazeConfi
 
 /**
  * Configuration for a BlazeGroupChannel
- * 
  */
 public class BlazeGroupConfiguration extends BlazeConfiguration {
     private int heartBeatInterval = 1000;
     private String unicastURI = "udp://localhost:0";
     //reliability
-    private String reliableUnicast ="swp";
+    private String reliableUnicast = "swp";
 
     /**
      * @return the heartBeatInterval
@@ -36,8 +35,7 @@ public class BlazeGroupConfiguration ext
     }
 
     /**
-     * @param heartBeatInterval
-     *            the heartBeatInterval to set
+     * @param heartBeatInterval the heartBeatInterval to set
      */
     public void setHeartBeatInterval(int heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
@@ -55,8 +53,7 @@ public class BlazeGroupConfiguration ext
     }
 
     /**
-     * @param unicastURI
-     *            the unicastURI to set
+     * @param unicastURI the unicastURI to set
      */
     public void setUnicastURI(String unicastURI) {
         this.unicastURI = unicastURI;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java Fri Jul 19 18:44:21 2013
@@ -16,27 +16,26 @@
  */
 package org.apache.activeblaze.group;
 
-import org.apache.activeblaze.BlazeException;
+import java.io.IOException;
+
 import org.apache.activeblaze.BlazeMessage;
 
 /**
  * Has information about the sender of the Message
  * This type of message is created on receiver
- *
  */
 public class BlazeGroupMessage extends BlazeMessage {
     private final Member sender;
-    
-    
+
+
     /**
      * Constructor
-     * @param sender
      */
-    public BlazeGroupMessage(Member sender){
-        this.sender=sender;
+    public BlazeGroupMessage(Member sender) {
+        this.sender = sender;
     }
-    
-    public BlazeMessage copy() throws BlazeException{
+
+    protected BlazeMessage copy() throws IOException {
         BlazeMessage copy = new BlazeGroupMessage(this.sender);
         copy(copy);
         return copy;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java Fri Jul 19 18:44:21 2013
@@ -18,18 +18,15 @@ package org.apache.activeblaze.group;
 
 /**
  * A Default listener for membership changes to a group
- * 
  */
 public class DefaultMemberChangedListener implements MemberChangedListener {
     /**
-     * @param member
      * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
      */
     public void memberStarted(Member member) {
     }
 
     /**
-     * @param member
      * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
      */
     public void memberStopped(Member member) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Fri Jul 19 18:44:21 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.activeblaze.group;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,21 +32,18 @@ import java.util.concurrent.ThreadFactor
 
 import org.apache.activeblaze.BaseService;
 import org.apache.activeblaze.Subscription;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.SubscriptionData;
-import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
+import org.apache.activeblaze.wire.MemberImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * Maintains members of a group
- * 
  */
 public class Group extends BaseService {
     static final Log LOG = LogFactory.getLog(Group.class);
     final BlazeGroupChannelImpl channel;
     private final BlazeGroupConfiguration configuration;
-    private Timer heartBeatTimer;
+    Timer heartBeatTimer;
     private Timer checkMemberShipTimer;
     protected Map<String, MemberImpl> members = new ConcurrentHashMap<String, MemberImpl>();
     private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
@@ -56,11 +54,6 @@ public class Group extends BaseService {
 
     /**
      * Constructor
-     * 
-     * @param local
-     * @param channel
-     * @param transport
-     * @param config
      */
     protected Group(BlazeGroupChannelImpl channel) {
         this.channel = channel;
@@ -130,9 +123,6 @@ public class Group extends BaseService {
 
     /**
      * Get a member by its unique id
-     * 
-     * @param id
-     * @return
      */
     Member getMemberById(String id) {
         return this.members.get(id);
@@ -140,9 +130,6 @@ public class Group extends BaseService {
 
     /**
      * Return a member of the Group with the matching name
-     * 
-     * @param name
-     * @return
      */
     Member getMemberByName(String name) {
         if (name != null) {
@@ -157,11 +144,8 @@ public class Group extends BaseService {
 
     /**
      * Will wait for a member to advertise itself if not available
-     * 
-     * @param name
-     * @param timeout
+     *
      * @return the member or null
-     * @throws InterruptedException
      */
     public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
         Member result = null;
@@ -187,8 +171,6 @@ public class Group extends BaseService {
     }
 
     /**
-     * 
-     * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
     public void doInit() throws Exception {
@@ -204,8 +186,6 @@ public class Group extends BaseService {
     }
 
     /**
-     * 
-     * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
     public void doShutDown() throws Exception {
@@ -216,8 +196,6 @@ public class Group extends BaseService {
     }
 
     /**
-     * 
-     * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
     public void doStart() throws Exception {
@@ -250,21 +228,21 @@ public class Group extends BaseService {
     }
 
     /**
-     * 
-     * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
     public void doStop() throws Exception {
         if (this.heartBeatTimer != null) {
-            // Make sure we shutdown the timer before shutting down the down stream 
+            // Make sure we shut down the timer before shutting down the
+            // downstream
             // processors to avoid the timer getting errors.
             final CountDownLatch done = new CountDownLatch(1);
-            this.heartBeatTimer.schedule(new TimerTask(){
+            this.heartBeatTimer.schedule(new TimerTask() {
                 @Override
                 public void run() {
-                    heartBeatTimer.cancel();
+                    Group.this.heartBeatTimer.cancel();
                     done.countDown();
-                }}, 0);
+                }
+            }, 0);
             done.await();
             this.heartBeatTimer = null;
         }
@@ -273,34 +251,32 @@ public class Group extends BaseService {
         }
     }
 
+    /**
+     * @return the String representation
+     * @see java.lang.Object#toString()
+     */
     public String toString() {
         return "Group " + getLocalMember().getName();
     }
 
     /**
      * Process a new member
-     * 
-     * @param data
-     * @throws Exception
+     *
      * @return Member if a new member else null
      */
-    protected final MemberImpl processMember(MemberDataBuffer data) throws Exception {
+    protected final MemberImpl processMember(MemberImpl member) throws Exception {
         MemberImpl result = null;
         MemberImpl old = null;
-        MemberImpl member = new MemberImpl(data);
         if (!member.getId().equals(getLocalMember().getId()) && isInOurGroup(member)) {
             member.setTimeStamp(System.currentTimeMillis());
             if ((old = this.members.put(member.getId(), member)) == null) {
                 processMemberStarted(member);
                 if (!member.getId().equals(this.channel.getId())) {
-                    this.channel.sendMessage(member.getAddress(), MessageType.MEMBER_DATA, this.channel
-                            .getLocalMember().getData().freeze());
+                    this.channel.sendManagementMessage(member.getAddress(), this.channel.getLocalMember());
                 }
                 result = member;
             } else {
-                if (data.getSubscriptionsChanged()) {
-                    processMemberUpdate(old, member);
-                }
+                processMemberUpdate(old, member);
             }
         }
         return result;
@@ -348,7 +324,7 @@ public class Group extends BaseService {
     }
 
     protected void processMemberStarted(MemberImpl member) throws Exception {
-        processDestinationsForStarted(member);
+        processDestinationsForStarted(member, member.getSubscriptions());
         fireMemberStarted(member);
         synchronized (this.members) {
             this.members.notifyAll();
@@ -357,58 +333,68 @@ public class Group extends BaseService {
 
     protected void processMemberStopped(MemberImpl member) throws Exception {
         fireMemberStopped(member);
-        processDestinationsForStopped(member);
+        processDestinationsForStopped(member, member.getSubscriptions());
     }
 
-    private void processDestinationsForStarted(MemberImpl member) {
-        List<SubscriptionData> subscriptionList = member.getData().getSubscriptionDataList();
-        if( subscriptionList == null ) {
+    private void processDestinationsForStarted(MemberImpl member, List<Subscription> subscriptionList) {
+        if (subscriptionList == null) {
             return;
         }
-        for (SubscriptionData subData : subscriptionList) {
+        for (Subscription sub : subscriptionList) {
             Map<Subscription, List<MemberImpl>> map = null;
-            if (subData.getDestinationData().getTopic()) {
+            if (sub.getDestination().isTopic()) {
                 map = this.topicMap;
             } else {
                 map = this.queueMap;
             }
-            Subscription key = new Subscription(subData.copy());
-            List<MemberImpl> members = map.get(key);
-            if (members == null) {
-                members = new CopyOnWriteArrayList<MemberImpl>();
-                map.put(key, members);
+            List<MemberImpl> membersList = map.get(sub);
+            if (membersList == null) {
+                membersList = new CopyOnWriteArrayList<MemberImpl>();
+                map.put(sub, membersList);
             }
-            members.add(member);
+            membersList.add(member);
         }
     }
 
-    private void processDestinationsForStopped(MemberImpl member) {
-        List<SubscriptionData> subscriptionList = member.getData().getSubscriptionDataList();
-        if( subscriptionList == null ) {
+    private void processDestinationsForStopped(MemberImpl member, List<Subscription> subscriptionList) {
+        if (subscriptionList == null) {
             return;
         }
-        for (SubscriptionData subData : subscriptionList) {
+        for (Subscription sub : subscriptionList) {
             Map<Subscription, List<MemberImpl>> map = null;
-            if (subData.getDestinationData().getTopic()) {
+            if (sub.getDestination().isTopic()) {
                 map = this.topicMap;
             } else {
                 map = this.queueMap;
             }
-            Subscription key = new Subscription(subData.copy());
-            List<MemberImpl> members = map.get(key);
-            if (members != null) {
-                members.remove(member);
-                if (members.isEmpty()) {
-                    map.remove(key);
+            List<MemberImpl> membersList = map.get(sub);
+            if (membersList != null) {
+                membersList.remove(member);
+                if (membersList.isEmpty()) {
+                    map.remove(sub);
                 }
             }
         }
     }
 
     protected void processMemberUpdate(MemberImpl oldMember, MemberImpl newMember) throws Exception {
-        //check for deltas
-        processDestinationsForStopped(oldMember);
-        processDestinationsForStarted(newMember);
+        // check for deltas
+        List<Subscription> oldList = oldMember.getSubscriptions();
+        List<Subscription> newList = newMember.getSubscriptions();
+        List<Subscription> removed = new ArrayList<Subscription>();
+        for (Subscription s : oldList) {
+            if (!newList.contains(s)) {
+                removed.add(s);
+            }
+        }
+        processDestinationsForStopped(oldMember, removed);
+        List<Subscription> added = new ArrayList<Subscription>();
+        for (Subscription s : newList) {
+            if (!oldList.contains(s)) {
+                added.add(s);
+            }
+        }
+        processDestinationsForStarted(newMember, added);
     }
 
     /**
@@ -427,7 +413,7 @@ public class Group extends BaseService {
 
     protected void broadcastHeartBeat(MemberImpl local) throws Exception {
         if (isStarted()) {
-            Group.this.channel.broadcastMessage(MessageType.MEMBER_DATA, local.getData());
+            Group.this.channel.broadcastManagementMessage(local);
         }
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java Fri Jul 19 18:44:21 2013
@@ -20,12 +20,11 @@ import java.util.List;
 
 
 /**
- *A <CODE>Member</CODE> holds information about a member of the group
- *A Member has to be added to a group to interact with it
- *
+ * A <CODE>Member</CODE> holds information about a member of the group
+ * A Member has to be added to a group to interact with it
  */
 public interface Member {
-    
+
     /**
      * @return the name
      */
@@ -35,48 +34,51 @@ public interface Member {
      * @return the id
      */
     public String getId();
-    
-        
+
+
     /**
      * @return the startTime
      */
     public long getStartTime();
-    
+
     /**
      * @return the timeStamp
      */
     long getTimeStamp();
-    
+
     /**
      * Set the timestamp
-     * @param value
      */
     void setTimeStamp(long value);
+
     /**
      * @return the inbox destination
      */
     public String getInBoxDestination();
-    
-    
+
+
     /**
      * This weight can be used to help select a master
      * in the cluster - the highest weight becomes the master
+     *
      * @return the masterWeight
      */
     public long getMasterWeight();
-    
+
     /**
-     * If there is two members have the same master weight, 
+     * If two members have the same master weight,
      * a secondary weight can be used
+     *
      * @return refined weight
      */
     public long getRefinedMasterWeight();
-    
-    
+
+
     /**
      * Get an array of groups
+     *
      * @return an array of groups
      */
     public List<String> getGroups();
-    
+
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberChangedListener.java Fri Jul 19 18:44:21 2013
@@ -20,19 +20,16 @@ package org.apache.activeblaze.group;
 
 /**
  * A listener for membership changes to a group
- *
  */
 public interface MemberChangedListener {
-    
+
     /**
      * Notification a member has started
-     * @param member
      */
     void memberStarted(Member member);
-    
+
     /**
      * Notification a member has stopped
-     * @param member
      */
     void memberStopped(Member member);
 }
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/package.html Fri Jul 19 18:44:21 2013
@@ -19,7 +19,7 @@
 </head>
 <body>
 
-Group channel for communicating true point-to-point using unicast and 
+Group channel for communicating true point-to-point using unicast and
 Group membership
 
 </body>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/DestinationMatch.java Fri Jul 19 18:44:21 2013
@@ -16,55 +16,48 @@
  */
 package org.apache.activeblaze.impl.destination;
 
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.UTF8Buffer;
+import org.apache.activeblaze.wire.Buffer;
+
 
 /**
  * Matches a Destination Subject to wildcards
- * 
  */
 public final class DestinationMatch {
     static final byte MATCH_ELEMENT = '*';
     static final byte MATCH_ALL = '>';
-    static final byte[] DELIMETERS = { '.', '/', '|' };
+    //static final byte[] DELIMETERS = { '.', '/', '|' };
+    static final byte DELIMETER = '.';
 
     /**
      * See if the destination matches with wild cards
-     * 
-     * @param destination
-     * @param match
+     *
      * @return true if its a match
      */
     public static boolean isMatch(String destination, String match) {
-        return isMatch(new UTF8Buffer(destination), new UTF8Buffer(match));
+        return isMatch(new Buffer(destination), new Buffer(match));
     }
+
     /**
      * See if the destination matches with wild cards
-     * 
-     * @param destination
-     * @param match
+     *
      * @return true if its a match
      */
     public static boolean isMatch(Buffer destination, String match) {
-        return isMatch(destination, new UTF8Buffer(match));
+        return isMatch(destination, new Buffer(match));
     }
-    
+
     /**
      * See if the destination matches with wild cards
-     * 
-     * @param destination
-     * @param match
+     *
      * @return true if its a match
      */
     public static boolean isMatch(String destination, Buffer match) {
-        return isMatch(new UTF8Buffer(destination), match);
+        return isMatch(new Buffer(destination), match);
     }
 
     /**
      * See if the destination matches with wild cards
-     * 
-     * @param destination
-     * @param match
+     *
      * @return true if its a match
      */
     public static boolean isMatch(Buffer destination, Buffer match) {
@@ -80,8 +73,8 @@ public final class DestinationMatch {
         int matchOffset = 0;
         while (destinationOffset < destination.length
                 && matchOffset < match.length) {
-            byte matchByte = match.byteAt(matchOffset);
-            byte destinationByte = destination.byteAt(destinationOffset);
+            byte matchByte = match.data[match.offset + matchOffset];
+            byte destinationByte = destination.data[+destination.offset + destinationOffset];
             if (matchByte != destinationByte || matchByte == MATCH_ALL || destinationByte == MATCH_ALL
                     || matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT) {
                 if (matchByte == MATCH_ALL || destinationByte == MATCH_ALL) {
@@ -89,7 +82,7 @@ public final class DestinationMatch {
                     break;
                 } else if ((matchByte == MATCH_ELEMENT || destinationByte == MATCH_ELEMENT)
                         && (isMatchElement(match, matchOffset, match.length) || isMatchElement(destination,
-                                destinationOffset, destination.length))) {
+                        destinationOffset, destination.length))) {
                     if (containsDelimeter(destination, destinationOffset, destination.length) == false
                             && containsDelimeter(match, matchOffset, match.length) == false) {
                         break;
@@ -120,12 +113,12 @@ public final class DestinationMatch {
         if (str != null && offset < str.length) {
             byte offByte = str.byteAt(offset);
             byte offBytePlusOne = (byte) ((offset + 1) < str.length ? str.byteAt(offset + 1) : ' ');
-            if (offset + 1 < str.length && isDelimiter(offByte)) {
+            if (offset + 1 < str.length && DELIMETER == offByte) {
                 if (offBytePlusOne == MATCH_ALL) {
                     result = true;
                 }
             } else if ((offByte == MATCH_ALL || offByte == MATCH_ELEMENT)
-                    && (isWhiteSpace(str, offset + 1, count) || isDelimiter(offBytePlusOne) || offset + 1 == str.length)) {
+                    && (isWhiteSpace(str, offset + 1, count) || DELIMETER == offBytePlusOne || offset + 1 == str.length)) {
                 result = true;
             }
         }
@@ -135,8 +128,8 @@ public final class DestinationMatch {
     static boolean isMatchElement(Buffer str, int offset, int count) {
         boolean result = false;
         if (str.byteAt(offset) == MATCH_ELEMENT) {
-            if (offset == 0 || isDelimiter(str.byteAt(offset - 1))) {
-                result = ((offset + 1) >= str.length) || isDelimiter(str.byteAt(offset + 1))
+            if (offset == 0 || str.byteAt(offset - 1) == DELIMETER) {
+                result = ((offset + 1) >= str.length) || str.byteAt(offset + 1) == DELIMETER
                         || isWhiteSpace(str, offset + 1, count);
             }
         }
@@ -156,7 +149,7 @@ public final class DestinationMatch {
 
     static int offsetToNextToken(Buffer str, int offset, int len) {
         while (offset < len) {
-            if (isDelimiter(str.byteAt(offset)))
+            if (str.byteAt(offset) == DELIMETER)
                 break;
             offset++;
         }
@@ -167,10 +160,10 @@ public final class DestinationMatch {
         int result = -1;
         int count = offset;
         while (count < len) {
-            if (isDelimiter(str.byteAt(count))) {
+            if (str.byteAt(count) == DELIMETER) {
                 result = ++count;
                 // check for double placed elements ...
-                while (count < len && isDelimiter(str.byteAt(count))) {
+                while (count < len && str.byteAt(count) == DELIMETER) {
                     result = ++count;
                 }
                 break;
@@ -183,7 +176,7 @@ public final class DestinationMatch {
     private static boolean containsDelimeter(Buffer str, int offset, int len) {
         boolean result = false;
         while (offset < len) {
-            if (isDelimiter(str.byteAt(offset++))) {
+            if (str.byteAt(offset++) == DELIMETER) {
                 result = true;
                 break;
             }
@@ -191,12 +184,4 @@ public final class DestinationMatch {
         return result;
     }
 
-    private static final boolean isDelimiter(byte b) {
-        for (int i = 0; i < DELIMETERS.length; i++) {
-            if (b == DELIMETERS[i]) {
-                return true;
-            }
-        }
-        return false;
-    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/destination/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
 !--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <html>
 <head>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java Fri Jul 19 18:44:21 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.activeblaze.impl.network;
 
+import java.net.InetSocketAddress;
+import java.net.URI;
+
 import org.apache.activeblaze.ExceptionListener;
 import org.apache.activeblaze.Processor;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -26,12 +29,8 @@ import org.apache.activeblaze.impl.trans
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.net.InetSocketAddress;
-import java.net.URI;
-
 /**
  * Base class for network broadcast protocols
- * 
  */
 public abstract class BaseNetwork extends DefaultChainedProcessor implements Network, ExceptionListener {
     protected static final Log LOG = LogFactory.getLog(BaseNetwork.class);
@@ -52,16 +51,13 @@ public abstract class BaseNetwork extend
     }
 
     /**
-     * @param name
-     *            the name to set
+     * @param name the name to set
      */
     public void setName(String name) {
         this.name = name;
     }
 
     /**
-     * @param uri
-     * @throws Exception
      * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
      */
     public void setManagementURI(URI uri) throws Exception {
@@ -69,8 +65,6 @@ public abstract class BaseNetwork extend
     }
 
     /**
-     * @param uri
-     * @throws Exception
      * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
      */
     public void setURI(URI uri) throws Exception {
@@ -86,7 +80,6 @@ public abstract class BaseNetwork extend
     }
 
     /**
-     * @param reliability
      * @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
      */
     public void setReliability(String reliability) {
@@ -95,8 +88,7 @@ public abstract class BaseNetwork extend
 
     /**
      * initialize the network
-     * 
-     * @throws Exception
+     *
      * @see org.apache.activeblaze.Service#init()
      */
     public void doInit() throws Exception {
@@ -132,8 +124,6 @@ public abstract class BaseNetwork extend
     }
 
     /**
-     * 
-     * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
     public void doShutDown() throws Exception {
@@ -147,8 +137,6 @@ public abstract class BaseNetwork extend
     }
 
     /**
-     * 
-     * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
     public void doStart() throws Exception {
@@ -162,8 +150,6 @@ public abstract class BaseNetwork extend
     }
 
     /**
-     * 
-     * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
     public void doStop() throws Exception {
@@ -176,10 +162,8 @@ public abstract class BaseNetwork extend
         }
     }
 
-   
 
     /**
-     * @param l
      * @see org.apache.activeblaze.Processor#setExceptionListener(org.apache.activeblaze.ExceptionListener)
      */
     public void setExceptionListener(ExceptionListener l) {
@@ -187,7 +171,6 @@ public abstract class BaseNetwork extend
     }
 
     /**
-     * @param ex
      * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
      */
     public void onException(Exception ex) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java Fri Jul 19 18:44:21 2013
@@ -17,13 +17,13 @@
 package org.apache.activeblaze.impl.network;
 
 import java.net.InetSocketAddress;
-import org.apache.activeblaze.impl.processor.Packet;
+
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.wire.Packet;
 
 /**
  * Uses multicast to implement a Network
- * 
  */
 public class MulticastNetwork extends BaseNetwork {
     /**
@@ -32,19 +32,17 @@ public class MulticastNetwork extends Ba
     public MulticastNetwork() {
         this.reliability = "simple";
     }
-    
+
     public void doInit() throws Exception {
         super.doInit();
-        this.broadcastAddress = new InetSocketAddress(this.broadcastURI.getHost(),this.broadcastURI.getPort());
-        if (this.managementURI!=null) {
-            this.managementAddress = new InetSocketAddress(this.managementURI.getHost(),this.managementURI.getPort());
+        this.broadcastAddress = new InetSocketAddress(this.broadcastURI.getHost(), this.broadcastURI.getPort());
+        if (this.managementURI != null) {
+            this.managementAddress = new InetSocketAddress(this.managementURI.getHost(), this.managementURI.getPort());
         }
     }
 
     /**
-     * @param packet
-     * @throws Exception
-     * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.impl.processor.Packet)
+     * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.wire.Packet)
      */
     public void downStreamManagement(Packet packet) throws Exception {
         if (this.management != null) {
@@ -56,9 +54,7 @@ public class MulticastNetwork extends Ba
     }
 
     /**
-     * @param packet
-     * @throws Exception
-     * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.impl.processor.Packet)
+     * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.wire.Packet)
      */
     public void downStream(Packet packet) throws Exception {
         packet.setTo(this.broadcastAddress);
@@ -66,8 +62,6 @@ public class MulticastNetwork extends Ba
     }
 
     /**
-     * @return
-     * @throws Exception
      * @see org.apache.activeblaze.impl.network.BaseNetwork#createManagementTransport()
      */
     @Override
@@ -77,8 +71,6 @@ public class MulticastNetwork extends Ba
     }
 
     /**
-     * @return
-     * @throws Exception
      * @see org.apache.activeblaze.impl.network.BaseNetwork#createTransport()
      */
     @Override
@@ -88,7 +80,6 @@ public class MulticastNetwork extends Ba
     }
 
     /**
-     * @return
      * @see org.apache.activeblaze.impl.network.BaseNetwork#doCreateManagement()
      */
     @Override

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java Fri Jul 19 18:44:21 2013
@@ -15,54 +15,47 @@
  * limitations under the License.
  */
 package org.apache.activeblaze.impl.network;
+
 import java.net.URI;
+
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
 
 /**
- * <P>
+ * <p/>
  * A <CODE>Network</CODE> defines operations that can be applied to remote
  * channel instances
- * 
  */
-public interface Network extends ChainedProcessor{
-    
+public interface Network extends ChainedProcessor {
+
     /**
      * @return the name
      */
-    public String getName() ;
+    public String getName();
+
     /**
      * @param name the name to set
      */
     public void setName(String name);
-    
+
     /**
      * Set the uri for the <Code>Network</Code> to use
-     * @param uri
-     * @throws Exception 
      */
     public void setURI(URI uri) throws Exception;
-    
+
     /**
      * Set the uri for the <Code>Network</Code> to use for management
-     * @param uri
-     * @throws Exception 
      */
     public void setManagementURI(URI uri) throws Exception;
-    
+
     /**
      * Set the reliable protocol to use for this network
-     * @param reliability
      */
     public void setReliability(String reliability);
-    
+
     /**
      * @return the reliability protocol used for this network
      */
     public String getReliability();
-    
-        
-   
-    
-    
+
 
 }
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java Fri Jul 19 18:44:21 2013
@@ -18,25 +18,21 @@ package org.apache.activeblaze.impl.netw
 
 import java.net.URI;
 import java.util.Map;
+
 import org.apache.activeblaze.util.ObjectFinder;
 import org.apache.activeblaze.util.PropertyUtil;
 
 /**
  * create a new Network instance
- * 
  */
 public abstract class NetworkFactory {
     private static final ObjectFinder OBJECT_FINDER = new ObjectFinder(
             "META-INF/services/org/apache/activeblaze/network/");
 
     /**
-     * @param broadcast
-     * @param management
-     * @param reliability 
      * @return the network associated with the URI
-     * @throws Exception
      */
-    public static Network get(URI broadcast, URI management,String reliability) throws Exception {
+    public static Network get(URI broadcast, URI management, String reliability) throws Exception {
         Network result = findNetwork(broadcast);
         result.setURI(broadcast);
         result.setReliability(reliability);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java Fri Jul 19 18:44:21 2013
@@ -16,20 +16,21 @@
  */
 package org.apache.activeblaze.impl.network;
 
-import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.impl.transport.BaseTransport;
-import org.apache.activeblaze.impl.transport.TransportFactory;
-import org.apache.activeblaze.util.URISupport;
-import org.apache.activeblaze.util.URISupport.CompositeData;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.URISupport;
+import org.apache.activeblaze.util.URISupport.CompositeData;
+import org.apache.activeblaze.wire.Packet;
+
 /**
  * Uses a list of URI's to create a network
- * 
  */
 public class StaticNetwork extends BaseNetwork {
     private InetSocketAddress localAddress;
@@ -48,8 +49,6 @@ public class StaticNetwork extends BaseN
     }
 
     /**
-     * @param location
-     * @throws Exception
      * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
      */
     public void setManagementURI(URI location) throws Exception {
@@ -70,8 +69,6 @@ public class StaticNetwork extends BaseN
     }
 
     /**
-     * @param location
-     * @throws Exception
      * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
      */
     public void setURI(URI location) throws Exception {
@@ -91,14 +88,12 @@ public class StaticNetwork extends BaseN
     }
 
     /**
-     * 
+     *
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
     /**
-     * @param packet
-     * @throws Exception
-     * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.impl.processor.Packet)
+     * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.wire.Packet)
      */
     public void downStreamManagement(Packet packet) throws Exception {
         if (this.management != null) {
@@ -117,9 +112,7 @@ public class StaticNetwork extends BaseN
     }
 
     /**
-     * @param packet
-     * @throws Exception
-     * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.impl.processor.Packet)
+     * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.wire.Packet)
      */
     public void downStream(Packet packet) throws Exception {
         for (InetSocketAddress address : this.broadcastAddresses) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/package.html Fri Jul 19 18:44:21 2013
@@ -1,25 +1,25 @@
 <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <html>
 <head>
 </head>
 <body>
 
-A Transport that represents all reachable nodes. 
+A Transport that represents all reachable nodes.
 A <Code>Network</Code>Can be a multicast address, defined by a list or urls
 or use a central location service to determine where to locate channels
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java Fri Jul 19 18:44:21 2013
@@ -21,7 +21,6 @@ import org.apache.activeblaze.Processor;
 
 /**
  * Chains Processors together
- * 
  */
 public interface ChainedProcessor extends Processor {
     /**
@@ -31,16 +30,11 @@ public interface ChainedProcessor extend
 
     /**
      * Set Next at the end of the chain
-     * 
-     * @param next
-     * 
      */
     public abstract void setEnd(Processor next);
 
     /**
      * Set the next
-     * 
-     * @param next
      */
     public abstract void setNext(Processor next);
 
@@ -51,14 +45,11 @@ public interface ChainedProcessor extend
 
     /**
      * Set the next chain
-     * 
-     * @param p
      */
     public abstract void setNextChain(ChainedProcessor p);
 
     /**
-     * @param prev
-     *            the prev to set
+     * @param prev the prev to set
      */
     public abstract void setPrev(Processor prev);
 
@@ -68,8 +59,7 @@ public interface ChainedProcessor extend
     public ExceptionListener getExceptionListener();
 
     /**
-     * @param maxPacketSize
-     *            the maxPacketSize to set
+     * @param maxPacketSize the maxPacketSize to set
      */
     public void setMaxPacketSize(int maxPacketSize);
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java Fri Jul 19 18:44:21 2013
@@ -21,12 +21,12 @@ import org.apache.activeblaze.BlazeConfi
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.ExceptionListener;
 import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.wire.Packet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * Default implementation of a ChainedProcessor
- * 
  */
 public class DefaultChainedProcessor extends BaseService implements ChainedProcessor {
     private static final Log LOG = LogFactory.getLog(DefaultChainedProcessor.class);
@@ -47,7 +47,6 @@ public class DefaultChainedProcessor ext
     }
 
     /**
-     * @param next
      * @see org.apache.activeblaze.impl.processor.ChainedProcessor#setEnd(org.apache.activeblaze.Processor)
      */
     public void setEnd(Processor next) {
@@ -68,7 +67,6 @@ public class DefaultChainedProcessor ext
     }
 
     /**
-     * @param next
      * @see org.apache.activeblaze.impl.processor.ChainedProcessor#setNext(org.apache.activeblaze.Processor)
      */
     public void setNext(Processor next) {
@@ -84,7 +82,6 @@ public class DefaultChainedProcessor ext
     }
 
     /**
-     * @param p
      * @see org.apache.activeblaze.impl.processor.ChainedProcessor#setNextChain(org.apache.activeblaze.impl.processor.ChainedProcessor)
      */
     public void setNextChain(ChainedProcessor p) {
@@ -105,7 +102,6 @@ public class DefaultChainedProcessor ext
     }
 
     /**
-     * @param prev
      * @see org.apache.activeblaze.impl.processor.ChainedProcessor#setPrev(org.apache.activeblaze.Processor)
      */
     public void setPrev(Processor prev) {
@@ -138,9 +134,6 @@ public class DefaultChainedProcessor ext
 
     /**
      * Send a management packet - this may be on a different address
-     * 
-     * @param packet
-     * @throws Exception
      */
     public void downStreamManagement(Packet packet) throws Exception {
         if (this.next != null) {
@@ -212,8 +205,7 @@ public class DefaultChainedProcessor ext
     }
 
     /**
-     * @param maxPacketSize
-     *            the maxPacketSize to set
+     * @param maxPacketSize the maxPacketSize to set
      */
     public void setMaxPacketSize(int maxPacketSize) {
         this.maxPacketSize = maxPacketSize;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
 <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <html>
 <head>