You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC
svn commit: r1504961 [10/11] - in /activemq/activemq-blaze/trunk: ./
src/main/java/org/apache/activeblaze/
src/main/java/org/apache/activeblaze/cluster/
src/main/java/org/apache/activeblaze/group/
src/main/java/org/apache/activeblaze/impl/destination/ ...
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/MarshallingSupport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java Fri Jul 19 18:44:21 2013
@@ -14,12 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.jms.message;
+package org.apache.activeblaze.wire;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.util.ArrayList;
@@ -31,7 +29,7 @@ import java.util.Map;
/**
* The fixed version of the UTF8 encoding function. Some older JVM's UTF8
* encoding function breaks when handling large strings.
- *
+ *
* @version $Revision$
*/
public final class MarshallingSupport {
@@ -53,13 +51,13 @@ public final class MarshallingSupport {
private MarshallingSupport() {
}
-
- public static void marshalPrimitiveMap(Map<String,Object> map, DataOutputStream out) throws IOException {
+
+ public static void marshalPrimitiveMap(Map<String, Object> map, DataOutput out) throws IOException {
if (map == null) {
out.writeInt(-1);
} else {
out.writeInt(map.size());
- for (String name:map.keySet()) {
+ for (String name : map.keySet()) {
out.writeUTF(name);
Object value = map.get(name);
marshalPrimitive(out, value);
@@ -67,7 +65,7 @@ public final class MarshallingSupport {
}
}
- public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in) throws IOException {
+ public static Map<String, Object> unmarshalPrimitiveMap(DataInput in) throws IOException {
return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
}
@@ -77,12 +75,12 @@ public final class MarshallingSupport {
* @throws IOException
* @throws IOException
*/
- public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream in, int maxPropertySize) throws IOException {
+ public static Map<String, Object> unmarshalPrimitiveMap(DataInput in, int maxPropertySize) throws IOException {
int size = in.readInt();
if (size > maxPropertySize) {
throw new IOException("Primitive map is larger than the allowed size: " + size);
}
- if (size < 0) {
+ if (size <= 0) {
return null;
} else {
Map<String, Object> rc = new HashMap<String, Object>(size);
@@ -95,15 +93,15 @@ public final class MarshallingSupport {
}
- public static void marshalPrimitiveList(List list, DataOutputStream out) throws IOException {
+ public static void marshalPrimitiveList(List list, DataOutput out) throws IOException {
out.writeInt(list.size());
- for (Iterator iter = list.iterator(); iter.hasNext();) {
- Object element = (Object)iter.next();
+ for (Iterator iter = list.iterator(); iter.hasNext(); ) {
+ Object element = (Object) iter.next();
marshalPrimitive(out, element);
}
}
- public static List<Object> unmarshalPrimitiveList(DataInputStream in) throws IOException {
+ public static List<Object> unmarshalPrimitiveList(DataInput in) throws IOException {
int size = in.readInt();
List<Object> answer = new ArrayList<Object>(size);
while (size-- > 0) {
@@ -112,148 +110,148 @@ public final class MarshallingSupport {
return answer;
}
- public static void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
+ public static void marshalPrimitive(DataOutput out, Object value) throws IOException {
if (value == null) {
marshalNull(out);
} else if (value.getClass() == Boolean.class) {
- marshalBoolean(out, ((Boolean)value).booleanValue());
+ marshalBoolean(out, ((Boolean) value).booleanValue());
} else if (value.getClass() == Byte.class) {
- marshalByte(out, ((Byte)value).byteValue());
+ marshalByte(out, ((Byte) value).byteValue());
} else if (value.getClass() == Character.class) {
- marshalChar(out, ((Character)value).charValue());
+ marshalChar(out, ((Character) value).charValue());
} else if (value.getClass() == Short.class) {
- marshalShort(out, ((Short)value).shortValue());
+ marshalShort(out, ((Short) value).shortValue());
} else if (value.getClass() == Integer.class) {
- marshalInt(out, ((Integer)value).intValue());
+ marshalInt(out, ((Integer) value).intValue());
} else if (value.getClass() == Long.class) {
- marshalLong(out, ((Long)value).longValue());
+ marshalLong(out, ((Long) value).longValue());
} else if (value.getClass() == Float.class) {
- marshalFloat(out, ((Float)value).floatValue());
+ marshalFloat(out, ((Float) value).floatValue());
} else if (value.getClass() == Double.class) {
- marshalDouble(out, ((Double)value).doubleValue());
+ marshalDouble(out, ((Double) value).doubleValue());
} else if (value.getClass() == byte[].class) {
- marshalByteArray(out, (byte[])value);
+ marshalByteArray(out, (byte[]) value);
} else if (value.getClass() == String.class) {
- marshalString(out, (String)value);
+ marshalString(out, (String) value);
} else if (value instanceof Map) {
out.writeByte(MAP_TYPE);
- marshalPrimitiveMap((Map)value, out);
+ marshalPrimitiveMap((Map) value, out);
} else if (value instanceof List) {
out.writeByte(LIST_TYPE);
- marshalPrimitiveList((List)value, out);
+ marshalPrimitiveList((List) value, out);
} else {
throw new IOException("Object is not a primitive: " + value);
}
}
- public static Object unmarshalPrimitive(DataInputStream in) throws IOException {
+ public static Object unmarshalPrimitive(DataInput in) throws IOException {
Object value = null;
byte type = in.readByte();
switch (type) {
- case BYTE_TYPE:
- value = Byte.valueOf(in.readByte());
- break;
- case BOOLEAN_TYPE:
- value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
- break;
- case CHAR_TYPE:
- value = Character.valueOf(in.readChar());
- break;
- case SHORT_TYPE:
- value = Short.valueOf(in.readShort());
- break;
- case INTEGER_TYPE:
- value = Integer.valueOf(in.readInt());
- break;
- case LONG_TYPE:
- value = Long.valueOf(in.readLong());
- break;
- case FLOAT_TYPE:
- value = new Float(in.readFloat());
- break;
- case DOUBLE_TYPE:
- value = new Double(in.readDouble());
- break;
- case BYTE_ARRAY_TYPE:
- value = new byte[in.readInt()];
- in.readFully((byte[])value);
- break;
- case STRING_TYPE:
- value = in.readUTF();
- break;
- case BIG_STRING_TYPE:
- value = readUTF8(in);
- break;
- case MAP_TYPE:
- value = unmarshalPrimitiveMap(in);
- break;
- case LIST_TYPE:
- value = unmarshalPrimitiveList(in);
- break;
- case NULL:
- value = null;
- break;
- default:
- throw new IOException("Unknown primitive type: " + type);
+ case BYTE_TYPE:
+ value = Byte.valueOf(in.readByte());
+ break;
+ case BOOLEAN_TYPE:
+ value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+ break;
+ case CHAR_TYPE:
+ value = Character.valueOf(in.readChar());
+ break;
+ case SHORT_TYPE:
+ value = Short.valueOf(in.readShort());
+ break;
+ case INTEGER_TYPE:
+ value = Integer.valueOf(in.readInt());
+ break;
+ case LONG_TYPE:
+ value = Long.valueOf(in.readLong());
+ break;
+ case FLOAT_TYPE:
+ value = new Float(in.readFloat());
+ break;
+ case DOUBLE_TYPE:
+ value = new Double(in.readDouble());
+ break;
+ case BYTE_ARRAY_TYPE:
+ value = new byte[in.readInt()];
+ in.readFully((byte[]) value);
+ break;
+ case STRING_TYPE:
+ value = in.readUTF();
+ break;
+ case BIG_STRING_TYPE:
+ value = readUTF8(in);
+ break;
+ case MAP_TYPE:
+ value = unmarshalPrimitiveMap(in);
+ break;
+ case LIST_TYPE:
+ value = unmarshalPrimitiveList(in);
+ break;
+ case NULL:
+ value = null;
+ break;
+ default:
+ throw new IOException("Unknown primitive type: " + type);
}
return value;
}
- public static void marshalNull(DataOutputStream out) throws IOException {
+ public static void marshalNull(DataOutput out) throws IOException {
out.writeByte(NULL);
}
- public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
+ public static void marshalBoolean(DataOutput out, boolean value) throws IOException {
out.writeByte(BOOLEAN_TYPE);
out.writeBoolean(value);
}
- public static void marshalByte(DataOutputStream out, byte value) throws IOException {
+ public static void marshalByte(DataOutput out, byte value) throws IOException {
out.writeByte(BYTE_TYPE);
out.writeByte(value);
}
- public static void marshalChar(DataOutputStream out, char value) throws IOException {
+ public static void marshalChar(DataOutput out, char value) throws IOException {
out.writeByte(CHAR_TYPE);
out.writeChar(value);
}
- public static void marshalShort(DataOutputStream out, short value) throws IOException {
+ public static void marshalShort(DataOutput out, short value) throws IOException {
out.writeByte(SHORT_TYPE);
out.writeShort(value);
}
- public static void marshalInt(DataOutputStream out, int value) throws IOException {
+ public static void marshalInt(DataOutput out, int value) throws IOException {
out.writeByte(INTEGER_TYPE);
out.writeInt(value);
}
- public static void marshalLong(DataOutputStream out, long value) throws IOException {
+ public static void marshalLong(DataOutput out, long value) throws IOException {
out.writeByte(LONG_TYPE);
out.writeLong(value);
}
- public static void marshalFloat(DataOutputStream out, float value) throws IOException {
+ public static void marshalFloat(DataOutput out, float value) throws IOException {
out.writeByte(FLOAT_TYPE);
out.writeFloat(value);
}
- public static void marshalDouble(DataOutputStream out, double value) throws IOException {
+ public static void marshalDouble(DataOutput out, double value) throws IOException {
out.writeByte(DOUBLE_TYPE);
out.writeDouble(value);
}
- public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
+ public static void marshalByteArray(DataOutput out, byte[] value) throws IOException {
marshalByteArray(out, value, 0, value.length);
}
- public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
+ public static void marshalByteArray(DataOutput out, byte[] value, int offset, int length) throws IOException {
out.writeByte(BYTE_ARRAY_TYPE);
out.writeInt(length);
out.write(value, offset, length);
}
- public static void marshalString(DataOutputStream out, String s) throws IOException {
+ public static void marshalString(DataOutput out, String s) throws IOException {
// If it's too big, out.writeUTF may not able able to write it out.
if (s.length() < Short.MAX_VALUE / 4) {
out.writeByte(STRING_TYPE);
@@ -286,23 +284,23 @@ public final class MarshallingSupport {
}
// TODO diff: Sun code - removed
byte[] bytearr = new byte[utflen + 4]; // TODO diff: Sun code
- bytearr[count++] = (byte)((utflen >>> 24) & 0xFF); // TODO diff:
+ bytearr[count++] = (byte) ((utflen >>> 24) & 0xFF); // TODO diff:
// Sun code
- bytearr[count++] = (byte)((utflen >>> 16) & 0xFF); // TODO diff:
+ bytearr[count++] = (byte) ((utflen >>> 16) & 0xFF); // TODO diff:
// Sun code
- bytearr[count++] = (byte)((utflen >>> 8) & 0xFF);
- bytearr[count++] = (byte)((utflen >>> 0) & 0xFF);
+ bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
for (int i = 0; i < strlen; i++) {
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F)) {
- bytearr[count++] = (byte)c;
+ bytearr[count++] = (byte) c;
} else if (c > 0x07FF) {
- bytearr[count++] = (byte)(0xE0 | ((c >> 12) & 0x0F));
- bytearr[count++] = (byte)(0x80 | ((c >> 6) & 0x3F));
- bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+ bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
} else {
- bytearr[count++] = (byte)(0xC0 | ((c >> 6) & 0x1F));
- bytearr[count++] = (byte)(0x80 | ((c >> 0) & 0x3F));
+ bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
}
dataOut.write(bytearr);
@@ -327,47 +325,47 @@ public final class MarshallingSupport {
while (count < utflen) {
c = bytearr[count] & 0xff;
switch (c >> 4) {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
/* 0xxxxxxx */
- count++;
- str.append((char)c);
- break;
- case 12:
- case 13:
+ count++;
+ str.append((char) c);
+ break;
+ case 12:
+ case 13:
/* 110x xxxx 10xx xxxx */
- count += 2;
- if (count > utflen) {
- throw new UTFDataFormatException();
- }
- char2 = bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80) {
- throw new UTFDataFormatException();
- }
- str.append((char)(((c & 0x1F) << 6) | (char2 & 0x3F)));
- break;
- case 14:
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException();
+ }
+ char2 = bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException();
+ }
+ str.append((char) (((c & 0x1F) << 6) | (char2 & 0x3F)));
+ break;
+ case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
- count += 3;
- if (count > utflen) {
- throw new UTFDataFormatException();
- }
- char2 = bytearr[count - 2]; // TODO diff: Sun code
- char3 = bytearr[count - 1]; // TODO diff: Sun code
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
- throw new UTFDataFormatException();
- }
- str.append((char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
- break;
- default:
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException();
+ }
+ char2 = bytearr[count - 2]; // TODO diff: Sun code
+ char3 = bytearr[count - 1]; // TODO diff: Sun code
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException();
+ }
+ str.append((char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
+ break;
+ default:
/* 10xx xxxx, 1111 xxxx */
- throw new UTFDataFormatException();
+ throw new UTFDataFormatException();
}
}
// The number of chars produced may be less than utflen
@@ -377,6 +375,5 @@ public final class MarshallingSupport {
}
}
-
}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MarshallingSupport.java
------------------------------------------------------------------------------
svn:executable = *
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java Fri Jul 19 18:44:21 2013
@@ -14,109 +14,109 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.group;
+package org.apache.activeblaze.wire;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.group.Member;
import org.apache.activeblaze.impl.destination.DestinationMatch;
-import org.apache.activeblaze.wire.MemberData;
-import org.apache.activeblaze.wire.MemberData.MemberDataBean;
-import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
/**
* Implementation of a Member
- *
*/
-public class MemberImpl implements Member, Comparable<MemberImpl> {
- private MemberDataBuffer data;
- private final InetSocketAddress socketAddress;
- private final Buffer socketAddressAsBuffer;
+public class MemberImpl extends Packet implements Member, Comparable<MemberImpl> {
+ private boolean observer;
+ private boolean lockedMaster;
+ private String name;
+ private long startTime;
+ private long timeStamp;
+ private long masterWeight;
+ private long refinedWeight;
+ private List<String> groups = new CopyOnWriteArrayList<String>();
+ private List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();
+ private InetSocketAddress socketAddress;
+
+ /**
+ * Constructor
+ */
+ MemberImpl() {
+ }
/**
* Default constructor
- *
- * @param id
- * @param name
- * @param masterWeight
- * @param refinedWeight
- * @param localURI
- * @throws Exception
*/
public MemberImpl(String id, String name, long masterWeight, long refinedWeight, URI localURI) throws Exception {
+ setId(id);
InetAddress addr = InetAddress.getByName(localURI.getHost());
this.socketAddress = new InetSocketAddress(addr, localURI.getPort());
- this.socketAddressAsBuffer = new Buffer(this.socketAddress.toString());
- MemberDataBean bean = new MemberDataBean();
- bean.setId(id);
- bean.setName(name);
- bean.setMasterWeight(masterWeight);
- bean.setRefinedWeight(refinedWeight);
- bean.setStartTime(System.currentTimeMillis());
- bean.setInetAddress(new Buffer(addr.getHostAddress()));
- bean.setPort(localURI.getPort());
- this.data=bean.freeze();
+ this.name = name;
+ this.masterWeight = masterWeight;
+ this.refinedWeight = refinedWeight;
+ this.startTime = System.currentTimeMillis();
+ this.timeStamp = this.startTime;
}
- /**
- * Constructor
- *
- * @param data
- * @throws Exception
- */
- public MemberImpl(MemberDataBuffer data) throws Exception {
- this.data = data;
- InetAddress addr = InetAddress.getByName(data.getInetAddress().toStringUtf8());
- this.socketAddress = new InetSocketAddress(addr, data.getPort());
- this.socketAddressAsBuffer = new Buffer(this.socketAddress.toString());
+ public MemberImpl clone() {
+ MemberImpl result = new MemberImpl();
+ copy(result);
+ return result;
+ }
+
+ protected void copy(MemberImpl copy) {
+ super.copy(copy);
+ copy.observer = this.observer;
+ copy.lockedMaster = this.lockedMaster;
+ copy.name = this.name;
+ copy.startTime = this.startTime;
+ copy.timeStamp = this.timeStamp;
+ copy.masterWeight = this.masterWeight;
+ copy.refinedWeight = this.refinedWeight;
+ copy.groups.addAll(this.groups);
+ copy.subscriptions.addAll(this.subscriptions);
+ copy.socketAddress = this.socketAddress;
+ }
+
+ public int getPacketType() {
+ return PacketType.MEMBER.getNumber();
}
/**
* @return the name
*/
public String getName() {
- return this.data.getName();
+ return this.name;
}
-
+
/**
* Set the name
- * @param name
*/
public void setName(String name) {
- this.data = this.data.copy().setName(name).freeze();
+ this.name = name;
}
public void setMasterWeight(long masterWeight) {
- this.data = this.data.copy().setMasterWeight(masterWeight).freeze();
- }
-
- /**
- * @return the id
- */
- public String getId() {
- return this.data.getId();
- }
-
- void setId(String id) {
- this.data = this.data.copy().setId(id).freeze();
+ this.masterWeight = masterWeight;
}
/**
* @return the startTime
*/
public long getStartTime() {
- return this.data.getStartTime();
+ return this.startTime;
}
/**
* @return the inbox destination
*/
public String getInBoxDestination() {
- return this.data.getId();
+ return getId();
}
/**
@@ -127,33 +127,24 @@ public class MemberImpl implements Membe
}
/**
- * @return address as a Buffer
- */
- public Buffer getAddressAsBuffer() {
- return this.socketAddressAsBuffer;
- }
-
- /**
* @return the timeStamp
*/
public long getTimeStamp() {
- return this.data.getTimeStamp();
+ return this.timeStamp;
}
/**
* Set the timestamp
- *
- * @param value
*/
public void setTimeStamp(long value) {
- this.data = this.data.copy().setTimeStamp(value).freeze();
+ this.timeStamp = value;
}
/**
* @return the masterWeight
*/
public long getMasterWeight() {
- return this.data.getMasterWeight();
+ return this.masterWeight;
}
/**
@@ -161,7 +152,7 @@ public class MemberImpl implements Membe
* @see org.apache.activeblaze.group.Member#getRefinedMasterWeight()
*/
public long getRefinedMasterWeight() {
- return this.data.getRefinedWeight();
+ return this.refinedWeight;
}
public String toString() {
@@ -169,33 +160,26 @@ public class MemberImpl implements Membe
}
public int hashCode() {
- return this.data.getId().hashCode();
+ return this.getId().hashCode();
}
public boolean equals(Object obj) {
boolean result = false;
if (obj instanceof MemberImpl) {
MemberImpl other = (MemberImpl) obj;
- result = this.data.getId().equals(other.data.getId());
+ result = this.getId().equals(other.getId());
}
return result;
}
/**
- * @return the data
- */
- public MemberDataBuffer getData() {
- return this.data;
- }
-
- /**
- * * Compares this member with the specified member for order. Returns a negative integer, zero, or a positive
- * integer as this object is less than, equal to, or greater than the specified member.
- * <p>
- *
- * @param member
- * @return a negative integer, zero, or a positive integer as this member is less than, equal to, or greater than
- * the specified member.
+ * * Compares this member with the specified member for order. Returns a
+ * negative integer, zero, or a positive integer as this object is less
+ * than, equal to, or greater than the specified member.
+ * <p/>
+ *
+ * @return a negative integer, zero, or a positive integer as this member is
+ * less than, equal to, or greater than the specified member.
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
public int compareTo(MemberImpl member) {
@@ -217,9 +201,7 @@ public class MemberImpl implements Membe
* @param groupName
*/
public void addToGroup(String groupName) {
- {//synchronized (this.data) {
- this.data = this.data.copy().addGroups(new Buffer(groupName)).freeze();
- }
+ this.groups.add(groupName);
}
/**
@@ -227,35 +209,37 @@ public class MemberImpl implements Membe
* @see org.apache.activeblaze.group.Member#getGroups()
*/
public List<String> getGroups() {
- List<Buffer> list = null;
- synchronized (this.data) {
- list = new ArrayList<Buffer>(this.data.getGroupsList());
- }
- List<String> result = new ArrayList<String>(list.size());
- for (Buffer b : list) {
- result.add(b.toStringUtf8());
- }
- return result;
+ return this.groups;
}
/**
* @param groupName
*/
public void removeFromGroup(String groupName) {
- {//synchronized (this.data) {
- this.data.getGroupsList().remove(new Buffer(groupName));
- }
+ this.groups.remove(groupName);
}
- protected boolean isInSameGroup(MemberImpl other) {
- { //synchronized (other.data) {
- { // synchronized (this.data) {
- List<Buffer> list = this.data.getGroupsList();
- if( list == null ) {
- return false;
- }
- for (Buffer b : list) {
- for (Buffer o : other.data.getGroupsList()) {
+ public void addSubscription(Subscription s) {
+ this.subscriptions.add(s);
+ }
+
+ public void removeSubscription(Subscription s) {
+ this.subscriptions.remove(s);
+ }
+
+ public void clearSubscriptions() {
+ this.subscriptions.clear();
+ }
+
+ public List<Subscription> getSubscriptions() {
+ return this.subscriptions;
+ }
+
+ public boolean isInSameGroup(MemberImpl other) {
+ { // synchronized (other.data) {
+ { // synchronized (this.data) {
+ for (String b : this.groups) {
+ for (String o : other.groups) {
if (DestinationMatch.isMatch(b, o)) {
return true;
}
@@ -266,5 +250,58 @@ public class MemberImpl implements Membe
return false;
}
-
+ protected void preWrite() {
+ super.preWrite();
+ this.byteBool.writeBoolean(this.observer);
+ this.byteBool.writeBoolean(this.lockedMaster);
+ }
+
+ public void write(BufferOutputStream out) throws IOException {
+ super.write(out);
+ out.writeString(getName());
+ out.writeLong(this.startTime);
+ out.writeLong(this.timeStamp);
+ out.writeLong(this.masterWeight);
+ out.writeLong(this.refinedWeight);
+ List<String> list = new ArrayList<String>(this.groups);
+ out.writeShort(list.size());
+ for (String s : list) {
+ out.writeString(s);
+ }
+ List<Subscription> list2 = new ArrayList<Subscription>(this.subscriptions);
+ out.writeShort(list2.size());
+ for (Subscription s : list2) {
+ s.write(out);
+ }
+ InetAddress addr = this.socketAddress.getAddress();
+ out.write(new Buffer(addr.getAddress()));
+ out.writeInt(this.socketAddress.getPort());
+ }
+
+ public void read(BufferInputStream in) throws IOException {
+ super.read(in);
+ this.observer = this.byteBool.readBoolean();
+ this.lockedMaster = this.byteBool.readBoolean();
+ this.name = in.readString();
+ this.startTime = in.readLong();
+ this.timeStamp = in.readLong();
+ this.masterWeight = in.readLong();
+ this.refinedWeight = in.readLong();
+ this.groups.clear();
+ int size = in.readShort();
+ for (int i = 0; i < size; i++) {
+ this.groups.add(in.readString());
+ }
+ this.subscriptions.clear();
+ size = in.readShort();
+ for (int i = 0; i < size; i++) {
+ Subscription s = new Subscription();
+ s.read(in);
+ this.subscriptions.add(s);
+ }
+ Buffer buffer = in.readBuffer();
+ InetAddress addr = InetAddress.getByAddress(buffer.toByteArray());
+ int port = in.readInt();
+ this.socketAddress = new InetSocketAddress(addr, port);
+ }
}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/MemberImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java (from r754975, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java&r1=754975&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java Fri Jul 19 18:44:21 2013
@@ -14,188 +14,280 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.impl.processor;
+package org.apache.activeblaze.wire;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
+import java.io.IOException;
import java.net.SocketAddress;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
/**
- * Wrapper for PacketData
- *
+ * Base class for
*/
-public final class Packet {
- private SocketAddress from;
- private SocketAddress to;
+public class Packet implements Cloneable {
+ private transient SocketAddress from;
+ private transient SocketAddress to;
private String id;
- private PacketDataBuffer packetData;
+ private String correlationId;
+ private boolean responseRequired;
+ private boolean response;
+ private boolean replayed;
+ private String producerId;
+ private long messageSequence;
+ private transient boolean reliable;
+ private transient boolean noLocal;
+ protected ByteBool byteBool = new ByteBool();
+
+ public Packet clone() {
+ Packet result = new Packet();
+ copy(result);
+ return result;
+ }
+
+ protected void copy(Packet copy) {
+ copy.from = this.from;
+ copy.to = this.to;
+ copy.id = this.id;
+ copy.correlationId = this.correlationId;
+ copy.responseRequired = this.responseRequired;
+ copy.response = this.response;
+ copy.replayed = this.replayed;
+ copy.producerId = this.producerId;
+ copy.messageSequence = this.messageSequence;
+ copy.reliable = this.reliable;
+ }
/**
- * Internal Constructor
- *
- * @param id
- * @param data
+ * Get the packetType
+ *
+ * @return the packetType
*/
- private Packet(String id, PacketDataBuffer data) {
- this.id = id;
- this.packetData = data;
- this.from = null;
- this.to = null;
+ public int getPacketType() {
+ return PacketType.PACKET.getNumber();
}
/**
- * Construct a Packet from PacketData
- *
- * @param data
+ * Get the from
+ *
+ * @return the from
*/
- public Packet(PacketDataBuffer data) {
- this.packetData = data;
- this.from = null;
- this.to = null;
+ public SocketAddress getFrom() {
+ return this.from;
}
/**
- * Construct a Packet received
- *
- * @param from
- * @param data
- * @throws Exception
+ * Set the from
+ *
+ * @param from the from to set
*/
- public Packet(SocketAddress from, PacketDataBuffer data) throws Exception {
+ public void setFrom(SocketAddress from) {
this.from = from;
- this.packetData = data;
- this.to = null;
}
/**
- * Construct a Packet to send
- *
- * @param toAddress
- * @param toPort
- * @param data
+ * Get the to
+ *
+ * @return the to
*/
- public Packet(InetAddress toAddress, int toPort, PacketDataBuffer data) {
- this.to = new InetSocketAddress(toAddress, toPort);
- this.packetData = data;
- this.from = null;
+ public SocketAddress getTo() {
+ return this.to;
}
/**
- * Construct a Packet to send
- *
- * @param toAddress
- * @param toPort
- * @param data
+ * Set the to
+ *
+ * @param to the to to set
*/
- public Packet(String toAddress, int toPort, PacketDataBuffer data) {
- this.to = new InetSocketAddress(toAddress, toPort);
- this.packetData = data;
- this.from = null;
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder("Packet:");
- builder.append(getId());
- builder.append("[");
- builder.append(getPacketData().toString());
- builder.append("]");
- return builder.toString();
+ public void setTo(SocketAddress to) {
+ this.to = to;
}
/**
+ * Get the id
+ *
* @return the id
*/
public String getId() {
- if (this.id == null && this.packetData != null) {
- if (this.packetData.hasMessageId()) {
- this.id = this.packetData.getMessageId().toStringUtf8();
- }
- }
return this.id;
}
/**
- * @return the message sequence
+ * Set the id
+ *
+ * @param id the id to set
*/
- public long getMessageSequence() {
- return this.packetData.getMessageSequence();
+ public void setId(String id) {
+ this.id = id;
}
/**
- * @return the packetData
+ * @return the correlationId
*/
- public PacketDataBuffer getPacketData() {
- return this.packetData;
+ public String getCorrelationId() {
+ return this.correlationId;
}
-
/**
- *
- * @return a deep copy of <Code>this</Code>
- * @see java.lang.Object#clone()
+ * @param correlationId the correlationId to set
*/
- public Packet clone() {
- Packet result = new Packet(this.id, packetData);
- result.to = this.to;
- result.from = this.from;
- return result;
+ public void setCorrelationId(String correlationId) {
+ this.correlationId = correlationId;
}
/**
- * @return the from
+ * Get the reponseRequired
+ *
+ * @return the reponseRequired
*/
- public SocketAddress getFrom() {
- return this.from;
+ public boolean isResponseRequired() {
+ return this.responseRequired;
}
/**
- * @param address
+ * Set the reponseRequired
+ *
+ * @param responseRequired the reponseRequired to set
*/
- public void setFrom(SocketAddress address) {
- this.from = address;
+ public void setResponseRequired(boolean responseRequired) {
+ this.responseRequired = responseRequired;
}
/**
- * @return the to
+ * Get the response
+ *
+ * @return the response
*/
- public SocketAddress getTo() {
- return this.to;
+ public boolean isResponse() {
+ return this.response;
}
/**
- * @param to
- * the to to set
+ * Set the response
+ *
+ * @param response the response to set
*/
- public void setTo(SocketAddress to) {
- this.to = to;
+ public void setResponse(boolean response) {
+ this.response = response;
}
/**
- * Is this Packet replayed
- *
- * @return true if replayed
+ * Get the replayed
+ *
+ * @return the replayed
*/
public boolean isReplayed() {
- return this.packetData.getReplayed();
+ return this.replayed;
}
/**
- * @return true if a response
+ * Set the replayed
+ *
+ * @param replayed the replayed to set
*/
- public boolean isResponse() {
- return this.packetData.getResponse();
+ public void setReplayed(boolean replayed) {
+ this.replayed = replayed;
}
/**
- * @return true if response required
+ * Get the producerId
+ *
+ * @return the producerId
*/
- public boolean isResponseRequired() {
- return this.packetData.getResponseRequired();
+ public String getProducerId() {
+ return this.producerId;
+ }
+
+ /**
+ * Set the producerId
+ *
+ * @param producerId the producerId to set
+ */
+ public void setProducerId(String producerId) {
+ this.producerId = producerId;
+ }
+
+ /**
+ * Get the messageSequence
+ *
+ * @return the messageSequence
+ */
+ public long getMessageSequence() {
+ return this.messageSequence;
+ }
+
+ /**
+ * Set the messageSequence
+ *
+ * @param messageSequence the messageSequence to set
+ */
+ public void setMessageSequence(long messageSequence) {
+ this.messageSequence = messageSequence;
}
- public void setPacketData(PacketDataBuffer packetData) {
- this.packetData = packetData;
+ /**
+ * Get the reliable
+ *
+ * @return the reliable
+ */
+ public boolean isReliable() {
+ return this.reliable;
+ }
+
+ /**
+ * Set the reliable
+ *
+ * @param reliable the reliable to set
+ */
+ public void setReliable(boolean reliable) {
+ this.reliable = reliable;
+ }
+
+ /**
+ * Get the noLocal
+ *
+ * @return the noLocal
+ */
+ public boolean isNoLocal() {
+ return this.noLocal;
+ }
+
+ /**
+ * Set the noLocal
+ *
+ * @param noLocal the noLocal to set
+ */
+ public void setNoLocal(boolean noLocal) {
+ this.noLocal = noLocal;
+ }
+
+ /**
+ * Initialize from a Buffer
+ */
+ public void read(BufferInputStream in) throws IOException {
+ this.id = in.readString();
+ this.correlationId = in.readString();
+ this.producerId = in.readString();
+ this.messageSequence = in.readLong();
+ this.byteBool.setData(in.readByte());
+ this.responseRequired = this.byteBool.readBoolean();
+ this.response = this.byteBool.readBoolean();
+ this.replayed = this.byteBool.readBoolean();
+ }
+
+ protected void preWrite() {
+ this.byteBool.clear();
+ this.byteBool.writeBoolean(this.responseRequired);
+ this.byteBool.writeBoolean(this.response);
+ this.byteBool.writeBoolean(this.replayed);
+ }
+
+ /**
+ * Write state to a Buffer
+ */
+ public void write(BufferOutputStream out) throws IOException {
+ preWrite();
+ out.writeString(this.id);
+ out.writeString(this.correlationId);
+ out.writeString(this.producerId);
+ out.writeLong(this.messageSequence);
+ out.writeByte(this.byteBool.getData());
}
-}
+}
\ No newline at end of file
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/Packet.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java Fri Jul 19 18:44:21 2013
@@ -14,28 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.impl.processor;
+package org.apache.activeblaze.wire;
import java.util.LinkedHashMap;
import java.util.Map;
+
import org.apache.activeblaze.BaseService;
import org.apache.activeblaze.util.BitArrayBin;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activemq.protobuf.Buffer;
/**
* Checks for duplicates
- *
*/
public class PacketAudit extends BaseService {
+ public static final int DEFAULT_MAX_AUDIT_DEPTH = 2048;
+ public static final int DEFAULT_MAX_CHANNELS = 256;
private LinkedHashMap<Buffer, BitArrayBin> cache;
- private int maxChannels = 256;
- private int maxAuditDepth = 1024;
+ private int maxChannels = DEFAULT_MAX_CHANNELS;
+ private int maxAuditDepth = DEFAULT_MAX_AUDIT_DEPTH;
+ /**
+ * @see org.apache.activeblaze.BaseService#doShutDown()
+ */
public void doShutDown() throws Exception {
this.cache = null;
}
+ /**
+ * @see org.apache.activeblaze.BaseService#doStart()
+ */
+ @SuppressWarnings("serial")
public void doStart() throws Exception {
if (this.cache == null) {
this.cache = new LinkedHashMap<Buffer, BitArrayBin>() {
@@ -54,8 +61,7 @@ public class PacketAudit extends BaseSer
}
/**
- * @param maxChannels
- * the maxChannels to set
+ * @param maxChannels the maxChannels to set
*/
public void setMaxChannels(int maxChannels) {
this.maxChannels = maxChannels;
@@ -63,32 +69,27 @@ public class PacketAudit extends BaseSer
/**
* tests for a duplicate message
- *
- * @param packet
- * @return
+ *
+ * @return true if a duplicate message
*/
public boolean isDuplicate(Packet packet) {
- PacketData data = packet.getPacketData();
- return isDuplicate(data.getProducerId(), data.getMessageSequence());
+ return isDuplicate(packet.getProducerId(), packet.getMessageSequence());
}
/**
* tests for a duplicate message
- *
- * @param producerId
- * @param id
- * @return
+ *
+ * @return true if a duplicate message
*/
public boolean isDuplicate(String producerId, long id) {
- return isDuplicate(new Buffer(producerId), id);
+ if (producerId != null) {
+ return isDuplicate(new Buffer(producerId), id);
+ }
+ return false;
}
/**
* tests for a duplicate message
- *
- * @param key
- * @param id
- * @return
*/
public boolean isDuplicate(Buffer key, long id) {
boolean result = false;
@@ -114,15 +115,13 @@ public class PacketAudit extends BaseSer
}
/**
- * @param maxAuditDepth
- * the maxAuditDepth to set
+ * @param maxAuditDepth the maxAuditDepth to set
*/
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
/**
- * @throws Exception
* @see org.apache.activeblaze.BaseService#doInit()
*/
@Override
@@ -131,7 +130,6 @@ public class PacketAudit extends BaseSer
}
/**
- * @throws Exception
* @see org.apache.activeblaze.BaseService#doStop()
*/
@Override
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/PacketAudit.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java Fri Jul 19 18:44:21 2013
@@ -14,79 +14,324 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.cluster;
+package org.apache.activeblaze.wire;
-import org.apache.activeblaze.util.IOUtils;
-import org.apache.activeblaze.wire.StateData.StateDataBean;
+import java.io.IOException;
/**
* Holds information about the Value in the Map
- *
*/
-class StateValue {
- private final StateKey key;
- private final StateDataBean data;
+public class StateValue extends Packet {
+ public enum StateType {
+ NOT_SET, INSERT, DELETE, SYNC
+ }
+
+ ;
+ private StateKey key;
private Object value;
+ private StateType type = StateType.NOT_SET;
+ private Object oldValue;
+ private boolean mapUpdate;
+ private boolean mapWrite;
+ private boolean expired;
+ private boolean lockExpired;
+ private boolean lockUpdate;
+ private boolean lockWrite;
+ private boolean error;
+
+ /**
+ * Constructor
+ */
+ public StateValue() {
+ }
/**
* Constructor
- *
- * @param key
- * @param value
*/
- StateValue(StateKey key, Object value, StateDataBean data) {
+ public StateValue(StateKey key, Object value) {
this.key = key;
this.value = value;
- this.data = data;
}
- StateValue(StateDataBean data) throws Exception {
- this.key = new StateKey(data.getKeyData().copy());
- this.data = data;
+ public int getPacketType() {
+ return PacketType.STATE.getNumber();
+ }
+
+ public StateValue clone() {
+ StateValue result = new StateValue();
+ copy(result);
+ return result;
+ }
+
+ protected void copy(StateValue copy) {
+ super.copy(copy);
+ copy.key = this.key;
+ copy.value = this.value;
+ copy.oldValue = this.oldValue;
+ copy.mapUpdate = this.mapUpdate;
+ copy.mapWrite = this.mapWrite;
+ copy.expired = this.expired;
+ copy.lockExpired = this.lockExpired;
+ copy.lockUpdate = this.lockUpdate;
+ copy.lockWrite = this.lockWrite;
+ copy.error = this.error;
+ copy.type = this.type;
+ }
+
+ public int hashCode() {
+ return this.value != null ? this.value.hashCode() : super.hashCode();
}
- StateValue copy() throws Exception {
- return new StateValue(this.data.copy());
+ public boolean equals(Object obj) {
+ boolean result = false;
+ if (obj instanceof StateValue) {
+ StateValue other = (StateValue) obj;
+ result = (this.value == null && other.value == null)
+ || (this.value != null && other.value != null && this.value.equals(other.value));
+ }
+ return result;
}
/**
* @return the owner
*/
- StateKey getKey() {
+ public StateKey getKey() {
return this.key;
}
/**
+ * Set the key
+ *
+ * @param key the key to set
+ */
+ public void setKey(StateKey key) {
+ this.key = key;
+ }
+
+ /**
* @return the key
- * @throws Exception
*/
- Object getValue() throws Exception {
- if (this.value == null && this.data != null) {
- if (this.data.getValue() != null) {
- this.value = IOUtils.getObject(this.data.getValue());
- }
- }
+ public Object getValue() throws Exception {
return this.value;
}
/**
- * @return the data
+ * Set the value
+ *
+ * @param value the value to set
*/
- StateDataBean getData() {
- return this.data;
+ public void setValue(Object value) {
+ this.value = value;
}
- public int hashCode() {
- return this.value != null ? this.value.hashCode() : super.hashCode();
+ /**
+ * Get the oldValue
+ *
+ * @return the oldValue
+ */
+ public Object getOldValue() {
+ return this.oldValue;
}
- public boolean equals(Object obj) {
- boolean result = false;
- if (obj instanceof StateValue) {
- StateValue other = (StateValue) obj;
- result = (this.value == null && other.value == null)
- || (this.value != null && other.value != null && this.value.equals(other.value));
- }
+ /**
+ * Set the oldValue
+ *
+ * @param oldValue the oldValue to set
+ */
+ public void setOldValue(Object oldValue) {
+ this.oldValue = oldValue;
+ }
+
+ /**
+ * Get the mapUpdate
+ *
+ * @return the mapUpdate
+ */
+ public boolean isMapUpdate() {
+ return this.mapUpdate;
+ }
+
+ /**
+ * Set the mapUpdate
+ *
+ * @param mapUpdate the mapUpdate to set
+ */
+ public void setMapUpdate(boolean mapUpdate) {
+ this.mapUpdate = mapUpdate;
+ }
+
+ /**
+ * Get the mapWrite
+ *
+ * @return the mapWrite
+ */
+ public boolean isMapWrite() {
+ return this.mapWrite;
+ }
+
+ /**
+ * Set the mapWrite
+ *
+ * @param mapWrite the mapWrite to set
+ */
+ public void setMapWrite(boolean mapWrite) {
+ this.mapWrite = mapWrite;
+ }
+
+ /**
+ * Get the expired
+ *
+ * @return the expired
+ */
+ public boolean isExpired() {
+ return this.expired;
+ }
+
+ /**
+ * Set the expired
+ *
+ * @param expired the expired to set
+ */
+ public void setExpired(boolean expired) {
+ this.expired = expired;
+ }
+
+ /**
+ * Get the lockExpired
+ *
+ * @return the lockExpired
+ */
+ public boolean isLockExpired() {
+ return this.lockExpired;
+ }
+
+ /**
+ * Set the lockExpired
+ *
+ * @param lockExpired the lockExpired to set
+ */
+ public void setLockExpired(boolean lockExpired) {
+ this.lockExpired = lockExpired;
+ }
+
+ /**
+ * Get the lockUpdate
+ *
+ * @return the lockUpdate
+ */
+ public boolean isLockUpdate() {
+ return this.lockUpdate;
+ }
+
+ /**
+ * Set the lockUpdate
+ *
+ * @param lockUpdate the lockUpdate to set
+ */
+ public void setLockUpdate(boolean lockUpdate) {
+ this.lockUpdate = lockUpdate;
+ }
+
+ /**
+ * Get the lockWrite
+ *
+ * @return the lockWrite
+ */
+ public boolean isLockWrite() {
+ return this.lockWrite;
+ }
+
+ /**
+ * Set the lockWrite
+ *
+ * @param lockWrite the lockWrite to set
+ */
+ public void setLockWrite(boolean lockWrite) {
+ this.lockWrite = lockWrite;
+ }
+
+ /**
+ * Get the error
+ *
+ * @return the error
+ */
+ public boolean isError() {
+ return this.error;
+ }
+
+ /**
+ * Set the error
+ *
+ * @param error the error to set
+ */
+ public void setError(boolean error) {
+ this.error = error;
+ }
+
+ public void read(BufferInputStream in) throws IOException {
+ super.read(in);
+ String typeStr = in.readUTF();
+ this.type = StateType.valueOf(typeStr);
+ ByteBool bb = new ByteBool();
+ bb.setData(in.readByte());
+ this.mapUpdate = bb.readBoolean();
+ this.mapWrite = bb.readBoolean();
+ this.expired = bb.readBoolean();
+ this.lockExpired = bb.readBoolean();
+ this.lockUpdate = bb.readBoolean();
+ this.lockWrite = bb.readBoolean();
+ this.error = bb.readBoolean();
+ this.value = IOUtils.readObject(in);
+ this.oldValue = IOUtils.readObject(in);
+ this.key = new StateKey();
+ this.key.read(in);
+ }
+
+ /**
+ * Write state to a Buffer
+ */
+ public void write(BufferOutputStream out) throws IOException {
+ super.write(out);
+ out.writeUTF(this.type.toString());
+ ByteBool bb = new ByteBool();
+ bb.writeBoolean(this.mapUpdate);
+ bb.writeBoolean(this.mapWrite);
+ bb.writeBoolean(this.expired);
+ bb.writeBoolean(this.lockExpired);
+ bb.writeBoolean(this.lockUpdate);
+ bb.writeBoolean(this.lockWrite);
+ bb.writeBoolean(this.error);
+ out.writeByte(bb.getData());
+ IOUtils.writeObject(out, this.value);
+ IOUtils.writeObject(out, this.oldValue);
+ this.key.write(out);
+ }
+
+ /**
+ * Get the type
+ *
+ * @return the type
+ */
+ public StateType getType() {
+ return this.type;
+ }
+
+ /**
+ * Set the type
+ *
+ * @param type the type to set
+ */
+ public void setType(StateType type) {
+ this.type = type;
+ }
+
+ /**
+ * @return a String
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ String result = "StateValue(" + this.type + ")key=" + this.key + ",value=" + this.value + ",oldValue="
+ + this.oldValue;
return result;
}
}
\ No newline at end of file
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/StateValue.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Fri Jul 19 18:44:21 2013
@@ -48,8 +48,7 @@ message PacketData {
optional bytes payload= 11;
optional bytes messageId =12;
optional bytes correlationId = 13;
- optional DestinationData destinationData = 14;
- optional int32 payloadType = 15;
+ optional int32 payloadType = 14;
}
message BlazeData {
@@ -59,9 +58,11 @@ message BlazeData {
optional int64 timestamp = 4;
optional int64 expiration = 5;
optional bytes messageType = 6;
- optional DestinationData replyToData = 7;
- optional MapData mapData = 8;
- optional bytes payload = 9;
+ optional DestinationData destinationData = 7;
+ optional DestinationData replyToData = 8;
+ optional MapData mapData = 9;
+ optional bytes payload = 10;
+ optional bytes correlationId = 11;
}
message AckData {
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Fri Jul 19 18:44:21 2013
@@ -21,79 +21,95 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.TestCase;
/**
* Basic test for BlazeChannel
- *
*/
public class BlazeChannelTest extends TestCase {
public void testChannel() throws Exception {
- int count = 1000;
- final AtomicInteger received = new AtomicInteger();
- String destination = "test.foo";
- BlazeChannelFactory factory = new BlazeChannelFactory();
- configureChannelFactory(factory, 2);
- BlazeChannel sender = factory.createChannel();
- BlazeChannel receiver = factory.createChannel();
- sender.start();
- receiver.start();
- final CountDownLatch latch = new CountDownLatch(count);
- receiver.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
- public void onMessage(BlazeMessage message) {
- message.size();
- received.incrementAndGet();
- latch.countDown();
+ try {
+ setGroupSize(2);
+ final int COUNT = 10000;
+ final AtomicInteger received = new AtomicInteger();
+ String destination = "test.foo";
+
+ BlazeChannel sender = getChannelFactory().createChannel();
+ BlazeChannel receiver = getChannelFactory().createChannel();
+ sender.start();
+ receiver.start();
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ receiver.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
+ public void onMessage(BlazeMessage message) {
+ message.size();
+ received.incrementAndGet();
+ latch.countDown();
+ }
+ });
+ BlazeMessage msg = new BlazeMessage();
+ msg.setText("value");
+ for (int i = 0; i < COUNT; i++) {
+ sender.broadcast(destination, msg);
+ //Thread.sleep(100);
}
- });
- BlazeMessage msg = new BlazeMessage();
- msg.setText("value");
- for (int i = 0; i < count; i++) {
- sender.broadcast(destination, msg);
- //Thread.sleep(100);
+ latch.await(10, TimeUnit.SECONDS);
+ receiver.stop();
+ sender.stop();
+ assertEquals("Too many messages not sent ", 0, latch.getCount());
+ } finally {
+ resetGroupSize();
}
- latch.await(10, TimeUnit.SECONDS);
- receiver.stop();
- sender.stop();
- assertEquals("Too many messages not sent ", 0, latch.getCount());
}
public void testGroupBroadcast() throws Exception {
- final int GROUP_SIZE = 10;
- String destination = "test.foo";
- final AtomicInteger count = new AtomicInteger();
- List<BlazeChannel> channels = new ArrayList<BlazeChannel>();
- BlazeChannelFactory factory = new BlazeChannelFactory();
- configureChannelFactory(factory, GROUP_SIZE);
- for (int i = 0; i < GROUP_SIZE; i++) {
- BlazeChannel channel = factory.createChannel();
- channel.start();
- channels.add(channel);
- channel.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
- public void onMessage(BlazeMessage message) {
- synchronized (count) {
- if (count.incrementAndGet() == GROUP_SIZE) {
- count.notifyAll();
+ try {
+ final int GROUP_SIZE = 10;
+ setGroupSize(GROUP_SIZE);
+
+ String destination = "test.foo";
+ final AtomicInteger count = new AtomicInteger();
+ List<BlazeChannel> channels = new ArrayList<BlazeChannel>();
+
+ for (int i = 0; i < GROUP_SIZE; i++) {
+ BlazeChannel channel = getChannelFactory().createChannel();
+ channel.start();
+ channels.add(channel);
+ channel.addBlazeTopicMessageListener(destination, new BlazeMessageListener() {
+ public void onMessage(BlazeMessage message) {
+ synchronized (count) {
+ if (count.incrementAndGet() == GROUP_SIZE) {
+ count.notifyAll();
+ }
}
}
+ });
+ }
+ BlazeMessage msg = new BlazeMessage();
+ msg.setText("hello");
+ channels.get(0).broadcast(destination, msg);
+ synchronized (count) {
+ if (count.get() < GROUP_SIZE) {
+ count.wait(5000);
}
- });
- }
- BlazeMessage msg = new BlazeMessage();
- msg.setText("hello");
- channels.get(0).broadcast(destination, msg);
- synchronized (count) {
- if (count.get() < GROUP_SIZE) {
- count.wait(5000);
}
- }
-
- assertEquals(GROUP_SIZE, count.get());
- for (BlazeChannel channel : channels) {
- channel.shutDown();
+
+ assertEquals(GROUP_SIZE, count.get());
+ for (BlazeChannel channel : channels) {
+ channel.shutDown();
+ }
+ } finally {
+ resetGroupSize();
}
}
-
- protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+
+ protected void setGroupSize(int groupSize) {
+ }
+
+ protected void resetGroupSize() {
+ }
+
+ protected BlazeChannelFactory getChannelFactory() {
+ return new BlazeChannelFactory();
}
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java Fri Jul 19 18:44:21 2013
@@ -20,12 +20,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
+
import junit.framework.TestCase;
-import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.IOUtils;
/**
* Test BlazeMessage
- *
*/
public class BlazeMessageTest extends TestCase {
private final String NAME = "testName";
@@ -132,6 +133,13 @@ public class BlazeMessageTest extends Te
assertEquals(msg.getStringValue(this.NAME), str);
}
+ public void testGetText() throws Exception {
+ String str = "test";
+ BlazeMessage msg = new BlazeMessage(str);
+ msg = msg.clone();
+ assertEquals(msg.getText(), str);
+ }
+
public void testGetBytes() throws Exception {
BlazeMessage msg = new BlazeMessage();
byte[] bytes1 = new byte[3];
@@ -239,4 +247,12 @@ public class BlazeMessageTest extends Te
assertTrue(msg.containsKey("exists"));
assertFalse(msg.containsKey("doesntExist"));
}
+
+ public void testMarshall() throws Exception {
+ String str = "test";
+ BlazeMessage msg = new BlazeMessage(str);
+ Buffer buffer = IOUtils.writePacket(msg);
+ msg = (BlazeMessage) IOUtils.readPacket(buffer);
+ assertEquals(msg.getText(), str);
+ }
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java Fri Jul 19 18:44:21 2013
@@ -16,21 +16,39 @@
*/
package org.apache.activeblaze;
+import java.util.LinkedList;
+
/**
- *
- *
+ *
+ *
*/
public class BlazePointcastChannelTest extends BlazeChannelTest {
- protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
- int portStart = 61616;
+ private static int PORTSTART = 61616;
+ private LinkedList<Integer> portList = new LinkedList<Integer>();
+
+ protected void setGroupSize(int groupSize) {
+ for (int i = 0; i < groupSize; i++) {
+ portList.add((PORTSTART + i));
+ }
+ }
+
+ protected void resetGroupSize() {
+ portList.clear();
+ }
+
+
+ protected BlazeChannelFactory getChannelFactory() {
+ BlazeChannelFactory result = new BlazeChannelFactory();
String uri = "static://(";
- for (int i = 0; i < groupNumber; i++) {
- uri += "udp://localhost:" + (portStart + i);
+ for (int i = 0; i < portList.size(); i++) {
+ uri += "udp://localhost:" + (portList.get(i));
uri += ",";
}
uri += ")";
- fac.getConfiguration().setManagementURI("");
- fac.getConfiguration().setBroadcastURI(uri);
- fac.getConfiguration().setReliableBroadcast("swp");
+ result.getConfiguration().setManagementURI("");
+ result.getConfiguration().setBroadcastURI(uri);
+ result.getConfiguration().setReliableBroadcast("swp");
+ portList.addLast(portList.removeFirst());
+ return result;
}
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java Fri Jul 19 18:44:21 2013
@@ -19,185 +19,223 @@ package org.apache.activeblaze.cluster;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+
import junit.framework.TestCase;
-import org.apache.activeblaze.BlazeChannelFactory;
import org.apache.activeblaze.group.Member;
/**
* Test for clustered channel
- *
*/
public class BlazeClusterGroupChannelTest extends TestCase {
- public void XtestOneChannel() throws Exception {
- BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
- BlazeClusterGroupChannel channel = factory.createChannel("test");
- assertEquals(1, channel.getMembers().size());
- channel.start();
- boolean electionFinished = channel
- .waitForElection((int) (channel.getConfiguration().getAwaitGroupTimeout() + 500));
- assertTrue(electionFinished);
- assertEquals(1, channel.getMembers().size());
- channel.shutDown();
- }
-
- public void XtestGroup() throws Exception {
- final int number = 3;
- List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
- BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
- configureChannelFactory(factory, number);
- for (int i = 0; i < number; i++) {
- BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
- channel.getConfiguration().setMinimumGroupSize(number);
- channel.addToGroup("test");
+
+ public void testOneChannel() throws Exception {
+ try {
+ setGroupSize(1);
+
+ BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test");
+ assertEquals(1, channel.getMembers().size());
channel.start();
- channels.add(channel);
+ boolean electionFinished = channel.waitForElection(0);
+ assertTrue(electionFinished);
+ assertEquals(1, channel.getMembers().size());
+ channel.shutDown();
+ } finally {
+ resetGroupSize();
}
- channels.get(number - 1).waitForElection(0);
- int masterNumber = 0;
- BlazeClusterGroupChannel master = null;
- for (BlazeClusterGroupChannel channel : channels) {
- if (channel.isMaster()) {
- masterNumber++;
- master = channel;
+ }
+
+ public void testChangedWeightedGroup() throws Exception {
+
+ try {
+ final int NUMBER = 4;
+ setGroupSize(NUMBER);
+ List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+
+ BlazeClusterGroupChannel weightedMaster = null;
+ for (int i = 0; i < NUMBER; i++) {
+ BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test" + i);
+ channel.getConfiguration().setMinimumGroupSize(NUMBER);
+ channel.addToGroup("changedWeightedTest");
+ if (i == NUMBER / 2) {
+ channel.getConfiguration().setMasterWeight(10);
+ weightedMaster = channel;
+ } else {
+ channel.getConfiguration().setMasterWeight(0);
+ }
+ channel.start();
+ channels.add(channel);
}
- }
- assertNotNull(master);
- assertEquals(1, masterNumber);
- // kill the master
- master.shutDown();
- channels.remove(master);
- Thread.sleep(1000);
- channels.get(0).waitForElection(0);
- masterNumber = 0;
- master = null;
- for (BlazeClusterGroupChannel channel : channels) {
- if (channel.isMaster()) {
- masterNumber++;
- master = channel;
+ channels.get(NUMBER - 1).waitForElection(0);
+ int masterNumber = 0;
+ BlazeClusterGroupChannel master = null;
+ for (BlazeClusterGroupChannel channel : channels) {
+ if (channel.isMaster()) {
+ masterNumber++;
+ master = channel;
+ }
}
- }
- assertNotNull(master);
- assertEquals(1, masterNumber);
- for (BlazeClusterGroupChannel channel : channels) {
- channel.shutDown();
+ assertNotNull(master);
+ assertEquals(1, masterNumber);
+ assertTrue(master == weightedMaster);
+ BlazeClusterGroupChannel newMaster = channels.get(0);
+ assertTrue(newMaster != weightedMaster);
+ newMaster.getConfiguration().setMasterWeight(2000);
+ // wait for a new heartbeat to be generated
+ Thread.sleep(newMaster.getConfiguration().getHeartBeatInterval() + 500);
+ newMaster.waitForElection(0);
+ masterNumber = 0;
+ master = null;
+ for (BlazeClusterGroupChannel channel : channels) {
+ if (channel.isMaster()) {
+ masterNumber++;
+ master = channel;
+ }
+ }
+ assertEquals(1, masterNumber);
+ assertNotNull(master);
+ assertTrue(master == newMaster);
+ for (BlazeClusterGroupChannel channel : channels) {
+ channel.shutDown();
+ }
+ } finally {
+ resetGroupSize();
}
}
- public void testWeightedGroup() throws Exception {
- final int number = 4;
- List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
- BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
- configureChannelFactory(factory, number);
- configureChannelFactory(factory, number);
- BlazeClusterGroupChannel weightedMaster = null;
- for (int i = 0; i < number; i++) {
- BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
- channel.addToGroup("test");
- channel.getConfiguration().setMinimumGroupSize(number);
- if (i == number / 2) {
- channel.getConfiguration().setMasterWeight(10);
- weightedMaster = channel;
- } else {
- channel.getConfiguration().setMasterWeight(0);
+
+ public void testGroup() throws Exception {
+ try {
+ final int NUMBER = 4;
+ setGroupSize(NUMBER);
+ List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+
+ for (int i = 0; i < NUMBER; i++) {
+ BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test" + i);
+ channel.getConfiguration().setMinimumGroupSize(NUMBER - 2);
+ channel.addToGroup("test");
+ channel.start();
+ channels.add(channel);
+ }
+ channels.get(NUMBER - 1).waitForElection(0);
+ int masterNumber = 0;
+ BlazeClusterGroupChannel master = null;
+ for (BlazeClusterGroupChannel channel : channels) {
+ if (channel.isMaster()) {
+ masterNumber++;
+ master = channel;
+ }
}
- channel.start();
- channels.add(channel);
- }
- channels.get(number - 1).waitForElection(5000);
- int masterNumber = 0;
- BlazeClusterGroupChannel master = null;
- for (BlazeClusterGroupChannel channel : channels) {
- if (channel.isMaster()) {
- masterNumber++;
- master = channel;
+ assertNotNull(master);
+ assertEquals(1, masterNumber);
+ // kill the master
+ master.shutDown();
+ channels.remove(master);
+ Thread.sleep(5000);
+ channels.get(channels.size() - 1).waitForElection(0);
+ masterNumber = 0;
+ master = null;
+ for (BlazeClusterGroupChannel channel : channels) {
+ if (channel.isMaster()) {
+ masterNumber++;
+ master = channel;
+ }
}
- }
- assertNotNull(master);
- assertTrue(master == weightedMaster);
- assertEquals(1, masterNumber);
- for (BlazeClusterGroupChannel channel : channels) {
- channel.shutDown();
+ assertNotNull(master);
+ assertEquals(1, masterNumber);
+ for (BlazeClusterGroupChannel channel : channels) {
+ channel.shutDown();
+ }
+ } finally {
+ resetGroupSize();
}
}
- public void testChangedWeightedGroup() throws Exception {
- final int number = 4;
- List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
- BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
- configureChannelFactory(factory, number);
- BlazeClusterGroupChannel weightedMaster = null;
- for (int i = 0; i < number; i++) {
- BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
-
- channel.getConfiguration().setMinimumGroupSize(number);
- channel.addToGroup("changedWeightedTest");
- if (i == number / 2) {
- channel.getConfiguration().setMasterWeight(10);
- weightedMaster = channel;
- } else {
- channel.getConfiguration().setMasterWeight(0);
+ public void testWeightedGroup() throws Exception {
+ try {
+ final int NUMBER = 4;
+ setGroupSize(4);
+ List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+
+
+ BlazeClusterGroupChannel weightedMaster = null;
+ for (int i = 0; i < NUMBER; i++) {
+ BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test" + i);
+ channel.addToGroup("test");
+ channel.getConfiguration().setMinimumGroupSize(NUMBER);
+ if (i == NUMBER / 2) {
+ channel.getConfiguration().setMasterWeight(10);
+ weightedMaster = channel;
+ } else {
+ channel.getConfiguration().setMasterWeight(0);
+ }
+ channel.start();
+ channels.add(channel);
}
- channel.start();
- channels.add(channel);
- }
- channels.get(number - 1).waitForElection(10000);
- int masterNumber = 0;
- BlazeClusterGroupChannel master = null;
- for (BlazeClusterGroupChannel channel : channels) {
- if (channel.isMaster()) {
- masterNumber++;
- master = channel;
+ channels.get(NUMBER - 1).waitForElection(0);
+ int masterNumber = 0;
+ BlazeClusterGroupChannel master = null;
+ for (BlazeClusterGroupChannel channel : channels) {
+ if (channel.isMaster()) {
+ masterNumber++;
+ master = channel;
+ }
}
- }
- assertNotNull(master);
- assertEquals(1, masterNumber);
- assertTrue(master == weightedMaster);
- channels.get(0).getConfiguration().setMasterWeight(2000);
- Thread.sleep(2000);
- masterNumber = 0;
- master = null;
- for (BlazeClusterGroupChannel channel : channels) {
- if (channel.isMaster()) {
- masterNumber++;
- master = channel;
+ assertNotNull(master);
+ assertTrue(master == weightedMaster);
+ assertEquals(1, masterNumber);
+ for (BlazeClusterGroupChannel channel : channels) {
+ channel.shutDown();
}
- }
- assertNotNull(master);
- assertTrue(master != weightedMaster);
- assertEquals(1, masterNumber);
- for (BlazeClusterGroupChannel channel : channels) {
- channel.shutDown();
+ } finally {
+ resetGroupSize();
}
}
- public void XtestClusterChangedListener() throws Exception {
- final AtomicBoolean result = new AtomicBoolean();
- BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
- BlazeClusterGroupChannel master = factory.createChannel("master");
- master.addToGroup("test");
- master.getConfiguration().setMasterWeight(10);
- master.start();
- BlazeClusterGroupChannel channel = factory.createChannel("test1");
- channel.addToGroup("test");
- channel.addMasterChangedListener(new MasterChangedListener() {
- public void masterChanged(Member master) {
- synchronized (result) {
- result.set(true);
- result.notifyAll();
- }
- }
- });
- channel.start();
- synchronized (result) {
- if (!result.get()) {
- result.wait(3000);
+
+ public void testClusterChangedListener() throws Exception {
+ try {
+ setGroupSize(2);
+ final AtomicBoolean result = new AtomicBoolean();
+
+ BlazeClusterGroupChannel master = getChannelFactory().createChannel("master");
+ master.addToGroup("test");
+ master.getConfiguration().setMasterWeight(10);
+
+ master.start();
+ BlazeClusterGroupChannel channel = getChannelFactory().createChannel("test1");
+ channel.addToGroup("test");
+ master.addMasterChangedListener(new MasterChangedListener() {
+ public void masterChanged(Member master) {
+ synchronized (result) {
+ result.set(true);
+ result.notifyAll();
+ }
+ }
+ });
+ channel.start();
+ channel.waitForElection(0);
+ channel.getConfiguration().setMasterWeight(1000);
+ synchronized (result) {
+ if (!result.get()) {
+ result.wait(10000);
+ }
}
+ assertTrue(channel.isMaster());
+ assertTrue(result.get());
+ channel.shutDown();
+ master.shutDown();
+ } finally {
+ resetGroupSize();
}
- assertTrue(result.get());
- channel.shutDown();
- master.shutDown();
}
- protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+ protected void setGroupSize(int groupSize) {
+ }
+
+ protected void resetGroupSize() {
+ }
+
+ protected BlazeClusterGroupChannelFactory getChannelFactory() {
+ return new BlazeClusterGroupChannelFactory();
}
}
Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java Fri Jul 19 18:44:21 2013
@@ -16,20 +16,36 @@
*/
package org.apache.activeblaze.cluster;
-import org.apache.activeblaze.BlazeChannelFactory;
+import java.util.LinkedList;
public class BlazePointcastClusterGroupChannelTest extends BlazeClusterGroupChannelTest {
- int portNum = 61616;
+ private static int PORTSTART = 61616;
+ private LinkedList<Integer> portList = new LinkedList<Integer>();
- protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+ protected void setGroupSize(int groupSize) {
+ for (int i = 0; i < groupSize; i++) {
+ portList.add((PORTSTART + i));
+ }
+ }
+
+ protected void resetGroupSize() {
+ portList.clear();
+ }
+
+
+ protected BlazeClusterGroupChannelFactory getChannelFactory() {
+ BlazeClusterGroupChannelFactory result = new BlazeClusterGroupChannelFactory();
String uri = "static://(";
- for (int i = 0; i < groupNumber; i++) {
- uri += "udp://localhost:" + (this.portNum++);
+ for (int i = 0; i < portList.size(); i++) {
+ uri += "udp://localhost:" + (portList.get(i));
uri += ",";
}
uri += ")";
- fac.getConfiguration().setManagementURI("");
- fac.getConfiguration().setBroadcastURI(uri);
- fac.getConfiguration().setReliableBroadcast("swp");
+ result.getConfiguration().setManagementURI("");
+ result.getConfiguration().setBroadcastURI(uri);
+ result.getConfiguration().setReliableBroadcast("swp");
+ portList.addLast(portList.removeFirst());
+ return result;
}
+
}