You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC

svn commit: r1504961 [10/11] - in /activemq/activemq-blaze/trunk: ./ src/main/java/org/apache/activeblaze/ src/main/java/org/apache/activeblaze/cluster/ src/main/java/org/apache/activeblaze/group/ src/main/java/org/apache/activeblaze/impl/destination/ ...

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java Fri Jul 19 18:44:21 2013
@@ -14,12 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.jms.message;
+package org.apache.activeblaze.wire;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
 import java.util.ArrayList;
@@ -31,7 +29,7 @@ import java.util.Map;
 /**
  * The fixed version of the UTF8 encoding function. Some older JVM's UTF8
  * encoding function breaks when handling large strings.
- * 
+ *
  * @version $Revision$
  */
 public final class MarshallingSupport {
@@ -53,13 +51,13 @@ public final class MarshallingSupport {
 
     private MarshallingSupport() {
     }
-    
-    public static void marshalPrimitiveMap(Map<String,Object> map, DataOutputStream out) throws IOException {
+
+    public static void marshalPrimitiveMap(Map<String, Object> map, DataOutput out) throws IOException {
         if (map == null) {
             out.writeInt(-1);
         } else {
             out.writeInt(map.size());
-            for (String name:map.keySet()) {
+            for (String name : map.keySet()) {
                 out.writeUTF(name);
                 Object value = map.get(name);
                 marshalPrimitive(out, value);
@@ -67,7 +65,7 @@ public final class MarshallingSupport {
         }
     }
 
-    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in) throws IOException {
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInput in) throws IOException {
         return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
     }
 
@@ -77,12 +75,12 @@ public final class MarshallingSupport {
      * @throws IOException
      * @throws IOException
      */
-    public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in, int maxPropertySize) throws IOException {
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInput in, int maxPropertySize) throws IOException {
         int size = in.readInt();
         if (size > maxPropertySize) {
             throw new IOException("Primitive map is larger than the allowed size: " + size);
         }
-        if (size < 0) {
+        if (size <= 0) {
             return null;
         } else {
             Map<String, Object> rc = new HashMap<String, Object>(size);
@@ -95,15 +93,15 @@ public final class MarshallingSupport {
 
     }
 
-    public static void marshalPrimitiveList(List list, DataOutputStream out) throws IOException {
+    public static void marshalPrimitiveList(List list, DataOutput out) throws IOException {
         out.writeInt(list.size());
-        for (Iterator iter = list.iterator(); iter.hasNext();) {
-            Object element = (Object)iter.next();
+        for (Iterator iter = list.iterator(); iter.hasNext(); ) {
+            Object element = (Object) iter.next();
             marshalPrimitive(out, element);
         }
     }
 
-    public static List<Object> unmarshalPrimitiveList(DataInputStream in) throws IOException {
+    public static List<Object> unmarshalPrimitiveList(DataInput in) throws IOException {
         int size = in.readInt();
         List<Object> answer = new ArrayList<Object>(size);
         while (size-- > 0) {
@@ -112,148 +110,148 @@ public final class MarshallingSupport {
         return answer;
     }
 
-    public static void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
+    public static void marshalPrimitive(DataOutput out, Object value) throws IOException {
         if (value == null) {
             marshalNull(out);
         } else if (value.getClass() == Boolean.class) {
-            marshalBoolean(out, ((Boolean)value).booleanValue());
+            marshalBoolean(out, ((Boolean) value).booleanValue());
         } else if (value.getClass() == Byte.class) {
-            marshalByte(out, ((Byte)value).byteValue());
+            marshalByte(out, ((Byte) value).byteValue());
         } else if (value.getClass() == Character.class) {
-            marshalChar(out, ((Character)value).charValue());
+            marshalChar(out, ((Character) value).charValue());
         } else if (value.getClass() == Short.class) {
-            marshalShort(out, ((Short)value).shortValue());
+            marshalShort(out, ((Short) value).shortValue());
         } else if (value.getClass() == Integer.class) {
-            marshalInt(out, ((Integer)value).intValue());
+            marshalInt(out, ((Integer) value).intValue());
         } else if (value.getClass() == Long.class) {
-            marshalLong(out, ((Long)value).longValue());
+            marshalLong(out, ((Long) value).longValue());
         } else if (value.getClass() == Float.class) {
-            marshalFloat(out, ((Float)value).floatValue());
+            marshalFloat(out, ((Float) value).floatValue());
         } else if (value.getClass() == Double.class) {
-            marshalDouble(out, ((Double)value).doubleValue());
+            marshalDouble(out, ((Double) value).doubleValue());
         } else if (value.getClass() == byte[].class) {
-            marshalByteArray(out, (byte[])value);
+            marshalByteArray(out, (byte[]) value);
         } else if (value.getClass() == String.class) {
-            marshalString(out, (String)value);
+            marshalString(out, (String) value);
         } else if (value instanceof Map) {
             out.writeByte(MAP_TYPE);
-            marshalPrimitiveMap((Map)value, out);
+            marshalPrimitiveMap((Map) value, out);
         } else if (value instanceof List) {
             out.writeByte(LIST_TYPE);
-            marshalPrimitiveList((List)value, out);
+            marshalPrimitiveList((List) value, out);
         } else {
             throw new IOException("Object is not a primitive: " + value);
         }
     }
 
-    public static Object unmarshalPrimitive(DataInputStream in) throws IOException {
+    public static Object unmarshalPrimitive(DataInput in) throws IOException {
         Object value = null;
         byte type = in.readByte();
         switch (type) {
-        case BYTE_TYPE:
-            value = Byte.valueOf(in.readByte());
-            break;
-        case BOOLEAN_TYPE:
-            value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
-            break;
-        case CHAR_TYPE:
-            value = Character.valueOf(in.readChar());
-            break;
-        case SHORT_TYPE:
-            value = Short.valueOf(in.readShort());
-            break;
-        case INTEGER_TYPE:
-            value = Integer.valueOf(in.readInt());
-            break;
-        case LONG_TYPE:
-            value = Long.valueOf(in.readLong());
-            break;
-        case FLOAT_TYPE:
-            value = new Float(in.readFloat());
-            break;
-        case DOUBLE_TYPE:
-            value = new Double(in.readDouble());
-            break;
-        case BYTE_ARRAY_TYPE:
-            value = new byte[in.readInt()];
-            in.readFully((byte[])value);
-            break;
-        case STRING_TYPE:
-            value = in.readUTF();
-            break;
-        case BIG_STRING_TYPE:
-            value = readUTF8(in);
-            break;
-        case MAP_TYPE:
-            value = unmarshalPrimitiveMap(in);
-            break;
-        case LIST_TYPE:
-            value = unmarshalPrimitiveList(in);
-            break;
-        case NULL:
-            value = null;
-            break;
-        default:
-            throw new IOException("Unknown primitive type: " + type);
+            case BYTE_TYPE:
+                value = Byte.valueOf(in.readByte());
+                break;
+            case BOOLEAN_TYPE:
+                value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+                break;
+            case CHAR_TYPE:
+                value = Character.valueOf(in.readChar());
+                break;
+            case SHORT_TYPE:
+                value = Short.valueOf(in.readShort());
+                break;
+            case INTEGER_TYPE:
+                value = Integer.valueOf(in.readInt());
+                break;
+            case LONG_TYPE:
+                value = Long.valueOf(in.readLong());
+                break;
+            case FLOAT_TYPE:
+                value = new Float(in.readFloat());
+                break;
+            case DOUBLE_TYPE:
+                value = new Double(in.readDouble());
+                break;
+            case BYTE_ARRAY_TYPE:
+                value = new byte[in.readInt()];
+                in.readFully((byte[]) value);
+                break;
+            case STRING_TYPE:
+                value = in.readUTF();
+                break;
+            case BIG_STRING_TYPE:
+                value = readUTF8(in);
+                break;
+            case MAP_TYPE:
+                value = unmarshalPrimitiveMap(in);
+                break;
+            case LIST_TYPE:
+                value = unmarshalPrimitiveList(in);
+                break;
+            case NULL:
+                value = null;
+                break;
+            default:
+                throw new IOException("Unknown primitive type: " + type);
         }
         return value;
     }
 
-    public static void marshalNull(DataOutputStream out) throws IOException {
+    public static void marshalNull(DataOutput out) throws IOException {
         out.writeByte(NULL);
     }
 
-    public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
+    public static void marshalBoolean(DataOutput out, boolean value) throws IOException {
         out.writeByte(BOOLEAN_TYPE);
         out.writeBoolean(value);
     }
 
-    public static void marshalByte(DataOutputStream out, byte value) throws IOException {
+    public static void marshalByte(DataOutput out, byte value) throws IOException {
         out.writeByte(BYTE_TYPE);
         out.writeByte(value);
     }
 
-    public static void marshalChar(DataOutputStream out, char value) throws IOException {
+    public static void marshalChar(DataOutput out, char value) throws IOException {
         out.writeByte(CHAR_TYPE);
         out.writeChar(value);
     }
 
-    public static void marshalShort(DataOutputStream out, short value) throws IOException {
+    public static void marshalShort(DataOutput out, short value) throws IOException {
         out.writeByte(SHORT_TYPE);
         out.writeShort(value);
     }
 
-    public static void marshalInt(DataOutputStream out, int value) throws IOException {
+    public static void marshalInt(DataOutput out, int value) throws IOException {
         out.writeByte(INTEGER_TYPE);
         out.writeInt(value);
     }
 
-    public static void marshalLong(DataOutputStream out, long value) throws IOException {
+    public static void marshalLong(DataOutput out, long value) throws IOException {
         out.writeByte(LONG_TYPE);
         out.writeLong(value);
     }
 
-    public static void marshalFloat(DataOutputStream out, float value) throws IOException {
+    public static void marshalFloat(DataOutput out, float value) throws IOException {
         out.writeByte(FLOAT_TYPE);
         out.writeFloat(value);
     }
 
-    public static void marshalDouble(DataOutputStream out, double value) throws IOException {
+    public static void marshalDouble(DataOutput out, double value) throws IOException {
         out.writeByte(DOUBLE_TYPE);
         out.writeDouble(value);
     }
 
-    public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
+    public static void marshalByteArray(DataOutput out, byte[] value) throws IOException {
         marshalByteArray(out, value, 0, value.length);
     }
 
-    public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
+    public static void marshalByteArray(DataOutput out, byte[] value, int offset, int length) throws IOException {
         out.writeByte(BYTE_ARRAY_TYPE);
         out.writeInt(length);
         out.write(value, offset, length);
     }
 
-    public static void marshalString(DataOutputStream out, String s) throws IOException {
+    public static void marshalString(DataOutput out, String s) throws IOException {
         // If it's too big, out.writeUTF may not able able to write it out.
         if (s.length() < Short.MAX_VALUE / 4) {
             out.writeByte(STRING_TYPE);
@@ -286,23 +284,23 @@ public final class MarshallingSupport {
             }
             // TODO diff: Sun code - removed
             byte[] bytearr = new byte[utflen + 4]; // TODO diff: Sun code
-            bytearr[count++] = (byte)((utflen >>> 24) & 0xFF); // TODO diff:
+            bytearr[count++] = (byte) ((utflen >>> 24) & 0xFF); // TODO diff:
             // Sun code
-            bytearr[count++] = (byte)((utflen >>> 16) & 0xFF); // TODO diff:
+            bytearr[count++] = (byte) ((utflen >>> 16) & 0xFF); // TODO diff:
             // Sun code
-            bytearr[count++] = (byte)((utflen >>> 8) & 0xFF);
-            bytearr[count++] = (byte)((utflen >>> 0) & 0xFF);
+            bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+            bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
             for (int i = 0; i < strlen; i++) {
                 c = charr[i];
                 if ((c >= 0x0001) && (c <= 0x007F)) {
-                    bytearr[count++] = (byte)c;
+                    bytearr[count++] = (byte) c;
                 } else if (c > 0x07FF) {
-                    bytearr[count++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
-                    bytearr[count++] = (byte)(0x80 | ((c >> 6) & 0x3F));
-                    bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+                    bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                    bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+                    bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
                 } else {
-                    bytearr[count++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
-                    bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+                    bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+                    bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
                 }
             }
             dataOut.write(bytearr);
@@ -327,47 +325,47 @@ public final class MarshallingSupport {
             while (count < utflen) {
                 c = bytearr[count] & 0xff;
                 switch (c >> 4) {
-                case 0:
-                case 1:
-                case 2:
-                case 3:
-                case 4:
-                case 5:
-                case 6:
-                case 7:
+                    case 0:
+                    case 1:
+                    case 2:
+                    case 3:
+                    case 4:
+                    case 5:
+                    case 6:
+                    case 7:
                     /* 0xxxxxxx */
-                    count++;
-                    str.append((char)c);
-                    break;
-                case 12:
-                case 13:
+                        count++;
+                        str.append((char) c);
+                        break;
+                    case 12:
+                    case 13:
                     /* 110x xxxx 10xx xxxx */
-                    count += 2;
-                    if (count > utflen) {
-                        throw new UTFDataFormatException();
-                    }
-                    char2 = bytearr[count - 1];
-                    if ((char2 & 0xC0) != 0x80) {
-                        throw new UTFDataFormatException();
-                    }
-                    str.append((char)(((c & 0x1F) << 6) | (char2 & 0x3F)));
-                    break;
-                case 14:
+                        count += 2;
+                        if (count > utflen) {
+                            throw new UTFDataFormatException();
+                        }
+                        char2 = bytearr[count - 1];
+                        if ((char2 & 0xC0) != 0x80) {
+                            throw new UTFDataFormatException();
+                        }
+                        str.append((char) (((c & 0x1F) << 6) | (char2 & 0x3F)));
+                        break;
+                    case 14:
                     /* 1110 xxxx 10xx xxxx 10xx xxxx */
-                    count += 3;
-                    if (count > utflen) {
-                        throw new UTFDataFormatException();
-                    }
-                    char2 = bytearr[count - 2]; // TODO diff: Sun code
-                    char3 = bytearr[count - 1]; // TODO diff: Sun code
-                    if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-                        throw new UTFDataFormatException();
-                    }
-                    str.append((char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
-                    break;
-                default:
+                        count += 3;
+                        if (count > utflen) {
+                            throw new UTFDataFormatException();
+                        }
+                        char2 = bytearr[count - 2]; // TODO diff: Sun code
+                        char3 = bytearr[count - 1]; // TODO diff: Sun code
+                        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+                            throw new UTFDataFormatException();
+                        }
+                        str.append((char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
+                        break;
+                    default:
                     /* 10xx xxxx, 1111 xxxx */
-                    throw new UTFDataFormatException();
+                        throw new UTFDataFormatException();
                 }
             }
             // The number of chars produced may be less than utflen
@@ -377,6 +375,5 @@ public final class MarshallingSupport {
         }
     }
 
-   
 
 }

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java Fri Jul 19 18:44:21 2013
@@ -14,109 +14,109 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.group;
+package org.apache.activeblaze.wire;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.impl.destination.DestinationMatch;
-import org.apache.activeblaze.wire.MemberData;
-import org.apache.activeblaze.wire.MemberData.MemberDataBean;
-import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
 
 /**
  * Implementation of a Member
- * 
  */
-public class MemberImpl implements Member, Comparable<MemberImpl> {
-    private MemberDataBuffer data;
-    private final InetSocketAddress socketAddress;
-    private final Buffer socketAddressAsBuffer;
+public class MemberImpl extends Packet implements Member, Comparable<MemberImpl> {
+    private boolean observer;
+    private boolean lockedMaster;
+    private String name;
+    private long startTime;
+    private long timeStamp;
+    private long masterWeight;
+    private long refinedWeight;
+    private List<String> groups = new CopyOnWriteArrayList<String>();
+    private List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();
+    private InetSocketAddress socketAddress;
+
+    /**
+     * Constructor
+     */
+    MemberImpl() {
+    }
 
     /**
      * Default constructor
-     * 
-     * @param id
-     * @param name
-     * @param masterWeight
-     * @param refinedWeight
-     * @param localURI
-     * @throws Exception
      */
     public MemberImpl(String id, String name, long masterWeight, long refinedWeight, URI localURI) throws Exception {
+        setId(id);
         InetAddress addr = InetAddress.getByName(localURI.getHost());
         this.socketAddress = new InetSocketAddress(addr, localURI.getPort());
-        this.socketAddressAsBuffer = new Buffer(this.socketAddress.toString());
-        MemberDataBean bean = new MemberDataBean();
-        bean.setId(id);
-        bean.setName(name);
-        bean.setMasterWeight(masterWeight);
-        bean.setRefinedWeight(refinedWeight);
-        bean.setStartTime(System.currentTimeMillis());
-        bean.setInetAddress(new Buffer(addr.getHostAddress()));
-        bean.setPort(localURI.getPort());
-        this.data=bean.freeze();
+        this.name = name;
+        this.masterWeight = masterWeight;
+        this.refinedWeight = refinedWeight;
+        this.startTime = System.currentTimeMillis();
+        this.timeStamp = this.startTime;
     }
 
-    /**
-     * Constructor
-     * 
-     * @param data
-     * @throws Exception
-     */
-    public MemberImpl(MemberDataBuffer data) throws Exception {
-        this.data = data;
-        InetAddress addr = InetAddress.getByName(data.getInetAddress().toStringUtf8());
-        this.socketAddress = new InetSocketAddress(addr, data.getPort());
-        this.socketAddressAsBuffer = new Buffer(this.socketAddress.toString());
+    public MemberImpl clone() {
+        MemberImpl result = new MemberImpl();
+        copy(result);
+        return result;
+    }
+
+    protected void copy(MemberImpl copy) {
+        super.copy(copy);
+        copy.observer = this.observer;
+        copy.lockedMaster = this.lockedMaster;
+        copy.name = this.name;
+        copy.startTime = this.startTime;
+        copy.timeStamp = this.timeStamp;
+        copy.masterWeight = this.masterWeight;
+        copy.refinedWeight = this.refinedWeight;
+        copy.groups.addAll(this.groups);
+        copy.subscriptions.addAll(this.subscriptions);
+        copy.socketAddress = this.socketAddress;
+    }
+
+    public int getPacketType() {
+        return PacketType.MEMBER.getNumber();
     }
 
     /**
      * @return the name
      */
     public String getName() {
-       return this.data.getName();
+        return this.name;
     }
-    
+
     /**
      * Set the name
-     * @param name
      */
     public void setName(String name) {
-        this.data = this.data.copy().setName(name).freeze();
+        this.name = name;
     }
 
     public void setMasterWeight(long masterWeight) {
-        this.data = this.data.copy().setMasterWeight(masterWeight).freeze();
-    }
-
-    /**
-     * @return the id
-     */
-    public String getId() {
-        return this.data.getId();
-    }
-
-    void setId(String id) {
-        this.data = this.data.copy().setId(id).freeze();
+        this.masterWeight = masterWeight;
     }
 
     /**
      * @return the startTime
      */
     public long getStartTime() {
-        return this.data.getStartTime();
+        return this.startTime;
     }
 
     /**
      * @return the inbox destination
      */
     public String getInBoxDestination() {
-        return this.data.getId();
+        return getId();
     }
 
     /**
@@ -127,33 +127,24 @@ public class MemberImpl implements Membe
     }
 
     /**
-     * @return address as a Buffer
-     */
-    public Buffer getAddressAsBuffer() {
-        return this.socketAddressAsBuffer;
-    }
-
-    /**
      * @return the timeStamp
      */
     public long getTimeStamp() {
-        return this.data.getTimeStamp();
+        return this.timeStamp;
     }
 
     /**
      * Set the timestamp
-     * 
-     * @param value
      */
     public void setTimeStamp(long value) {
-        this.data = this.data.copy().setTimeStamp(value).freeze();
+        this.timeStamp = value;
     }
 
     /**
      * @return the masterWeight
      */
     public long getMasterWeight() {
-        return this.data.getMasterWeight();
+        return this.masterWeight;
     }
 
     /**
@@ -161,7 +152,7 @@ public class MemberImpl implements Membe
      * @see org.apache.activeblaze.group.Member#getRefinedMasterWeight()
      */
     public long getRefinedMasterWeight() {
-        return this.data.getRefinedWeight();
+        return this.refinedWeight;
     }
 
     public String toString() {
@@ -169,33 +160,26 @@ public class MemberImpl implements Membe
     }
 
     public int hashCode() {
-        return this.data.getId().hashCode();
+        return this.getId().hashCode();
     }
 
     public boolean equals(Object obj) {
         boolean result = false;
         if (obj instanceof MemberImpl) {
             MemberImpl other = (MemberImpl) obj;
-            result = this.data.getId().equals(other.data.getId());
+            result = this.getId().equals(other.getId());
         }
         return result;
     }
 
     /**
-     * @return the data
-     */
-    public MemberDataBuffer getData() {
-        return this.data;
-    }
-
-    /**
-     * * Compares this member with the specified member for order. Returns a negative integer, zero, or a positive
-     * integer as this object is less than, equal to, or greater than the specified member.
-     * <p>
-     * 
-     * @param member
-     * @return a negative integer, zero, or a positive integer as this member is less than, equal to, or greater than
-     *         the specified member.
+     * * Compares this member with the specified member for order. Returns a
+     * negative integer, zero, or a positive integer as this object is less
+     * than, equal to, or greater than the specified member.
+     * <p/>
+     *
+     * @return a negative integer, zero, or a positive integer as this member is
+     *         less than, equal to, or greater than the specified member.
      * @see java.lang.Comparable#compareTo(java.lang.Object)
      */
     public int compareTo(MemberImpl member) {
@@ -217,9 +201,7 @@ public class MemberImpl implements Membe
      * @param groupName
      */
     public void addToGroup(String groupName) {
-        {//synchronized (this.data) {
-            this.data = this.data.copy().addGroups(new Buffer(groupName)).freeze();
-        }
+        this.groups.add(groupName);
     }
 
     /**
@@ -227,35 +209,37 @@ public class MemberImpl implements Membe
      * @see org.apache.activeblaze.group.Member#getGroups()
      */
     public List<String> getGroups() {
-        List<Buffer> list = null;
-        synchronized (this.data) {
-            list = new ArrayList<Buffer>(this.data.getGroupsList());
-        }
-        List<String> result = new ArrayList<String>(list.size());
-        for (Buffer b : list) {
-            result.add(b.toStringUtf8());
-        }
-        return result;
+        return this.groups;
     }
 
     /**
      * @param groupName
      */
     public void removeFromGroup(String groupName) {
-        {//synchronized (this.data) {
-            this.data.getGroupsList().remove(new Buffer(groupName));
-        }
+        this.groups.remove(groupName);
     }
 
-    protected boolean isInSameGroup(MemberImpl other) {
-        { //synchronized (other.data) {
-            {  // synchronized (this.data) {
-                List<Buffer> list = this.data.getGroupsList();
-                if( list == null ) {
-                    return false;
-                }
-                for (Buffer b : list) {
-                    for (Buffer o : other.data.getGroupsList()) {
+    public void addSubscription(Subscription s) {
+        this.subscriptions.add(s);
+    }
+
+    public void removeSubscription(Subscription s) {
+        this.subscriptions.remove(s);
+    }
+
+    public void clearSubscriptions() {
+        this.subscriptions.clear();
+    }
+
+    public List<Subscription> getSubscriptions() {
+        return this.subscriptions;
+    }
+
+    public boolean isInSameGroup(MemberImpl other) {
+        { // synchronized (other.data) {
+            { // synchronized (this.data) {
+                for (String b : this.groups) {
+                    for (String o : other.groups) {
                         if (DestinationMatch.isMatch(b, o)) {
                             return true;
                         }
@@ -266,5 +250,58 @@ public class MemberImpl implements Membe
         return false;
     }
 
-
+    protected void preWrite() {
+        super.preWrite();
+        this.byteBool.writeBoolean(this.observer);
+        this.byteBool.writeBoolean(this.lockedMaster);
+    }
+
+    public void write(BufferOutputStream out) throws IOException {
+        super.write(out);
+        out.writeString(getName());
+        out.writeLong(this.startTime);
+        out.writeLong(this.timeStamp);
+        out.writeLong(this.masterWeight);
+        out.writeLong(this.refinedWeight);
+        List<String> list = new ArrayList<String>(this.groups);
+        out.writeShort(list.size());
+        for (String s : list) {
+            out.writeString(s);
+        }
+        List<Subscription> list2 = new ArrayList<Subscription>(this.subscriptions);
+        out.writeShort(list2.size());
+        for (Subscription s : list2) {
+            s.write(out);
+        }
+        InetAddress addr = this.socketAddress.getAddress();
+        out.write(new Buffer(addr.getAddress()));
+        out.writeInt(this.socketAddress.getPort());
+    }
+
+    public void read(BufferInputStream in) throws IOException {
+        super.read(in);
+        this.observer = this.byteBool.readBoolean();
+        this.lockedMaster = this.byteBool.readBoolean();
+        this.name = in.readString();
+        this.startTime = in.readLong();
+        this.timeStamp = in.readLong();
+        this.masterWeight = in.readLong();
+        this.refinedWeight = in.readLong();
+        this.groups.clear();
+        int size = in.readShort();
+        for (int i = 0; i < size; i++) {
+            this.groups.add(in.readString());
+        }
+        this.subscriptions.clear();
+        size = in.readShort();
+        for (int i = 0; i < size; i++) {
+            Subscription s = new Subscription();
+            s.read(in);
+            this.subscriptions.add(s);
+        }
+        Buffer buffer = in.readBuffer();
+        InetAddress addr = InetAddress.getByAddress(buffer.toByteArray());
+        int port = in.readInt();
+        this.socketAddress = new InetSocketAddress(addr, port);
+    }
 }

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java (from r754975, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java&r1=754975&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java Fri Jul 19 18:44:21 2013
@@ -14,188 +14,280 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.impl.processor;
+package org.apache.activeblaze.wire;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
+import java.io.IOException;
 import java.net.SocketAddress;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 
 /**
- * Wrapper for PacketData
- * 
+ * Base class for
  */
-public final class Packet {
-    private SocketAddress from;
-    private SocketAddress to;
+public class Packet implements Cloneable {
+    private transient SocketAddress from;
+    private transient SocketAddress to;
     private String id;
-    private PacketDataBuffer packetData;
+    private String correlationId;
+    private boolean responseRequired;
+    private boolean response;
+    private boolean replayed;
+    private String producerId;
+    private long messageSequence;
+    private transient boolean reliable;
+    private transient boolean noLocal;
+    protected ByteBool byteBool = new ByteBool();
+
+    public Packet clone() {
+        Packet result = new Packet();
+        copy(result);
+        return result;
+    }
+
+    protected void copy(Packet copy) {
+        copy.from = this.from;
+        copy.to = this.to;
+        copy.id = this.id;
+        copy.correlationId = this.correlationId;
+        copy.responseRequired = this.responseRequired;
+        copy.response = this.response;
+        copy.replayed = this.replayed;
+        copy.producerId = this.producerId;
+        copy.messageSequence = this.messageSequence;
+        copy.reliable = this.reliable;
+    }
 
     /**
-     * Internal Constructor
-     * 
-     * @param id
-     * @param data
+     * Get the packetType
+     *
+     * @return the packetType
      */
-    private Packet(String id, PacketDataBuffer data) {
-        this.id = id;
-        this.packetData = data;
-        this.from = null;
-        this.to = null;
+    public int getPacketType() {
+        return PacketType.PACKET.getNumber();
     }
 
     /**
-     * Construct a Packet from PacketData
-     * 
-     * @param data
+     * Get the from
+     *
+     * @return the from
      */
-    public Packet(PacketDataBuffer data) {
-        this.packetData = data;
-        this.from = null;
-        this.to = null;
+    public SocketAddress getFrom() {
+        return this.from;
     }
 
     /**
-     * Construct a Packet received
-     * 
-     * @param from
-     * @param data
-     * @throws Exception
+     * Set the from
+     *
+     * @param from the from to set
      */
-    public Packet(SocketAddress from, PacketDataBuffer data) throws Exception {
+    public void setFrom(SocketAddress from) {
         this.from = from;
-        this.packetData = data;
-        this.to = null;
     }
 
     /**
-     * Construct a Packet to send
-     * 
-     * @param toAddress
-     * @param toPort
-     * @param data
+     * Get the to
+     *
+     * @return the to
      */
-    public Packet(InetAddress toAddress, int toPort, PacketDataBuffer data) {
-        this.to = new InetSocketAddress(toAddress, toPort);
-        this.packetData = data;
-        this.from = null;
+    public SocketAddress getTo() {
+        return this.to;
     }
 
     /**
-     * Construct a Packet to send
-     * 
-     * @param toAddress
-     * @param toPort
-     * @param data
+     * Set the to
+     *
+     * @param to the to to set
      */
-    public Packet(String toAddress, int toPort, PacketDataBuffer data) {
-        this.to = new InetSocketAddress(toAddress, toPort);
-        this.packetData = data;
-        this.from = null;
-    }
-
-    public String toString() {
-        StringBuilder builder = new StringBuilder("Packet:");
-        builder.append(getId());
-        builder.append("[");
-        builder.append(getPacketData().toString());
-        builder.append("]");
-        return builder.toString();
+    public void setTo(SocketAddress to) {
+        this.to = to;
     }
 
     /**
+     * Get the id
+     *
      * @return the id
      */
     public String getId() {
-        if (this.id == null && this.packetData != null) {
-            if (this.packetData.hasMessageId()) {
-                this.id = this.packetData.getMessageId().toStringUtf8();
-            }
-        }
         return this.id;
     }
 
     /**
-     * @return the message sequence
+     * Set the id
+     *
+     * @param id the id to set
      */
-    public long getMessageSequence() {
-        return this.packetData.getMessageSequence();
+    public void setId(String id) {
+        this.id = id;
     }
 
     /**
-     * @return the packetData
+     * @return the correlationId
      */
-    public PacketDataBuffer getPacketData() {
-        return this.packetData;
+    public String getCorrelationId() {
+        return this.correlationId;
     }
 
-    
     /**
-     * 
-     * @return a deep copy of <Code>this</Code>
-     * @see java.lang.Object#clone()
+     * @param correlationId the correlationId to set
      */
-    public Packet clone() {
-        Packet result = new Packet(this.id, packetData);
-        result.to = this.to;
-        result.from = this.from;
-        return result;
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
     }
 
     /**
-     * @return the from
+     * Get the reponseRequired
+     *
+     * @return the reponseRequired
      */
-    public SocketAddress getFrom() {
-        return this.from;
+    public boolean isResponseRequired() {
+        return this.responseRequired;
     }
 
     /**
-     * @param address
+     * Set the reponseRequired
+     *
+     * @param responseRequired the reponseRequired to set
      */
-    public void setFrom(SocketAddress address) {
-        this.from = address;
+    public void setResponseRequired(boolean responseRequired) {
+        this.responseRequired = responseRequired;
     }
 
     /**
-     * @return the to
+     * Get the response
+     *
+     * @return the response
      */
-    public SocketAddress getTo() {
-        return this.to;
+    public boolean isResponse() {
+        return this.response;
     }
 
     /**
-     * @param to
-     *            the to to set
+     * Set the response
+     *
+     * @param response the response to set
      */
-    public void setTo(SocketAddress to) {
-        this.to = to;
+    public void setResponse(boolean response) {
+        this.response = response;
     }
 
     /**
-     * Is this Packet replayed
-     * 
-     * @return true if replayed
+     * Get the replayed
+     *
+     * @return the replayed
      */
     public boolean isReplayed() {
-        return this.packetData.getReplayed();
+        return this.replayed;
     }
 
     /**
-     * @return true if a response
+     * Set the replayed
+     *
+     * @param replayed the replayed to set
      */
-    public boolean isResponse() {
-        return this.packetData.getResponse();
+    public void setReplayed(boolean replayed) {
+        this.replayed = replayed;
     }
 
     /**
-     * @return true if response required
+     * Get the producerId
+     *
+     * @return the producerId
      */
-    public boolean isResponseRequired() {
-        return this.packetData.getResponseRequired();
+    public String getProducerId() {
+        return this.producerId;
+    }
+
+    /**
+     * Set the producerId
+     *
+     * @param producerId the producerId to set
+     */
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * Get the messageSequence
+     *
+     * @return the messageSequence
+     */
+    public long getMessageSequence() {
+        return this.messageSequence;
+    }
+
+    /**
+     * Set the messageSequence
+     *
+     * @param messageSequence the messageSequence to set
+     */
+    public void setMessageSequence(long messageSequence) {
+        this.messageSequence = messageSequence;
     }
 
-    public void setPacketData(PacketDataBuffer packetData) {
-        this.packetData = packetData;
+    /**
+     * Get the reliable
+     *
+     * @return the reliable
+     */
+    public boolean isReliable() {
+        return this.reliable;
+    }
+
+    /**
+     * Set the reliable
+     *
+     * @param reliable the reliable to set
+     */
+    public void setReliable(boolean reliable) {
+        this.reliable = reliable;
+    }
+
+    /**
+     * Get the noLocal
+     *
+     * @return the noLocal
+     */
+    public boolean isNoLocal() {
+        return this.noLocal;
+    }
+
+    /**
+     * Set the noLocal
+     *
+     * @param noLocal the noLocal to set
+     */
+    public void setNoLocal(boolean noLocal) {
+        this.noLocal = noLocal;
+    }
+
+    /**
+     * Initialize from a Buffer
+     */
+    public void read(BufferInputStream in) throws IOException {
+        this.id = in.readString();
+        this.correlationId = in.readString();
+        this.producerId = in.readString();
+        this.messageSequence = in.readLong();
+        this.byteBool.setData(in.readByte());
+        this.responseRequired = this.byteBool.readBoolean();
+        this.response = this.byteBool.readBoolean();
+        this.replayed = this.byteBool.readBoolean();
+    }
+
+    protected void preWrite() {
+        this.byteBool.clear();
+        this.byteBool.writeBoolean(this.responseRequired);
+        this.byteBool.writeBoolean(this.response);
+        this.byteBool.writeBoolean(this.replayed);
+    }
+
+    /**
+     * Write state to a Buffer
+     */
+    public void write(BufferOutputStream out) throws IOException {
+        preWrite();
+        out.writeString(this.id);
+        out.writeString(this.correlationId);
+        out.writeString(this.producerId);
+        out.writeLong(this.messageSequence);
+        out.writeByte(this.byteBool.getData());
     }
-}
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java Fri Jul 19 18:44:21 2013
@@ -14,28 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.impl.processor;
+package org.apache.activeblaze.wire;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
+
 import org.apache.activeblaze.BaseService;
 import org.apache.activeblaze.util.BitArrayBin;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activemq.protobuf.Buffer;
 
 /**
  * Checks for duplicates
- * 
  */
 public class PacketAudit extends BaseService {
+    public static final int DEFAULT_MAX_AUDIT_DEPTH = 2048;
+    public static final int DEFAULT_MAX_CHANNELS = 256;
     private LinkedHashMap<Buffer, BitArrayBin> cache;
-    private int maxChannels = 256;
-    private int maxAuditDepth = 1024;
+    private int maxChannels = DEFAULT_MAX_CHANNELS;
+    private int maxAuditDepth = DEFAULT_MAX_AUDIT_DEPTH;
 
+    /**
+     * @see org.apache.activeblaze.BaseService#doShutDown()
+     */
     public void doShutDown() throws Exception {
         this.cache = null;
     }
 
+    /**
+     * @see org.apache.activeblaze.BaseService#doStart()
+     */
+    @SuppressWarnings("serial")
     public void doStart() throws Exception {
         if (this.cache == null) {
             this.cache = new LinkedHashMap<Buffer, BitArrayBin>() {
@@ -54,8 +61,7 @@ public class PacketAudit extends BaseSer
     }
 
     /**
-     * @param maxChannels
-     *            the maxChannels to set
+     * @param maxChannels the maxChannels to set
      */
     public void setMaxChannels(int maxChannels) {
         this.maxChannels = maxChannels;
@@ -63,32 +69,27 @@ public class PacketAudit extends BaseSer
 
     /**
      * tests for a duplicate message
-     * 
-     * @param packet
-     * @return
+     *
+     * @return true if a duplicate message
      */
     public boolean isDuplicate(Packet packet) {
-        PacketData data = packet.getPacketData();
-        return isDuplicate(data.getProducerId(), data.getMessageSequence());
+        return isDuplicate(packet.getProducerId(), packet.getMessageSequence());
     }
 
     /**
      * tests for a duplicate message
-     * 
-     * @param producerId
-     * @param id
-     * @return
+     *
+     * @return true if a duplicate message
      */
     public boolean isDuplicate(String producerId, long id) {
-        return isDuplicate(new Buffer(producerId), id);
+        if (producerId != null) {
+            return isDuplicate(new Buffer(producerId), id);
+        }
+        return false;
     }
 
     /**
      * tests for a duplicate message
-     * 
-     * @param key
-     * @param id
-     * @return
      */
     public boolean isDuplicate(Buffer key, long id) {
         boolean result = false;
@@ -114,15 +115,13 @@ public class PacketAudit extends BaseSer
     }
 
     /**
-     * @param maxAuditDepth
-     *            the maxAuditDepth to set
+     * @param maxAuditDepth the maxAuditDepth to set
      */
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
     }
 
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.BaseService#doInit()
      */
     @Override
@@ -131,7 +130,6 @@ public class PacketAudit extends BaseSer
     }
 
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.BaseService#doStop()
      */
     @Override

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java Fri Jul 19 18:44:21 2013
@@ -14,79 +14,324 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.cluster;
+package org.apache.activeblaze.wire;
 
-import org.apache.activeblaze.util.IOUtils;
-import org.apache.activeblaze.wire.StateData.StateDataBean;
+import java.io.IOException;
 
 /**
  * Holds information about the Value in the Map
- * 
  */
-class StateValue {
-    private final StateKey key;
-    private final StateDataBean data;
+public class StateValue extends Packet {
+    public enum StateType {
+        NOT_SET, INSERT, DELETE, SYNC
+    }
+
+    ;
+    private StateKey key;
     private Object value;
+    private StateType type = StateType.NOT_SET;
+    private Object oldValue;
+    private boolean mapUpdate;
+    private boolean mapWrite;
+    private boolean expired;
+    private boolean lockExpired;
+    private boolean lockUpdate;
+    private boolean lockWrite;
+    private boolean error;
+
+    /**
+     * Constructor
+     */
+    public StateValue() {
+    }
 
     /**
      * Constructor
-     * 
-     * @param key
-     * @param value
      */
-    StateValue(StateKey key, Object value, StateDataBean data) {
+    public StateValue(StateKey key, Object value) {
         this.key = key;
         this.value = value;
-        this.data = data;
     }
 
-    StateValue(StateDataBean data) throws Exception {
-        this.key = new StateKey(data.getKeyData().copy());
-        this.data = data;
+    public int getPacketType() {
+        return PacketType.STATE.getNumber();
+    }
+
+    public StateValue clone() {
+        StateValue result = new StateValue();
+        copy(result);
+        return result;
+    }
+
+    protected void copy(StateValue copy) {
+        super.copy(copy);
+        copy.key = this.key;
+        copy.value = this.value;
+        copy.oldValue = this.oldValue;
+        copy.mapUpdate = this.mapUpdate;
+        copy.mapWrite = this.mapWrite;
+        copy.expired = this.expired;
+        copy.lockExpired = this.lockExpired;
+        copy.lockUpdate = this.lockUpdate;
+        copy.lockWrite = this.lockWrite;
+        copy.error = this.error;
+        copy.type = this.type;
+    }
+
+    public int hashCode() {
+        return this.value != null ? this.value.hashCode() : super.hashCode();
     }
 
-    StateValue copy() throws Exception {
-        return new StateValue(this.data.copy());
+    public boolean equals(Object obj) {
+        boolean result = false;
+        if (obj instanceof StateValue) {
+            StateValue other = (StateValue) obj;
+            result = (this.value == null && other.value == null)
+                    || (this.value != null && other.value != null && this.value.equals(other.value));
+        }
+        return result;
     }
 
     /**
      * @return the owner
      */
-    StateKey getKey() {
+    public StateKey getKey() {
         return this.key;
     }
 
     /**
+     * Set the key
+     *
+     * @param key the key to set
+     */
+    public void setKey(StateKey key) {
+        this.key = key;
+    }
+
+    /**
      * @return the key
-     * @throws Exception
      */
-    Object getValue() throws Exception {
-        if (this.value == null && this.data != null) {
-            if (this.data.getValue() != null) {
-                this.value = IOUtils.getObject(this.data.getValue());
-            }
-        }
+    public Object getValue() throws Exception {
         return this.value;
     }
 
     /**
-     * @return the data
+     * Set the value
+     *
+     * @param value the value to set
      */
-    StateDataBean getData() {
-        return this.data;
+    public void setValue(Object value) {
+        this.value = value;
     }
 
-    public int hashCode() {
-        return this.value != null ? this.value.hashCode() : super.hashCode();
+    /**
+     * Get the oldValue
+     *
+     * @return the oldValue
+     */
+    public Object getOldValue() {
+        return this.oldValue;
     }
 
-    public boolean equals(Object obj) {
-        boolean result = false;
-        if (obj instanceof StateValue) {
-            StateValue other = (StateValue) obj;
-            result = (this.value == null && other.value == null)
-                    || (this.value != null && other.value != null && this.value.equals(other.value));
-        }
+    /**
+     * Set the oldValue
+     *
+     * @param oldValue the oldValue to set
+     */
+    public void setOldValue(Object oldValue) {
+        this.oldValue = oldValue;
+    }
+
+    /**
+     * Get the mapUpdate
+     *
+     * @return the mapUpdate
+     */
+    public boolean isMapUpdate() {
+        return this.mapUpdate;
+    }
+
+    /**
+     * Set the mapUpdate
+     *
+     * @param mapUpdate the mapUpdate to set
+     */
+    public void setMapUpdate(boolean mapUpdate) {
+        this.mapUpdate = mapUpdate;
+    }
+
+    /**
+     * Get the mapWrite
+     *
+     * @return the mapWrite
+     */
+    public boolean isMapWrite() {
+        return this.mapWrite;
+    }
+
+    /**
+     * Set the mapWrite
+     *
+     * @param mapWrite the mapWrite to set
+     */
+    public void setMapWrite(boolean mapWrite) {
+        this.mapWrite = mapWrite;
+    }
+
+    /**
+     * Get the expired
+     *
+     * @return the expired
+     */
+    public boolean isExpired() {
+        return this.expired;
+    }
+
+    /**
+     * Set the expired
+     *
+     * @param expired the expired to set
+     */
+    public void setExpired(boolean expired) {
+        this.expired = expired;
+    }
+
+    /**
+     * Get the lockExpired
+     *
+     * @return the lockExpired
+     */
+    public boolean isLockExpired() {
+        return this.lockExpired;
+    }
+
+    /**
+     * Set the lockExpired
+     *
+     * @param lockExpired the lockExpired to set
+     */
+    public void setLockExpired(boolean lockExpired) {
+        this.lockExpired = lockExpired;
+    }
+
+    /**
+     * Get the lockUpdate
+     *
+     * @return the lockUpdate
+     */
+    public boolean isLockUpdate() {
+        return this.lockUpdate;
+    }
+
+    /**
+     * Set the lockUpdate
+     *
+     * @param lockUpdate the lockUpdate to set
+     */
+    public void setLockUpdate(boolean lockUpdate) {
+        this.lockUpdate = lockUpdate;
+    }
+
+    /**
+     * Get the lockWrite
+     *
+     * @return the lockWrite
+     */
+    public boolean isLockWrite() {
+        return this.lockWrite;
+    }
+
+    /**
+     * Set the lockWrite
+     *
+     * @param lockWrite the lockWrite to set
+     */
+    public void setLockWrite(boolean lockWrite) {
+        this.lockWrite = lockWrite;
+    }
+
+    /**
+     * Get the error
+     *
+     * @return the error
+     */
+    public boolean isError() {
+        return this.error;
+    }
+
+    /**
+     * Set the error
+     *
+     * @param error the error to set
+     */
+    public void setError(boolean error) {
+        this.error = error;
+    }
+
+    public void read(BufferInputStream in) throws IOException {
+        super.read(in);
+        String typeStr = in.readUTF();
+        this.type = StateType.valueOf(typeStr);
+        ByteBool bb = new ByteBool();
+        bb.setData(in.readByte());
+        this.mapUpdate = bb.readBoolean();
+        this.mapWrite = bb.readBoolean();
+        this.expired = bb.readBoolean();
+        this.lockExpired = bb.readBoolean();
+        this.lockUpdate = bb.readBoolean();
+        this.lockWrite = bb.readBoolean();
+        this.error = bb.readBoolean();
+        this.value = IOUtils.readObject(in);
+        this.oldValue = IOUtils.readObject(in);
+        this.key = new StateKey();
+        this.key.read(in);
+    }
+
+    /**
+     * Write state to a Buffer
+     */
+    public void write(BufferOutputStream out) throws IOException {
+        super.write(out);
+        out.writeUTF(this.type.toString());
+        ByteBool bb = new ByteBool();
+        bb.writeBoolean(this.mapUpdate);
+        bb.writeBoolean(this.mapWrite);
+        bb.writeBoolean(this.expired);
+        bb.writeBoolean(this.lockExpired);
+        bb.writeBoolean(this.lockUpdate);
+        bb.writeBoolean(this.lockWrite);
+        bb.writeBoolean(this.error);
+        out.writeByte(bb.getData());
+        IOUtils.writeObject(out, this.value);
+        IOUtils.writeObject(out, this.oldValue);
+        this.key.write(out);
+    }
+
+    /**
+     * Get the type
+     *
+     * @return the type
+     */
+    public StateType getType() {
+        return this.type;
+    }
+
+    /**
+     * Set the type
+     *
+     * @param type the type to set
+     */
+    public void setType(StateType type) {
+        this.type = type;
+    }
+
+    /**
+     * @return a String
+     * @see java.lang.Object#toString()
+     */
+    public String toString() {
+        String result = "StateValue(" + this.type + ")key=" + this.key + ",value=" + this.value + ",oldValue="
+                + this.oldValue;
         return result;
     }
 }
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Fri Jul 19 18:44:21 2013
@@ -48,8 +48,7 @@ message PacketData {   
   optional bytes payload= 11;
   optional bytes messageId =12;
   optional bytes correlationId = 13;
-  optional DestinationData destinationData = 14;  
-  optional int32 payloadType = 15;
+  optional int32 payloadType = 14;
 }
     
 message BlazeData {
@@ -59,9 +58,11 @@ message BlazeData {
   optional int64 timestamp = 4;
   optional int64 expiration = 5;
   optional bytes messageType = 6;
-  optional DestinationData replyToData = 7;  
-  optional MapData mapData = 8;
-  optional bytes payload = 9; 
+  optional DestinationData destinationData = 7;  
+  optional DestinationData replyToData = 8;  
+  optional MapData mapData = 9;
+  optional bytes payload = 10; 
+  optional bytes correlationId = 11;
 }
     
 message AckData {

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Fri Jul 19 18:44:21 2013
@@ -21,79 +21,95 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import junit.framework.TestCase;
 
 /**
  * Basic test for BlazeChannel
- * 
  */
 public class BlazeChannelTest extends TestCase {
     public void testChannel() throws Exception {
-        int count = 1000;
-        final AtomicInteger received = new AtomicInteger();
-        String destination = "test.foo";
-        BlazeChannelFactory factory = new BlazeChannelFactory();
-        configureChannelFactory(factory, 2);
-        BlazeChannel sender = factory.createChannel();
-        BlazeChannel receiver = factory.createChannel();
-        sender.start();
-        receiver.start();
-        final CountDownLatch latch = new CountDownLatch(count);
-        receiver.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
-            public void onMessage(BlazeMessage message) {
-                message.size();
-                received.incrementAndGet();
-                latch.countDown();
+        try {
+            setGroupSize(2);
+            final int COUNT = 10000;
+            final AtomicInteger received = new AtomicInteger();
+            String destination = "test.foo";
+
+            BlazeChannel sender = getChannelFactory().createChannel();
+            BlazeChannel receiver = getChannelFactory().createChannel();
+            sender.start();
+            receiver.start();
+            final CountDownLatch latch = new CountDownLatch(COUNT);
+            receiver.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
+                public void onMessage(BlazeMessage message) {
+                    message.size();
+                    received.incrementAndGet();
+                    latch.countDown();
+                }
+            });
+            BlazeMessage msg = new BlazeMessage();
+            msg.setText("value");
+            for (int i = 0; i < COUNT; i++) {
+                sender.broadcast(destination, msg);
+                //Thread.sleep(100);
             }
-        });
-        BlazeMessage msg = new BlazeMessage();
-        msg.setText("value");
-        for (int i = 0; i < count; i++) {
-            sender.broadcast(destination, msg);
-            //Thread.sleep(100);
+            latch.await(10, TimeUnit.SECONDS);
+            receiver.stop();
+            sender.stop();
+            assertEquals("Too many messages not sent ", 0, latch.getCount());
+        } finally {
+            resetGroupSize();
         }
-        latch.await(10, TimeUnit.SECONDS);
-        receiver.stop();
-        sender.stop();
-        assertEquals("Too many messages not sent ", 0, latch.getCount());
     }
 
     public void testGroupBroadcast() throws Exception {
-        final int GROUP_SIZE = 10;
-        String destination = "test.foo";
-        final AtomicInteger count = new AtomicInteger();
-        List<BlazeChannel> channels = new ArrayList<BlazeChannel>();
-        BlazeChannelFactory factory = new BlazeChannelFactory();
-        configureChannelFactory(factory, GROUP_SIZE);
-        for (int i = 0; i < GROUP_SIZE; i++) {
-            BlazeChannel channel = factory.createChannel();
-            channel.start();
-            channels.add(channel);
-            channel.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
-                public void onMessage(BlazeMessage message) {
-                    synchronized (count) {
-                        if (count.incrementAndGet() == GROUP_SIZE) {
-                            count.notifyAll();
+        try {
+            final int GROUP_SIZE = 10;
+            setGroupSize(GROUP_SIZE);
+
+            String destination = "test.foo";
+            final AtomicInteger count = new AtomicInteger();
+            List<BlazeChannel> channels = new ArrayList<BlazeChannel>();
+
+            for (int i = 0; i < GROUP_SIZE; i++) {
+                BlazeChannel channel = getChannelFactory().createChannel();
+                channel.start();
+                channels.add(channel);
+                channel.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
+                    public void onMessage(BlazeMessage message) {
+                        synchronized (count) {
+                            if (count.incrementAndGet() == GROUP_SIZE) {
+                                count.notifyAll();
+                            }
                         }
                     }
+                });
+            }
+            BlazeMessage msg = new BlazeMessage();
+            msg.setText("hello");
+            channels.get(0).broadcast(destination, msg);
+            synchronized (count) {
+                if (count.get() < GROUP_SIZE) {
+                    count.wait(5000);
                 }
-            });
-        }
-        BlazeMessage msg = new BlazeMessage();
-        msg.setText("hello");
-        channels.get(0).broadcast(destination, msg);
-        synchronized (count) {
-            if (count.get() < GROUP_SIZE) {
-                count.wait(5000);
             }
-        }
-       
-        assertEquals(GROUP_SIZE, count.get());
-        for (BlazeChannel channel : channels) {
-            channel.shutDown();
+
+            assertEquals(GROUP_SIZE, count.get());
+            for (BlazeChannel channel : channels) {
+                channel.shutDown();
+            }
+        } finally {
+            resetGroupSize();
         }
     }
-    
-    protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+
+    protected void setGroupSize(int groupSize) {
+    }
+
+    protected void resetGroupSize() {
+    }
+
+    protected BlazeChannelFactory getChannelFactory() {
+        return new BlazeChannelFactory();
     }
 }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java Fri Jul 19 18:44:21 2013
@@ -20,12 +20,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
+
 import junit.framework.TestCase;
-import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.IOUtils;
 
 /**
  * Test BlazeMessage
- * 
  */
 public class BlazeMessageTest extends TestCase {
     private final String NAME = "testName";
@@ -132,6 +133,13 @@ public class BlazeMessageTest extends Te
         assertEquals(msg.getStringValue(this.NAME), str);
     }
 
+    public void testGetText() throws Exception {
+        String str = "test";
+        BlazeMessage msg = new BlazeMessage(str);
+        msg = msg.clone();
+        assertEquals(msg.getText(), str);
+    }
+
     public void testGetBytes() throws Exception {
         BlazeMessage msg = new BlazeMessage();
         byte[] bytes1 = new byte[3];
@@ -239,4 +247,12 @@ public class BlazeMessageTest extends Te
         assertTrue(msg.containsKey("exists"));
         assertFalse(msg.containsKey("doesntExist"));
     }
+
+    public void testMarshall() throws Exception {
+        String str = "test";
+        BlazeMessage msg = new BlazeMessage(str);
+        Buffer buffer = IOUtils.writePacket(msg);
+        msg = (BlazeMessage) IOUtils.readPacket(buffer);
+        assertEquals(msg.getText(), str);
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java Fri Jul 19 18:44:21 2013
@@ -16,21 +16,39 @@
  */
 package org.apache.activeblaze;
 
+import java.util.LinkedList;
+
 /**
- * 
- * 
+ *
+ *
  */
 public class BlazePointcastChannelTest extends BlazeChannelTest {
-    protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
-        int portStart = 61616;
+    private static int PORTSTART = 61616;
+    private LinkedList<Integer> portList = new LinkedList<Integer>();
+
+    protected void setGroupSize(int groupSize) {
+        for (int i = 0; i < groupSize; i++) {
+            portList.add((PORTSTART + i));
+        }
+    }
+
+    protected void resetGroupSize() {
+        portList.clear();
+    }
+
+
+    protected BlazeChannelFactory getChannelFactory() {
+        BlazeChannelFactory result = new BlazeChannelFactory();
         String uri = "static://(";
-        for (int i = 0; i < groupNumber; i++) {
-            uri += "udp://localhost:" + (portStart + i);
+        for (int i = 0; i < portList.size(); i++) {
+            uri += "udp://localhost:" + (portList.get(i));
             uri += ",";
         }
         uri += ")";
-        fac.getConfiguration().setManagementURI("");
-        fac.getConfiguration().setBroadcastURI(uri);
-        fac.getConfiguration().setReliableBroadcast("swp");
+        result.getConfiguration().setManagementURI("");
+        result.getConfiguration().setBroadcastURI(uri);
+        result.getConfiguration().setReliableBroadcast("swp");
+        portList.addLast(portList.removeFirst());
+        return result;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java Fri Jul 19 18:44:21 2013
@@ -19,185 +19,223 @@ package org.apache.activeblaze.cluster;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import junit.framework.TestCase;
-import org.apache.activeblaze.BlazeChannelFactory;
 import org.apache.activeblaze.group.Member;
 
 /**
  * Test for clustered channel
- * 
  */
 public class BlazeClusterGroupChannelTest extends TestCase {
-    public void XtestOneChannel() throws Exception {
-        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
-        BlazeClusterGroupChannel channel = factory.createChannel("test");
-        assertEquals(1, channel.getMembers().size());
-        channel.start();
-        boolean electionFinished = channel
-                .waitForElection((int) (channel.getConfiguration().getAwaitGroupTimeout() + 500));
-        assertTrue(electionFinished);
-        assertEquals(1, channel.getMembers().size());
-        channel.shutDown();
-    }
-
-    public void XtestGroup() throws Exception {
-        final int number = 3;
-        List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
-        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
-        configureChannelFactory(factory, number);
-        for (int i = 0; i < number; i++) {
-            BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
-            channel.getConfiguration().setMinimumGroupSize(number);
-            channel.addToGroup("test");
+
+    public void testOneChannel() throws Exception {
+        try {
+            setGroupSize(1);
+
+            BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test");
+            assertEquals(1, channel.getMembers().size());
             channel.start();
-            channels.add(channel);
+            boolean electionFinished = channel.waitForElection(0);
+            assertTrue(electionFinished);
+            assertEquals(1, channel.getMembers().size());
+            channel.shutDown();
+        } finally {
+            resetGroupSize();
         }
-        channels.get(number - 1).waitForElection(0);
-        int masterNumber = 0;
-        BlazeClusterGroupChannel master = null;
-        for (BlazeClusterGroupChannel channel : channels) {
-            if (channel.isMaster()) {
-                masterNumber++;
-                master = channel;
+    }
+
+    public void testChangedWeightedGroup() throws Exception {
+
+        try {
+            final int NUMBER = 4;
+            setGroupSize(NUMBER);
+            List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+
+            BlazeClusterGroupChannel weightedMaster = null;
+            for (int i = 0; i < NUMBER; i++) {
+                BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test" + i);
+                channel.getConfiguration().setMinimumGroupSize(NUMBER);
+                channel.addToGroup("changedWeightedTest");
+                if (i == NUMBER / 2) {
+                    channel.getConfiguration().setMasterWeight(10);
+                    weightedMaster = channel;
+                } else {
+                    channel.getConfiguration().setMasterWeight(0);
+                }
+                channel.start();
+                channels.add(channel);
             }
-        }
-        assertNotNull(master);
-        assertEquals(1, masterNumber);
-        // kill the master
-        master.shutDown();
-        channels.remove(master);
-        Thread.sleep(1000);
-        channels.get(0).waitForElection(0);
-        masterNumber = 0;
-        master = null;
-        for (BlazeClusterGroupChannel channel : channels) {
-            if (channel.isMaster()) {
-                masterNumber++;
-                master = channel;
+            channels.get(NUMBER - 1).waitForElection(0);
+            int masterNumber = 0;
+            BlazeClusterGroupChannel master = null;
+            for (BlazeClusterGroupChannel channel : channels) {
+                if (channel.isMaster()) {
+                    masterNumber++;
+                    master = channel;
+                }
             }
-        }
-        assertNotNull(master);
-        assertEquals(1, masterNumber);
-        for (BlazeClusterGroupChannel channel : channels) {
-            channel.shutDown();
+            assertNotNull(master);
+            assertEquals(1, masterNumber);
+            assertTrue(master == weightedMaster);
+            BlazeClusterGroupChannel newMaster = channels.get(0);
+            assertTrue(newMaster != weightedMaster);
+            newMaster.getConfiguration().setMasterWeight(2000);
+            // wait for a new heartbeat to be generated
+            Thread.sleep(newMaster.getConfiguration().getHeartBeatInterval() + 500);
+            newMaster.waitForElection(0);
+            masterNumber = 0;
+            master = null;
+            for (BlazeClusterGroupChannel channel : channels) {
+                if (channel.isMaster()) {
+                    masterNumber++;
+                    master = channel;
+                }
+            }
+            assertEquals(1, masterNumber);
+            assertNotNull(master);
+            assertTrue(master == newMaster);
+            for (BlazeClusterGroupChannel channel : channels) {
+                channel.shutDown();
+            }
+        } finally {
+            resetGroupSize();
         }
     }
 
-    public void testWeightedGroup() throws Exception {
-        final int number = 4;
-        List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
-        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
-        configureChannelFactory(factory, number);
-        configureChannelFactory(factory, number);
-        BlazeClusterGroupChannel weightedMaster = null;
-        for (int i = 0; i < number; i++) {
-            BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
-            channel.addToGroup("test");
-            channel.getConfiguration().setMinimumGroupSize(number);
-            if (i == number / 2) {
-                channel.getConfiguration().setMasterWeight(10);
-                weightedMaster = channel;
-            } else {
-                channel.getConfiguration().setMasterWeight(0);
+
+    public void testGroup() throws Exception {
+        try {
+            final int NUMBER = 4;
+            setGroupSize(NUMBER);
+            List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+
+            for (int i = 0; i < NUMBER; i++) {
+                BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test" + i);
+                channel.getConfiguration().setMinimumGroupSize(NUMBER - 2);
+                channel.addToGroup("test");
+                channel.start();
+                channels.add(channel);
+            }
+            channels.get(NUMBER - 1).waitForElection(0);
+            int masterNumber = 0;
+            BlazeClusterGroupChannel master = null;
+            for (BlazeClusterGroupChannel channel : channels) {
+                if (channel.isMaster()) {
+                    masterNumber++;
+                    master = channel;
+                }
             }
-            channel.start();
-            channels.add(channel);
-        }
-        channels.get(number - 1).waitForElection(5000);
-        int masterNumber = 0;
-        BlazeClusterGroupChannel master = null;
-        for (BlazeClusterGroupChannel channel : channels) {
-            if (channel.isMaster()) {
-                masterNumber++;
-                master = channel;
+            assertNotNull(master);
+            assertEquals(1, masterNumber);
+            // kill the master
+            master.shutDown();
+            channels.remove(master);
+            Thread.sleep(5000);
+            channels.get(channels.size() - 1).waitForElection(0);
+            masterNumber = 0;
+            master = null;
+            for (BlazeClusterGroupChannel channel : channels) {
+                if (channel.isMaster()) {
+                    masterNumber++;
+                    master = channel;
+                }
             }
-        }
-        assertNotNull(master);
-        assertTrue(master == weightedMaster);
-        assertEquals(1, masterNumber);
-        for (BlazeClusterGroupChannel channel : channels) {
-            channel.shutDown();
+            assertNotNull(master);
+            assertEquals(1, masterNumber);
+            for (BlazeClusterGroupChannel channel : channels) {
+                channel.shutDown();
+            }
+        } finally {
+            resetGroupSize();
         }
     }
 
-    public void testChangedWeightedGroup() throws Exception {
-        final int number = 4;
-        List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
-        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
-        configureChannelFactory(factory, number);
-        BlazeClusterGroupChannel weightedMaster = null;
-        for (int i = 0; i < number; i++) {
-            BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
-            
-            channel.getConfiguration().setMinimumGroupSize(number);
-            channel.addToGroup("changedWeightedTest");
-            if (i == number / 2) {
-                channel.getConfiguration().setMasterWeight(10);
-                weightedMaster = channel;
-            } else {
-                channel.getConfiguration().setMasterWeight(0);
+    public void testWeightedGroup() throws Exception {
+        try {
+            final int NUMBER = 4;
+            setGroupSize(4);
+            List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+
+
+            BlazeClusterGroupChannel weightedMaster = null;
+            for (int i = 0; i < NUMBER; i++) {
+                BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test" + i);
+                channel.addToGroup("test");
+                channel.getConfiguration().setMinimumGroupSize(NUMBER);
+                if (i == NUMBER / 2) {
+                    channel.getConfiguration().setMasterWeight(10);
+                    weightedMaster = channel;
+                } else {
+                    channel.getConfiguration().setMasterWeight(0);
+                }
+                channel.start();
+                channels.add(channel);
             }
-            channel.start();
-            channels.add(channel);
-        }
-        channels.get(number - 1).waitForElection(10000);
-        int masterNumber = 0;
-        BlazeClusterGroupChannel master = null;
-        for (BlazeClusterGroupChannel channel : channels) {
-            if (channel.isMaster()) {
-                masterNumber++;
-                master = channel;
+            channels.get(NUMBER - 1).waitForElection(0);
+            int masterNumber = 0;
+            BlazeClusterGroupChannel master = null;
+            for (BlazeClusterGroupChannel channel : channels) {
+                if (channel.isMaster()) {
+                    masterNumber++;
+                    master = channel;
+                }
             }
-        }
-        assertNotNull(master);
-        assertEquals(1, masterNumber);
-        assertTrue(master == weightedMaster);
-        channels.get(0).getConfiguration().setMasterWeight(2000);
-        Thread.sleep(2000);
-        masterNumber = 0;
-        master = null;
-        for (BlazeClusterGroupChannel channel : channels) {
-            if (channel.isMaster()) {
-                masterNumber++;
-                master = channel;
+            assertNotNull(master);
+            assertTrue(master == weightedMaster);
+            assertEquals(1, masterNumber);
+            for (BlazeClusterGroupChannel channel : channels) {
+                channel.shutDown();
             }
-        }
-        assertNotNull(master);
-        assertTrue(master != weightedMaster);
-        assertEquals(1, masterNumber);
-        for (BlazeClusterGroupChannel channel : channels) {
-            channel.shutDown();
+        } finally {
+            resetGroupSize();
         }
     }
 
-    public void XtestClusterChangedListener() throws Exception {
-        final AtomicBoolean result = new AtomicBoolean();
-        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
-        BlazeClusterGroupChannel master = factory.createChannel("master");
-        master.addToGroup("test");
-        master.getConfiguration().setMasterWeight(10);
-        master.start();
-        BlazeClusterGroupChannel channel = factory.createChannel("test1");
-        channel.addToGroup("test");
-        channel.addMasterChangedListener(new MasterChangedListener() {
-            public void masterChanged(Member master) {
-                synchronized (result) {
-                    result.set(true);
-                    result.notifyAll();
-                }
-            }
-        });
-        channel.start();
-        synchronized (result) {
-            if (!result.get()) {
-                result.wait(3000);
+
+    public void testClusterChangedListener() throws Exception {
+        try {
+            setGroupSize(2);
+            final AtomicBoolean result = new AtomicBoolean();
+
+            BlazeClusterGroupChannel master = getChannelFactory().createChannel("master");
+            master.addToGroup("test");
+            master.getConfiguration().setMasterWeight(10);
+
+            master.start();
+            BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test1");
+            channel.addToGroup("test");
+            master.addMasterChangedListener(new MasterChangedListener() {
+                public void masterChanged(Member master) {
+                    synchronized (result) {
+                        result.set(true);
+                        result.notifyAll();
+                    }
+                }
+            });
+            channel.start();
+            channel.waitForElection(0);
+            channel.getConfiguration().setMasterWeight(1000);
+            synchronized (result) {
+                if (!result.get()) {
+                    result.wait(10000);
+                }
             }
+            assertTrue(channel.isMaster());
+            assertTrue(result.get());
+            channel.shutDown();
+            master.shutDown();
+        } finally {
+            resetGroupSize();
         }
-        assertTrue(result.get());
-        channel.shutDown();
-        master.shutDown();
     }
 
-    protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+    protected void setGroupSize(int groupSize) {
+    }
+
+    protected void resetGroupSize() {
+    }
+
+    protected BlazeClusterGroupChannelFactory getChannelFactory() {
+        return new BlazeClusterGroupChannelFactory();
     }
 }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java Fri Jul 19 18:44:21 2013
@@ -16,20 +16,36 @@
  */
 package org.apache.activeblaze.cluster;
 
-import org.apache.activeblaze.BlazeChannelFactory;
+import java.util.LinkedList;
 
 public class BlazePointcastClusterGroupChannelTest extends BlazeClusterGroupChannelTest {
-    int portNum = 61616;
+    private static int PORTSTART = 61616;
+    private LinkedList<Integer> portList = new LinkedList<Integer>();
 
-    protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+    protected void setGroupSize(int groupSize) {
+        for (int i = 0; i < groupSize; i++) {
+            portList.add((PORTSTART + i));
+        }
+    }
+
+    protected void resetGroupSize() {
+        portList.clear();
+    }
+
+
+    protected BlazeClusterGroupChannelFactory getChannelFactory() {
+        BlazeClusterGroupChannelFactory result = new BlazeClusterGroupChannelFactory();
         String uri = "static://(";
-        for (int i = 0; i < groupNumber; i++) {
-            uri += "udp://localhost:" + (this.portNum++);
+        for (int i = 0; i < portList.size(); i++) {
+            uri += "udp://localhost:" + (portList.get(i));
             uri += ",";
         }
         uri += ")";
-        fac.getConfiguration().setManagementURI("");
-        fac.getConfiguration().setBroadcastURI(uri);
-        fac.getConfiguration().setReliableBroadcast("swp");
+        result.getConfiguration().setManagementURI("");
+        result.getConfiguration().setBroadcastURI(uri);
+        result.getConfiguration().setReliableBroadcast("swp");
+        portList.addLast(portList.removeFirst());
+        return result;
     }
+
 }