You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/25 15:27:24 UTC
svn commit: r720505 [1/2] - in /activemq/activemq-blaze/trunk/src:
main/java/org/apache/activeblaze/
main/java/org/apache/activeblaze/coordinated/
main/java/org/apache/activeblaze/group/
main/java/org/apache/activeblaze/impl/processor/ main/java/org/ap...
Author: rajdavies
Date: Tue Nov 25 06:27:23 2008
New Revision: 720505
URL: http://svn.apache.org/viewvc?rev=720505&view=rev
Log:
re-org some classes
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java
- copied, changed from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java
- copied, changed from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java
- copied, changed from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java
activemq/activemq-blaze/trunk/src/test/resources/
activemq/activemq-blaze/trunk/src/test/resources/log4j.properties
Removed:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Tue Nov 25 06:27:23 2008
@@ -20,7 +20,6 @@
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activeblaze.impl.destination.DestinationMatch;
import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -54,8 +53,6 @@
private Processor broadcast;
private BlazeConfiguration configuration = new BlazeConfiguration();
private String id;
- private LinkedBlockingQueue<BlazeMessage> broadcastQueue;
- private Thread broadcastQueueThread;
private Buffer managementURI;
private InetSocketAddress toAddress;
@@ -77,7 +74,7 @@
/**
* @param destination
* @param l
- * @throws Exception
+ * @throws Exception
* @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
* org.apache.activeblaze.BlazeTopicListener)
*/
@@ -90,7 +87,7 @@
* @param destination
* @param l
* @return
- * @throws Exception
+ * @throws Exception
* @see org.apache.activeblaze.BlazeChannel#removeBlazeMessageListener(java.lang.String,
* org.apache.activeblaze.BlazeTopicListener)
*/
@@ -102,9 +99,8 @@
public boolean init() throws Exception {
boolean result = super.init();
if (result) {
- this.broadcastQueue = new LinkedBlockingQueue<BlazeMessage>(getConfiguration().getMaxDispatchQueueSize());
String broadcastURIStr = getConfiguration().getBroadcastURI();
- broadcastURIStr=PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
+ broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
URI broadcastURI = new URI(broadcastURIStr);
this.toAddress = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort());
this.managementURI = new Buffer(new URI(getConfiguration().getManagementURI()).toString());
@@ -123,7 +119,6 @@
protected Processor configureProcess(BaseTransport transport) throws Exception {
int maxPacketSize = getConfiguration().getMaxPacketSize();
configureTransport(transport);
-
CompressionProcessor result = new CompressionProcessor();
result.setPrev(this);
result.setExceptionListener(this);
@@ -147,18 +142,6 @@
public boolean start() throws Exception {
boolean result = super.start();
if (result) {
- if (getConfiguration().isUseDispatchThread()) {
- Runnable runable = new Runnable() {
- public void run() {
- while (isStarted()) {
- dequeueBroadcastMessages();
- }
- }
- };
- this.broadcastQueueThread = new Thread(runable, getId() + "-BroadcastQueue");
- this.broadcastQueueThread.setDaemon(true);
- this.broadcastQueueThread.start();
- }
this.broadcast.start();
}
return result;
@@ -167,13 +150,6 @@
public boolean stop() throws Exception {
boolean result = super.stop();
if (result) {
- if (this.broadcastQueueThread != null) {
- this.broadcastQueueThread.interrupt();
- try {
- this.broadcastQueueThread.join(1000);
- } catch (InterruptedException e) {
- }
- }
this.broadcast.stop();
}
return result;
@@ -241,7 +217,7 @@
protected void doProcessBlazeData(PacketData data) throws Exception {
BlazeMessage message = buildBlazeMessage(data);
- processBlazeMessage(message);
+ dispatch(message);
}
protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
@@ -274,28 +250,7 @@
return new BlazeMessage();
}
- protected void processBlazeMessage(BlazeMessage message) {
- if (this.broadcastQueueThread == null) {
- dispatch(message);
- } else {
- try {
- this.broadcastQueue.put(message);
- } catch (InterruptedException e) {
- // ignore - we are stopping
- }
- }
- }
-
- protected void dequeueBroadcastMessages() {
- BlazeMessage message = null;
- try {
- message = this.broadcastQueue.take();
- } catch (InterruptedException e1) {
- }
- dispatch(message);
- }
-
- protected void dispatch(BlazeMessage message) {
+ protected final void dispatch(BlazeMessage message) {
if (message != null) {
Buffer destination = message.getContent().getDestination();
for (Map.Entry<Buffer, BlazeTopicListener> entry : this.topicessageListenerMap.entrySet()) {
@@ -305,9 +260,10 @@
}
}
}
-
+
/**
* shutdown on gc
+ *
* @throws Throwable
* @see java.lang.Object#finalize()
*/
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java Tue Nov 25 06:27:23 2008
@@ -33,8 +33,6 @@
private String unicastURI = "udp://localhost:0";
private String broadcastURI = "mcast://224.2.2.2:9999";
private String managementURI = "mcast://224.2.2.2:8888";
- // Channel internals
- private boolean useDispatchThread = true;
private int maxDispatchQueueSize = 10000;
private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
//reliability
@@ -84,22 +82,6 @@
public void setBroadcastURI(String broadcastURL) {
this.broadcastURI = broadcastURL;
}
-
- /**
- * @return the useDispatchThread
- */
- public boolean isUseDispatchThread() {
- return this.useDispatchThread;
- }
-
- /**
- * @param useDispatchThread
- * the useDispatchThread to set
- */
- public void setUseDispatchThread(boolean useDispatchThread) {
- this.useDispatchThread = useDispatchThread;
- }
-
/**
* @return the maxDispatchQueueSize
*/
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java Tue Nov 25 06:27:23 2008
@@ -21,9 +21,6 @@
*
*/
public class BlazeException extends Exception {
- /**
- *
- */
private static final long serialVersionUID = 1064152356749288271L;
/**
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Tue Nov 25 06:27:23 2008
@@ -16,6 +16,11 @@
*/
package org.apache.activeblaze;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
import java.security.Key;
import java.util.Collection;
import java.util.Collections;
@@ -23,8 +28,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
import org.apache.activeblaze.wire.BlazeData;
import org.apache.activeblaze.wire.BoolType;
+import org.apache.activeblaze.wire.BufferType;
import org.apache.activeblaze.wire.ByteType;
import org.apache.activeblaze.wire.BytesType;
import org.apache.activeblaze.wire.CharType;
@@ -36,6 +43,8 @@
import org.apache.activeblaze.wire.ShortType;
import org.apache.activeblaze.wire.StringType;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+import org.apache.activemq.protobuf.BufferOutputStream;
/**
@@ -73,9 +82,10 @@
* <code>NullPointerException</code>.
*
*/
-public class BlazeMessage implements Map<String, Object>{
+public class BlazeMessage implements Map<String, Object> {
private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
+ private static final String DEFAULT_OBJECT_PAYLOAD = "DEFAULT_OBJECT_PAYLOAD";
private transient Map<String, Object> map = new ConcurrentHashMap<String, Object>();
private transient String destination;
private transient String fromId;
@@ -83,88 +93,145 @@
private transient String correlationId;
private transient long timeStamp;
private BlazeData content;
-
+
/**
* Default Constructor
*/
public BlazeMessage() {
}
-
+
/**
* Constructor - Utility to construct a message with a text <Code>String</Code> payload
+ *
* @param text
*/
public BlazeMessage(String text) {
- setString(DEFAULT_TEXT_PAYLOAD,text);
+ setString(DEFAULT_TEXT_PAYLOAD, text);
}
-
+
/**
* Constructor - Utility to construct a message with a byte[] array payload
+ *
* @param data
*/
public BlazeMessage(byte[] data) {
- setBytes(DEFAULT_BYTES_PAYLOAD,data);
+ setBytes(DEFAULT_BYTES_PAYLOAD, data);
}
-
+
+ /**
+ * Constructor - Utility to construct a message with an object payload
+ *
+ * @param data
+ */
+ public BlazeMessage(Object data) {
+ setObject(data);
+ }
+
/**
* Utility method for setting a default <Code>String</Code> payload
+ *
* @param text
*/
public void setText(String text) {
- setString(DEFAULT_TEXT_PAYLOAD,text);
+ setString(DEFAULT_TEXT_PAYLOAD, text);
}
-
+
/**
- * Utility method used for when a BlazeMessage is only carrying a String
+ * Utility method used for when a BlazeMessage is only carrying a String
+ *
* @return text the default text
* @throws Exception
*/
public String getText() throws Exception {
return getString(DEFAULT_TEXT_PAYLOAD);
}
-
+
/**
- * Utility method for setting a default <Code>String</Code> payload
- * @param payload
+ * Utility method for setting a default <Code>byte[]</Code> payload
+ *
+ * @param payload
*/
public void setBytes(byte[] payload) {
- setBytes(DEFAULT_BYTES_PAYLOAD,payload);
+ setBytes(DEFAULT_BYTES_PAYLOAD, payload);
+ }
+
+ /**
+ * Utility method used for when a BlazeMessage is only carrying an Object
+ *
+ * @return the default object payload, deserialized from its Buffer
+ * @throws Exception
+ */
+ public Object getObject() throws Exception {
+ Object result = null;
+ Buffer buffer = getBuffer(DEFAULT_OBJECT_PAYLOAD);
+ InputStream is = new BufferInputStream(buffer);
+ DataInputStream dataIn = new DataInputStream(is);
+ ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+ result = objIn.readObject();
+ return result;
}
/**
+ * Utility method for setting a default <Code>Object</Code> payload
+ *
+ * @param payload
+ */
+
+ public void setObject(Object payload) {
+ BufferOutputStream bufferOut = new BufferOutputStream(1024);
+ DataOutputStream dataOut = new DataOutputStream(bufferOut);
+ try {
+ ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
+ objOut.writeObject(payload);
+ objOut.flush();
+ objOut.reset();
+ objOut.close();
+ } catch (IOException e) {
+ throw new BlazeRuntimeException(e);
+ }
+ put(DEFAULT_OBJECT_PAYLOAD, bufferOut.toBuffer());
+ }
+
+
+
+ /**
* Utility method used for when a BlazeMessage is only carrying a String
+ *
* @return text the default text
* @throws Exception
*/
public byte[] getBytes() throws Exception {
return getBytes(DEFAULT_BYTES_PAYLOAD);
}
-
+
/**
- * @param copy2
+ * @param copy2
* @return a copy of this message
* @throws BlazeException
*/
- public BlazeMessage copy() throws BlazeException{
+ public BlazeMessage copy() throws BlazeException {
BlazeMessage copy = new BlazeMessage();
copy(copy);
return copy;
}
-
+
/**
* clear the contents of this message
*/
- public void clear(){
+ public void clear() {
this.map.clear();
}
+
/**
* Returns the <CODE>boolean</CODE> value with the specified name.
*
- * @param name the name of the <CODE>boolean</CODE>
+ * @param name
+ * the name of the <CODE>boolean</CODE>
* @return the <CODE>boolean</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public boolean getBoolean(String name) throws BlazeMessageFormatException{
+ public boolean getBoolean(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -179,14 +246,17 @@
throw new BlazeMessageFormatException(" cannot read a boolean from " + value.getClass().getName());
}
}
+
/**
* Returns the <CODE>byte</CODE> value with the specified name.
*
- * @param name the name of the <CODE>byte</CODE>
+ * @param name
+ * the name of the <CODE>byte</CODE>
* @return the <CODE>byte</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public byte getByte(String name) throws BlazeMessageFormatException{
+ public byte getByte(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -201,14 +271,17 @@
throw new BlazeMessageFormatException(" cannot read a byte from " + value.getClass().getName());
}
}
+
/**
* Returns the <CODE>short</CODE> value with the specified name.
*
- * @param name the name of the <CODE>short</CODE>
+ * @param name
+ * the name of the <CODE>short</CODE>
* @return the <CODE>short</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public short getShort(String name) throws BlazeMessageFormatException{
+ public short getShort(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -226,14 +299,17 @@
throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
}
}
+
/**
* Returns the Unicode character value with the specified name.
*
- * @param name the name of the Unicode character
+ * @param name
+ * the name of the Unicode character
* @return the Unicode character value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public char getChar(String name) throws BlazeMessageFormatException{
+ public char getChar(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -245,14 +321,17 @@
throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
}
}
+
/**
* Returns the <CODE>int</CODE> value with the specified name.
*
- * @param name the name of the <CODE>int</CODE>
+ * @param name
+ * the name of the <CODE>int</CODE>
* @return the <CODE>int</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public int getInt(String name) throws BlazeMessageFormatException{
+ public int getInt(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -273,14 +352,17 @@
throw new BlazeMessageFormatException(" cannot read an int from " + value.getClass().getName());
}
}
+
/**
* Returns the <CODE>long</CODE> value with the specified name.
*
- * @param name the name of the <CODE>long</CODE>
+ * @param name
+ * the name of the <CODE>long</CODE>
* @return the <CODE>long</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public long getLong(String name) throws BlazeMessageFormatException{
+ public long getLong(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -304,14 +386,17 @@
throw new BlazeMessageFormatException(" cannot read a long from " + value.getClass().getName());
}
}
+
/**
* Returns the <CODE>float</CODE> value with the specified name.
*
- * @param name the name of the <CODE>float</CODE>
+ * @param name
+ * the name of the <CODE>float</CODE>
* @return the <CODE>float</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public float getFloat(String name) throws BlazeMessageFormatException{
+ public float getFloat(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -326,14 +411,17 @@
throw new BlazeMessageFormatException(" cannot read a float from " + value.getClass().getName());
}
}
+
/**
* Returns the <CODE>double</CODE> value with the specified name.
*
- * @param name the name of the <CODE>double</CODE>
+ * @param name
+ * the name of the <CODE>double</CODE>
* @return the <CODE>double</CODE> value with the specified name
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public double getDouble(String name) throws BlazeMessageFormatException{
+ public double getDouble(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -351,15 +439,18 @@
throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName());
}
}
+
/**
* Returns the <CODE>String</CODE> value with the specified name.
*
- * @param name the name of the <CODE>String</CODE>
+ * @param name
+ * the name of the <CODE>String</CODE>
* @return the <CODE>String</CODE> value with the specified name; if there is no item by this name, a null value
* is returned
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public String getString(String name) throws BlazeMessageFormatException{
+ public String getString(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value == null) {
@@ -371,14 +462,17 @@
return value.toString();
}
}
+
/**
* Returns the byte array value with the specified name.
*
- * @param name the name of the byte array
+ * @param name
+ * the name of the byte array
* @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
- * @throws BlazeMessageFormatException if this type conversion is invalid.
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
*/
- public byte[] getBytes(String name) throws BlazeMessageFormatException{
+ public byte[] getBytes(String name) throws BlazeMessageFormatException {
initializeReading();
Object value = this.map.get(name);
if (value instanceof byte[]) {
@@ -387,6 +481,28 @@
throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
}
}
+
+ /**
+ * Returns a Buffer with the specified name.
+ *
+ * @param name
+ * the name of the Buffer
+ * @return the Buffer value with the specified name
+ * @throws BlazeMessageFormatException
+ * if this type conversion is invalid.
+ */
+ public Buffer getBuffer(String name) throws BlazeMessageFormatException {
+ initializeReading();
+ Object value = this.map.get(name);
+ if (value instanceof Buffer) {
+ return (Buffer) value;
+ } else {
+ throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName());
+ }
+ }
+
+
+
/**
* Returns the value of the object with the specified name.
* <P>
@@ -396,33 +512,37 @@
* <P>
* Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
*
- * @param name the name of the Java object
+ * @param name
+ * the name of the Java object
* @return a copy of the Java object value with the specified name, in objectified format (for example, if the
* object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
* this name, a null value is returned
*/
- public Object getObject(String name){
+ public Object getObject(String name) {
initializeReading();
return this.map.get(name);
}
+
/**
* Returns an <CODE>Enumeration</CODE> of all the names in the <CODE>BlazeMessage</CODE> object.
*
* @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
*/
- public Enumeration<String> getMapNames(){
+ public Enumeration<String> getMapNames() {
initializeReading();
return Collections.enumeration(this.map.keySet());
}
-
+
/**
* put a key,value pair into the message
- * @param name
- * @param value must be a supported primitive, or map of supported primitives
+ *
+ * @param name
+ * @param value
+ * must be a supported primitive, or map of supported primitives
* @return the previous value associated with the key
*/
- public Object put(String name,Object value){
- initializeWriting();
+ public Object put(String name, Object value) {
+ initializeWriting();
if (name == null) {
throw new IllegalArgumentException("The name of the property cannot be null.");
}
@@ -432,106 +552,155 @@
checkValidObject(value);
return this.map.put(name, value);
}
+
/**
* Sets a <CODE>boolean</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>boolean</CODE>
- * @param value the <CODE>boolean</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>boolean</CODE>
+ * @param value
+ * the <CODE>boolean</CODE> value to set in the Map
*/
- public void setBoolean(String name,boolean value){
+ public void setBoolean(String name, boolean value) {
initializeWriting();
put(name, value ? Boolean.TRUE : Boolean.FALSE);
}
-
+
/**
* Sets a <CODE>byte</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>byte</CODE>
- * @param value the <CODE>byte</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>byte</CODE>
+ * @param value
+ * the <CODE>byte</CODE> value to set in the Map
*/
- public void setByte(String name,byte value){
+ public void setByte(String name, byte value) {
initializeWriting();
put(name, Byte.valueOf(value));
}
+
/**
* Sets a <CODE>short</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>short</CODE>
- * @param value the <CODE>short</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>short</CODE>
+ * @param value
+ * the <CODE>short</CODE> value to set in the Map
*/
- public void setShort(String name,short value){
+ public void setShort(String name, short value) {
initializeWriting();
put(name, Short.valueOf(value));
}
+
/**
* Sets a Unicode character value with the specified name into the Map.
*
- * @param name the name of the Unicode character
- * @param value the Unicode character value to set in the Map
+ * @param name
+ * the name of the Unicode character
+ * @param value
+ * the Unicode character value to set in the Map
*/
- public void setChar(String name,char value){
+ public void setChar(String name, char value) {
initializeWriting();
put(name, Character.valueOf(value));
}
+
/**
* Sets an <CODE>int</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>int</CODE>
- * @param value the <CODE>int</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>int</CODE>
+ * @param value
+ * the <CODE>int</CODE> value to set in the Map
*/
- public void setInt(String name,int value){
+ public void setInt(String name, int value) {
initializeWriting();
put(name, Integer.valueOf(value));
}
+
/**
* Sets a <CODE>long</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>long</CODE>
- * @param value the <CODE>long</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>long</CODE>
+ * @param value
+ * the <CODE>long</CODE> value to set in the Map
*/
- public void setLong(String name,long value){
+ public void setLong(String name, long value) {
initializeWriting();
put(name, Long.valueOf(value));
}
+
/**
* Sets a <CODE>float</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>float</CODE>
- * @param value the <CODE>float</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>float</CODE>
+ * @param value
+ * the <CODE>float</CODE> value to set in the Map
*/
- public void setFloat(String name,float value){
+ public void setFloat(String name, float value) {
initializeWriting();
put(name, new Float(value));
}
+
/**
* Sets a <CODE>double</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>double</CODE>
- * @param value the <CODE>double</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>double</CODE>
+ * @param value
+ * the <CODE>double</CODE> value to set in the Map
*/
- public void setDouble(String name,double value){
+ public void setDouble(String name, double value) {
initializeWriting();
put(name, new Double(value));
}
+
/**
* Sets a <CODE>String</CODE> value with the specified name into the Map.
*
- * @param name the name of the <CODE>String</CODE>
- * @param value the <CODE>String</CODE> value to set in the Map
+ * @param name
+ * the name of the <CODE>String</CODE>
+ * @param value
+ * the <CODE>String</CODE> value to set in the Map
*/
- public void setString(String name,String value){
+ public void setString(String name, String value) {
initializeWriting();
put(name, value);
}
+
/**
* Sets a byte array value with the specified name into the Map.
*
- * @param name the name of the byte array
- * @param value the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
+ * @param name
+ * the name of the byte array
+ * @param value
+ * the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
* will not be altered by future modifications
- * @throws NullPointerException if the name is null, or if the name is an empty string.
+ * @throws NullPointerException
+ * if the name is null, or if the name is an empty string.
+ */
+ public void setBytes(String name, byte[] value) {
+ initializeWriting();
+ if (value != null) {
+ put(name, value);
+ } else {
+ this.map.remove(name);
+ }
+ }
+
+ /**
+ * Sets a Buffer value with the specified name into the Map.
+ *
+ * @param name
+ * the name of the Buffer
+ * @param value
+ * the Buffer value to set in the Map
+ * @throws NullPointerException
+ * if the name is null, or if the name is an empty string.
*/
- public void setBytes(String name,byte[] value){
+ public void setBuffer(String name, Buffer value) {
initializeWriting();
if (value != null) {
put(name, value);
@@ -539,163 +708,173 @@
this.map.remove(name);
}
}
+
/**
* Sets a portion of the byte array value with the specified name into the Map.
*
- * @param name the name of the byte array
- * @param value the byte array value to set in the Map
- * @param offset the initial offset within the byte array
- * @param length the number of bytes to use
+ * @param name
+ * the name of the byte array
+ * @param value
+ * the byte array value to set in the Map
+ * @param offset
+ * the initial offset within the byte array
+ * @param length
+ * the number of bytes to use
*/
- public void setBytes(String name,byte[] value,int offset,int length){
+ public void setBytes(String name, byte[] value, int offset, int length) {
initializeWriting();
byte[] data = new byte[length];
System.arraycopy(value, offset, data, 0, length);
put(name, data);
}
-
+
+
+
/**
- * Find out if the message contains a key
- * This isn't recursive
- * @param key
+ * Find out if the message contains a key This isn't recursive
+ *
+ * @param key
* @return true if the message contains the key
*
- */
- public boolean containsKey(Object key){
+ */
+ public boolean containsKey(Object key) {
initializeReading();
return this.map.containsKey(key.toString());
}
-
+
/**
* Find out if the message contains a value
- * @param value
+ *
+ * @param value
* @return true if the value exists
*
*/
- public boolean containsValue(Object value){
+ public boolean containsValue(Object value) {
initializeReading();
return this.map.containsValue(value);
}
-
+
/**
* @return a set of Map.Entry values
*
*/
- public Set<java.util.Map.Entry<String, Object>> entrySet(){
+ public Set<java.util.Map.Entry<String, Object>> entrySet() {
initializeReading();
return this.map.entrySet();
}
-
+
/**
* Retrieve the object associated with the key
- * @param key
+ *
+ * @param key
* @return the object
*/
- public Object get(Object key){
+ public Object get(Object key) {
initializeReading();
return getObject(key.toString());
}
-
+
/**
* @return true if the message is empty
*
*/
- public boolean isEmpty(){
+ public boolean isEmpty() {
initializeReading();
return this.map.isEmpty();
}
-
+
/**
* @return a Set of all the keys
*/
- public Set<String> keySet(){
+ public Set<String> keySet() {
initializeReading();
return this.map.keySet();
}
-
+
/**
* Add all entries in a Map to the message
- * @param t the map
+ *
+ * @param t
+ * the map
*
*/
- public void putAll(Map<? extends String, ? extends Object> t){
+ public void putAll(Map<? extends String, ? extends Object> t) {
for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
put(entry.getKey(), entry.getValue());
}
-
}
-
+
/**
* Remove a key/value pair from the message
- * @param key
+ *
+ * @param key
* @return the value removed or null
*
*/
- public Object remove(Object key){
+ public Object remove(Object key) {
setContent(null);
return this.map.remove(key.toString());
}
-
+
/**
* @return the number of entries in the message
*/
- public int size(){
+ public int size() {
initializeReading();
return this.map.size();
}
-
+
/**
* @return a Collection of the values in the message
*/
- public Collection<Object> values(){
+ public Collection<Object> values() {
initializeReading();
return this.map.values();
}
-
- private void initializeReading(){
+
+ private void initializeReading() {
loadContent();
}
-
- private void initializeWriting(){
+
+ private void initializeWriting() {
setContent(null);
}
-
- protected void checkValidObject(Object value) throws IllegalArgumentException{
+
+ protected void checkValidObject(Object value) throws IllegalArgumentException {
boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short
|| value instanceof Integer || value instanceof Long;
valid = valid || value instanceof Float || value instanceof Double || value instanceof Character
|| value instanceof String || value == null || value instanceof byte[];
if (value instanceof Map) {
- Map map = (Map) value;
- for(Object v:map.values()) {
+ Map map = (Map) value;
+ for (Object v : map.values()) {
checkValidObject(v);
}
valid = true;
}
if (!valid) {
- throw new IllegalArgumentException("Not a valid message value: "+value);
+ throw new IllegalArgumentException("Not a valid message value: " + value);
}
}
-
- public String toString(){
+
+ public String toString() {
return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
}
-
- protected void copy(BlazeMessage copy) throws BlazeException{
+
+ protected void copy(BlazeMessage copy) throws BlazeException {
storeContent();
copy.content = this.content;
}
-
-
- public BlazeData getContent(){
+
+ public BlazeData getContent() {
return this.content;
}
-
- public void setContent(BlazeData content){
+
+ public void setContent(BlazeData content) {
this.content = content;
}
-
- private void marshallMap(MapData mapData,String name,Object value) throws BlazeMessageFormatException{
+
+ private void marshallMap(MapData mapData, String name, Object value) throws BlazeMessageFormatException {
if (value != null) {
if (value.getClass() == Boolean.class) {
BoolType type = new BoolType();
@@ -747,6 +926,11 @@
type.setName(name);
type.setValue(value.toString());
mapData.addStringType(type);
+ } else if (value.getClass() == Buffer.class) {
+ BufferType type = new BufferType();
+ type.setName(name);
+ type.setValue((Buffer) value);
+ mapData.addBufferType(type); // register the entry, as the other branches do; otherwise the Buffer is built and discarded
} else if (value instanceof Map) {
Map<String, Key> subMap = (Map<String, Key>) value;
for (Map.Entry<String, Key> entry : subMap.entrySet()) {
@@ -760,8 +943,8 @@
}
}
}
-
- Map<String, Object> unmarshall(MapData mapData){
+
+ Map<String, Object> unmarshall(MapData mapData) {
Map<String, Object> result = new ConcurrentHashMap<String, Object>();
if (mapData.hasBoolType()) {
for (BoolType type : mapData.getBoolTypeList()) {
@@ -813,6 +996,11 @@
result.put(type.getName(), type.getValue().toByteArray());
}
}
+ if (mapData.hasBufferType()) {
+ for (BufferType type : mapData.getBufferTypeList()) {
+ result.put(type.getName(), type.getValue());
+ }
+ }
if (mapData.hasMapType()) {
for (MapData type : mapData.getMapTypeList()) {
Map<String, Object> map = unmarshall(type);
@@ -821,8 +1009,8 @@
}
return result;
}
-
- public void storeContent() throws BlazeMessageFormatException{
+
+ public void storeContent() throws BlazeMessageFormatException {
if (getContent() == null && !this.map.isEmpty()) {
BlazeData bd = new BlazeData();
MapData mapData = new MapData();
@@ -833,12 +1021,12 @@
this.content = bd;
}
}
-
+
/**
* Builds the message body from data
*
*/
- void loadContent() throws BlazeRuntimeException{
+ void loadContent() throws BlazeRuntimeException {
BlazeData data = getContent();
if (data != null && this.map.isEmpty()) {
this.map = unmarshall(data.getMapData());
@@ -853,7 +1041,8 @@
}
/**
- * @param destination the destination to set
+ * @param destination
+ * the destination to set
*/
public void setDestination(String destination) {
this.destination = destination;
@@ -861,6 +1050,7 @@
/**
* The id of the channel that sent the message
+ *
* @return the fromId
*/
public String getFromId() {
@@ -868,7 +1058,8 @@
}
/**
- * @param fromId the fromId to set
+ * @param fromId
+ * the fromId to set
*/
public void setFromId(String fromId) {
this.fromId = fromId;
@@ -882,7 +1073,8 @@
}
/**
- * @param messageId the messageId to set
+ * @param messageId
+ * the messageId to set
*/
public void setMessageId(String messageId) {
this.messageId = messageId;
@@ -896,7 +1088,8 @@
}
/**
- * @param correlationId the correlationId to set
+ * @param correlationId
+ * the correlationId to set
*/
public void setCorrelationId(String correlationId) {
this.correlationId = correlationId;
@@ -910,9 +1103,12 @@
}
/**
- * @param timeStamp the timeStamp to set
+ * @param timeStamp
+ * the timeStamp to set
*/
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
+
+
}
\ No newline at end of file
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java Tue Nov 25 06:27:23 2008
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Exception that the group channel's send and sendRequest paths catch in order to look the destination up again.
+ *
+ */
+public class BlazeNoRouteException extends BlazeException {
+ private static final long serialVersionUID = 3951297225484077839L;
+
+ /**
+ * Constructs a new exception with <code>null</code> as its detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause}.
+ */
+ public BlazeNoRouteException() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message. The cause is not initialized, and may subsequently
+ * be initialized by a call to {@link #initCause}.
+ *
+ * @param message
+ * the detail message. The detail message is saved for later retrieval by the {@link #getMessage()}
+ * method.
+ */
+ public BlazeNoRouteException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ * <p>
+ * Note that the detail message associated with <code>cause</code> is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message
+ * the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
+ * @param cause
+ * the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+ * value is permitted, and indicates that the cause is nonexistent or unknown.)
+ */
+ public BlazeNoRouteException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause and a detail message of
+ * <tt>(cause==null ? null : cause.toString())</tt> (which typically contains the class and detail message of
+ * <tt>cause</tt>). This constructor is useful for exceptions that are little more than wrappers for other
+ * throwables (for example, {@link java.security.PrivilegedActionException}).
+ *
+ * @param cause
+ * the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+ * value is permitted, and indicates that the cause is nonexistent or unknown.)
+ */
+ public BlazeNoRouteException(Throwable cause) {
+ super(cause);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeNoRouteException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java Tue Nov 25 06:27:23 2008
@@ -21,11 +21,8 @@
*
*/
public class BlazeRuntimeException extends RuntimeException {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -239755000850890447L;
+
/**
* Constructs a new exception with <code>null</code> as its detail message.
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java Tue Nov 25 06:27:23 2008
@@ -26,10 +26,10 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activeblaze.group.AsyncGroupRequest;
import org.apache.activeblaze.group.Group;
import org.apache.activeblaze.group.Member;
import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.util.AsyncGroupRequest;
import org.apache.activeblaze.wire.ElectionMessage;
import org.apache.activeblaze.wire.ElectionType;
import org.apache.activeblaze.wire.MemberData;
@@ -242,25 +242,27 @@
boolean waitForElection(int timeout) throws Exception {
long deadline = 0;
+ long waitTime = timeout;
if (timeout > 0) {
deadline = System.currentTimeMillis() + timeout;
}
synchronized (this.electionFinished) {
- while (isStarted() && !this.electionFinished.get()) {
+ while (isStarted() && !this.electionFinished.get() && (timeout == 0 || waitTime > 0)) {
try {
- this.electionFinished.wait(timeout);
+ this.electionFinished.wait(waitTime);
} catch (InterruptedException e) {
LOG.warn("Interrupted in waitForElection");
stop();
}
if (timeout > 0) {
- timeout = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
+ waitTime = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
}
}
}
return !isStopped() && this.electionFinished.get();
}
+
protected static List<MemberImpl> sortMemberList(List<MemberImpl> list) {
Collections.sort(list, new Comparator<Member>() {
public int compare(Member m1, Member m2) {
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java?rev=720505&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java Tue Nov 25 06:27:23 2008
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <P>
+ * A <CODE>GroupState</CODE> is a distributed collaboration implementation that is
+ * used to share state and process messages amongst a distributed group of
+ * other <CODE>Group</CODE> instances. Membership of a group is handled
+ * automatically using discovery.
+ * <P>
+ * The underlying transport is JMS and there are some optimizations that occur
+ * for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
+ * with any JMS implementation.
+ *
+ * <P>
+ * Updates to the group shared map are controlled by a coordinator. The
+ * coordinator is elected by the member with the lowest lexicographical id -
+ * based on the bully algorithm [Silberschatz et al. 1993]
+ * <P>
+ * The {@code selectCordinator(Collection<Member> members)} method may be
+ * overridden to implement a custom mechanism for choosing how the coordinator
+ * is elected for the map.
+ * <P>
+ * New <CODE>Group</CODE> instances have their state updated by the
+ * coordinator, and coordinator failure is handled automatically within the
+ * group.
+ * <P>
+ * All map updates are totally ordered through the coordinator, whilst read
+ * operations happen locally.
+ * <P>
+ * A <CODE>Group</CODE> supports the concept of owner only updates(write
+ * locks), shared updates, entry expiration times and removal on owner exit -
+ * all of which are optional. In addition, you can grab and release locks for
+ * values in the map, independently of who created them.
+ * <P>
+ * In addition, members of a group can broadcast messages and implement
+ * request/response with other <CODE>Group</CODE> instances.
+ * <P>
+ * NOTE(review): every <CODE>Map</CODE> operation in this class is still an unimplemented stub.
+ * @param <K> the type of the keys (named K so that it no longer shadows java.lang.String)
+ * @param <V> the type of the mapped values
+ *
+ */
+
+public class GroupState<K, V> implements Map<K, V> {
+
+ private final BlazeCoordinatedGroupChannelImpl channel;
+ protected GroupState(BlazeCoordinatedGroupChannelImpl channel) {
+ this.channel=channel;
+ }
+ /**
+ *
+ * @see java.util.Map#clear()
+ */
+ public void clear() {
+ // TODO Auto-generated method stub
+
+ }
+ /**
+ * @param key
+ * @return
+ * @see java.util.Map#containsKey(java.lang.Object)
+ */
+ public boolean containsKey(Object key) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+ /**
+ * @param value
+ * @return
+ * @see java.util.Map#containsValue(java.lang.Object)
+ */
+ public boolean containsValue(Object value) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+ /**
+ * @return
+ * @see java.util.Map#entrySet()
+ */
+ public Set<java.util.Map.Entry<K, V>> entrySet() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ /**
+ * @param key
+ * @return
+ * @see java.util.Map#get(java.lang.Object)
+ */
+ public V get(Object key) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ /**
+ * @return
+ * @see java.util.Map#isEmpty()
+ */
+ public boolean isEmpty() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+ /**
+ * @return
+ * @see java.util.Map#keySet()
+ */
+ public Set<K> keySet() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ /**
+ * @param key
+ * @param value
+ * @return
+ * @see java.util.Map#put(java.lang.Object, java.lang.Object)
+ */
+ public V put(K key, V value) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ /**
+ * @param t
+ * @see java.util.Map#putAll(java.util.Map)
+ */
+ public void putAll(Map<? extends K, ? extends V> t) {
+ // TODO Auto-generated method stub
+
+ }
+ /**
+ * @param key
+ * @return
+ * @see java.util.Map#remove(java.lang.Object)
+ */
+ public V remove(Object key) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ /**
+ * @return
+ * @see java.util.Map#size()
+ */
+ public int size() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ /**
+ * @return
+ * @see java.util.Map#values()
+ */
+ public Collection<V> values() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
\ No newline at end of file
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/GroupState.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Tue Nov 25 06:27:23 2008
@@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activeblaze.BlazeChannelImpl;
import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.BlazeNoRouteException;
import org.apache.activeblaze.BlazeRuntimeException;
import org.apache.activeblaze.BlazeTopicListener;
import org.apache.activeblaze.Processor;
@@ -32,7 +33,10 @@
import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.transport.BaseTransport;
import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.AsyncGroupRequest;
+import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.PropertyUtil;
+import org.apache.activeblaze.util.SendRequest;
import org.apache.activeblaze.wire.BlazeData;
import org.apache.activeblaze.wire.DestinationData;
import org.apache.activeblaze.wire.MemberData;
@@ -56,7 +60,7 @@
private InetSocketAddress toManagementAddress;
private MemberImpl local;
private BlazeQueueListener inboxListener;
- private Map<Buffer, SendRequest> messageRequests = new HashMap<Buffer, SendRequest>();
+ private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(1000);
private Map<Buffer, BlazeQueueListener> queueMessageListenerMap = new ConcurrentHashMap<Buffer, BlazeQueueListener>();
private Group group;
private Buffer inboxURI;
@@ -81,7 +85,7 @@
boolean result = super.init();
if (result) {
String unicastURIStr = getConfiguration().getUnicastURI();
- unicastURIStr=PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
+ unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
URI unicastURI = new URI(unicastURIStr);
this.inboxURI = new Buffer(unicastURIStr);
BaseTransport transport = TransportFactory.get(unicastURI);
@@ -91,10 +95,9 @@
// if using a port of zero - the port will be assigned automatically,
// so need to get the potentially new value
unicastURI = transport.getLocalURI();
- //append configuration properties
-
+ // append configuration properties
String groupManagementURIStr = getGroupConfiguration().getGroupManagementURI();
- groupManagementURIStr=PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
+ groupManagementURIStr = PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
URI groupManagementURI = new URI(groupManagementURIStr);
this.toManagementAddress = new InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
this.groupManagementTransport = TransportFactory.get(groupManagementURI);
@@ -265,9 +268,17 @@
*/
public void send(String destination, BlazeMessage message) throws Exception {
Buffer key = new Buffer(destination);
- MemberImpl member = getQueueDestination(key);
- if (member != null) {
- send(member, key, message);
+ while (true) { // NOTE(review): no back-off or retry cap - confirm the lookup cannot keep returning an unroutable member
+ MemberImpl member = getQueueDestination(key);
+ if (member != null) {
+ try {
+ send(member, key, message);
+ return;
+ } catch (BlazeNoRouteException e) { // deliberately ignored: loop round and look the destination up again
+ }
+ } else {
+ return; // the lookup found no member: give up silently, nothing is sent and nothing is thrown
+ }
}
}
@@ -317,9 +328,7 @@
* org.apache.activeblaze.BlazeMessage)
*/
public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
- Buffer key = new Buffer(destination);
- MemberImpl member = getQueueDestination(key);
- return sendRequest(member, key, message, 0);
+ return sendRequest(destination, message, 0);
}
/**
@@ -333,8 +342,30 @@
*/
public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
Buffer key = new Buffer(destination);
- MemberImpl member = getQueueDestination(key);
- return sendRequest(member, key, message, timeout);
+ long deadline = 0; // absolute cut-off in ms; only used when timeout > 0
+ long waitTime = timeout; // time still available; stays 0 (wait indefinitely) when timeout == 0
+ if (timeout > 0) {
+ deadline = System.currentTimeMillis() + timeout;
+ }
+ while (!isStopped() && (timeout == 0 || waitTime > 0)) {
+ MemberImpl member = getQueueDestination(key);
+ if (member != null) {
+ try {
+ BlazeMessage result = sendRequest(member, key, message, (int) waitTime);
+ if (result != null) {
+ return result;
+ }
+ } catch (BlazeNoRouteException e) {
+ // deliberately ignored: loop round and look the destination up again
+ }
+ } else {
+ this.group.waitForNewMember((int) waitTime);
+ }
+ if (timeout > 0) { // refresh after EITHER branch, so time spent waiting for a member also counts against the deadline
+ waitTime = Math.max(deadline - System.currentTimeMillis(), 0);
+ }
+ }
+ return null; // channel stopped or timeout used up without a reply
}
protected synchronized BlazeMessage sendRequest(MemberImpl member, Buffer destination, BlazeMessage message,
@@ -379,7 +410,6 @@
Packet packet = new Packet(data);
packet.setTo(((MemberImpl) to).getAddress());
this.unicast.downStream(packet);
-
}
protected void send(MemberImpl member, Buffer destination, BlazeMessage message) throws Exception {
@@ -392,6 +422,8 @@
blazeData.setTopic(false);
blazeData.setDestination(destination);
PacketData data = getPacketData(MessageType.BLAZE_DATA, blazeData);
+ data.setReliable(true);
+ data.setResponseRequired(true);
data.setFromAddress(this.inboxURI);
Packet packet = new Packet(data);
packet.setTo(member.getAddress());
@@ -485,7 +517,7 @@
protected void doProcessBlazeData(PacketData data) throws Exception {
BlazeMessage message = (BlazeMessage) buildBlazeMessage(data);
if (message.getContent().getTopic()) {
- super.processBlazeMessage(message);
+ dispatch(message);
} else {
Buffer destination = message.getContent().getDestination();
if (this.inboxListener != null && this.producerId.equals(destination)) {
@@ -571,13 +603,13 @@
}
/**
- * @param to
+ * @param to
* @param messageType
* @param message
* @param correlationId
* @throws Exception
*/
- public synchronized void sendReply(MemberImpl to,MessageType messageType, Message<?> message, String correlationId)
+ public synchronized void sendReply(MemberImpl to, MessageType messageType, Message<?> message, String correlationId)
throws Exception {
PacketData data = getPacketData(messageType, message);
data.setCorrelationId(new Buffer(correlationId));
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Tue Nov 25 06:27:23 2008
@@ -48,6 +48,7 @@
private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
private final Map<Buffer, List<MemberImpl>> queueMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
private final Map<Buffer, List<MemberImpl>> topicMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
+ private final Object memberMutex = new Object();
/**
* Constructor
@@ -69,7 +70,7 @@
public MemberImpl getLocalMember() throws Exception {
return this.channel.getLocalMember();
}
-
+
void updateLocal(MemberImpl member) {
this.members.put(member.getId(), member);
}
@@ -80,7 +81,7 @@
public String getId() {
return this.channel.getId();
}
-
+
/**
* @return the name of the local channel
*/
@@ -143,13 +144,14 @@
}
return null;
}
-
+
/**
* Will wait for a member to advertise itself if not available
+ *
* @param name
* @param timeout
* @return the member or null
- * @throws InterruptedException
+ * @throws InterruptedException
*/
public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
Member result = null;
@@ -160,13 +162,13 @@
while (true) {
result = getMemberByName(name);
if (result == null) {
- synchronized(this.members) {
+ synchronized (this.members) {
this.members.wait(timeout);
}
if (timeout > 0) {
timeout = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
}
- }else {
+ } else {
break;
}
}
@@ -218,7 +220,7 @@
try {
broadcastHeartBeat(getLocalMember());
} catch (Exception e) {
- LOG.error("Failed to send heartbeat",e);
+ LOG.error("Failed to send heartbeat", e);
}
}
};
@@ -297,6 +299,9 @@
}
private void fireMemberStarted(Member member) {
+ synchronized (this.memberMutex) {
+ this.memberMutex.notifyAll();
+ }
LOG.debug(this.channel.getName() + " Member started " + member);
for (MemberChangedListener l : this.membershipListeners) {
l.memberStarted(member);
@@ -304,6 +309,9 @@
}
private void fireMemberStopped(Member member) {
+ synchronized (this.memberMutex) {
+ this.memberMutex.notifyAll();
+ }
LOG.debug(this.channel.getName() + " Member stopped " + member);
for (MemberChangedListener l : this.membershipListeners) {
l.memberStopped(member);
@@ -315,7 +323,7 @@
long checkTime = System.currentTimeMillis() - this.configuration.getHeartBeatInterval();
for (MemberImpl member : this.members.values()) {
if (!member.getId().equals(getId()) && member.getTimeStamp() < checkTime) {
- LOG.debug(getId() +" Member timestamp expired " + member);
+ LOG.debug(getId() + " Member timestamp expired " + member);
this.members.remove(member.getId());
processMemberStopped(member);
}
@@ -326,7 +334,7 @@
protected void processMemberStarted(MemberImpl member) throws Exception {
processDestinationsForStarted(member);
fireMemberStarted(member);
- synchronized(this.members) {
+ synchronized (this.members) {
this.members.notifyAll();
}
}
@@ -393,10 +401,33 @@
protected Map<Buffer, List<MemberImpl>> getTopicMap() {
return this.topicMap;
}
-
+
protected void broadcastHeartBeat(MemberImpl local) throws Exception {
if (isStarted()) {
Group.this.channel.broadcastMessage(MessageType.MEMBER_DATA, local.getData());
}
}
+
+ protected boolean waitForNewMember(int timeout) throws Exception { // timeout in ms; 0 means wait with no time limit
+ int memberCount = this.members.size(); // baseline: a "new member" means the map has grown past this size
+ long deadline = 0;
+ long waitTime = timeout;
+ if (timeout > 0) {
+ deadline = System.currentTimeMillis() + timeout;
+ }
+ synchronized (this.memberMutex) { // same monitor that fireMemberStarted/fireMemberStopped notify
+ while (isStarted() && memberCount >= this.members.size() && (timeout == 0 || waitTime > 0)) {
+ try {
+ this.memberMutex.wait(waitTime);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted in waitForMember");
+ stop(); // an interrupt stops this group; the interrupt flag itself is not restored
+ }
+ if (timeout > 0) {
+ waitTime = Math.max(deadline - System.currentTimeMillis(), 0l); // time left; 0 ends the loop
+ }
+ }
+ }
+ return !isStopped() && memberCount < this.members.size(); // true only if not stopped and the member map grew
+ }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java Tue Nov 25 06:27:23 2008
@@ -178,4 +178,15 @@
LOG.error("No exception listener - caught exception ", e);
}
}
+
+ /**
+ * calls stop - but catches exceptions
+ */
+ protected void stopInternal() {
+ try {
+ stop();
+ } catch (Throwable e) {
+ LOG.error("Caught an exception stopping",e);
+ }
+ }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Tue Nov 25 06:27:23 2008
@@ -27,7 +27,7 @@
*/
public final class Packet {
final private SocketAddress from;
- private InetSocketAddress to;
+ private SocketAddress to;
private String id;
final private PacketData packetData;
@@ -138,14 +138,14 @@
/**
* @return the to
*/
- public InetSocketAddress getTo() {
+ public SocketAddress getTo() {
return this.to;
}
/**
* @param to the to to set
*/
- public void setTo(InetSocketAddress to) {
+ public void setTo(SocketAddress to) {
this.to = to;
}
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Tue Nov 25 06:27:23 2008
@@ -17,17 +17,26 @@
package org.apache.activeblaze.impl.transport;
import java.net.URI;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.activeblaze.BlazeConfiguration;
+import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
import org.apache.activeblaze.impl.processor.PacketAudit;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Base Class for transports
- *
+ *
*/
-public abstract class BaseTransport extends ThreadChainedProcessor{
+public abstract class BaseTransport extends ThreadChainedProcessor {
+ private static final Log LOG = LogFactory.getLog(BaseTransport.class);
static final int DEFAULT_MAX_PACKET_SIZE = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private URI localURI;
+ private Buffer bufferOfLocalURI;
private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private int soTimeout = 2000;
@@ -36,39 +45,59 @@
protected final PacketAudit audit = new PacketAudit();
private boolean broadcast = false;
private boolean enableAudit = false;
-
-
+ private int maxDispatchQueueSize = 10000;
+ private LinkedBlockingQueue<Packet> dispatchQueue;
+ private Thread dispatchQueueThread;
+
public boolean init() throws Exception {
boolean result = super.init();
- if(result) {
+ if (result) {
this.audit.init();
+ if (this.localURI != null) {
+ this.bufferOfLocalURI = new Buffer(this.localURI.toString());
+ }
+ this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
}
return result;
}
-
public boolean shutDown() throws Exception {
boolean result = super.shutDown();
- if(result) {
+ if (result) {
this.audit.shutDown();
}
return result;
}
-
public boolean start() throws Exception {
boolean result = super.start();
- if(result) {
+ if (result) {
this.audit.start();
+ Runnable runable = new Runnable() {
+ public void run() {
+ while (isStarted()) {
+ dequeuePackets();
+ }
+ }
+ };
+ this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
+ this.dispatchQueueThread.setDaemon(true);
+ this.dispatchQueueThread.start();
}
- return result;
+ return result;
}
-
public boolean stop() throws Exception {
boolean result = super.stop();
- if(result) {
+ if (result) {
this.audit.stop();
+ if (this.dispatchQueueThread != null) {
+ this.dispatchQueueThread.interrupt();
+ try {
+ this.dispatchQueueThread.join(100);
+ } catch (InterruptedException e) {
+ }
+ }
}
return result;
}
@@ -76,94 +105,103 @@
/**
* @return the localURI
*/
- public URI getLocalURI(){
+ public URI getLocalURI() {
return this.localURI;
}
/**
- * @param localURI the localURI to set
+ * @param localURI
+ * the localURI to set
*/
- public void setLocalURI(URI localURI){
+ public void setLocalURI(URI localURI) {
this.localURI = localURI;
+ // keep the cached Buffer form in step with the URI, including when the
+ // URI is cleared, so getBufferOfLocalURI() never returns a stale value
+ this.bufferOfLocalURI = (localURI != null) ? new Buffer(localURI.toString()) : null;
}
/**
* @return the maxPacketSize
*/
- public int getMaxPacketSize(){
+ public int getMaxPacketSize() {
return this.maxPacketSize;
}
/**
- * @param maxPacketSize the maxPacketSize to set
+ * @param maxPacketSize
+ * the maxPacketSize to set
*/
- public void setMaxPacketSize(int maxPacketSize){
+ public void setMaxPacketSize(int maxPacketSize) {
this.maxPacketSize = maxPacketSize;
}
/**
* @return the bufferSize
*/
- public int getBufferSize(){
+ public int getBufferSize() {
return this.bufferSize;
}
/**
- * @param bufferSize the bufferSize to set
+ * @param bufferSize
+ * the bufferSize to set
*/
- public void setBufferSize(int bufferSize){
+ public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
/**
* @return the soTimeout
*/
- public int getSoTimeout(){
+ public int getSoTimeout() {
return this.soTimeout;
}
/**
- * @param soTimeout the soTimeout to set
+ * @param soTimeout
+ * the soTimeout to set
*/
- public void setSoTimeout(int soTimeout){
+ public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
}
-
+
/**
* @return the timeToLive
*/
- public int getTimeToLive(){
+ public int getTimeToLive() {
return this.timeToLive;
}
/**
- * @param timeToLive the timeToLive to set
+ * @param timeToLive
+ * the timeToLive to set
*/
- public void setTimeToLive(int timeToLive){
+ public void setTimeToLive(int timeToLive) {
this.timeToLive = timeToLive;
}
/**
* @return the loopBack
*/
- public boolean isLoopBack(){
+ public boolean isLoopBack() {
return this.loopBack;
}
/**
- * @param loopBack the loopBack to set
+ * @param loopBack
+ * the loopBack to set
*/
- public void setLoopBack(boolean loopBack){
+ public void setLoopBack(boolean loopBack) {
this.loopBack = loopBack;
}
-
+
/**
* @return the audit
*/
- protected PacketAudit getAudit(){
+ protected PacketAudit getAudit() {
return this.audit;
}
-
+
/**
* @return the broadcast
*/
@@ -179,7 +217,6 @@
this.broadcast = broadcast;
}
-
/**
* @return the enableAudit
*/
@@ -187,11 +224,58 @@
return this.enableAudit;
}
-
/**
- * @param enableAudit the enableAudit to set
+ * @param enableAudit
+ * the enableAudit to set
*/
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
+
+ /**
+ * Returns the local URI's string form wrapped in a Buffer. The value is
+ * built from localURI.toString() by setLocalURI and again by init() when a
+ * local URI is set.
+ *
+ * @return the bufferOfLocalURI, or null if it has not been built yet
+ */
+ public Buffer getBufferOfLocalURI() {
+ return this.bufferOfLocalURI;
+ }
+
+ /**
+ * @return the local URI as a String, or a placeholder when no local URI
+ * has been set
+ */
+ public String toString() {
+ if (this.localURI == null) {
+ return " Uninitialized Transport";
+ }
+ return this.localURI.toString();
+ }
+
+ /**
+ * Returns the capacity used for the dispatch queue when init() creates it.
+ * Defaults to 10000.
+ *
+ * @return the maxDispatchQueueSize
+ */
+ public int getMaxDispatchQueueSize() {
+ return this.maxDispatchQueueSize;
+ }
+
+ /**
+ * Sets the capacity for the dispatch queue. The queue is created in init()
+ * with the value current at that time, so a change made afterwards does
+ * not resize an existing queue.
+ *
+ * @param maxDispatchQueueSize
+ * the maxDispatchQueueSize to set
+ */
+ public void setMaxDispatchQueueSize(int maxDispatchQueueSize) {
+ this.maxDispatchQueueSize = maxDispatchQueueSize;
+ }
+
+ /**
+ * Queues the packet for the dispatch queue thread rather than passing it
+ * upstream on the calling thread. Blocks while the dispatch queue is full.
+ * Packets arriving after the transport has stopped are dropped.
+ *
+ * @param packet the packet to queue
+ * @throws Exception if interrupted while waiting for space in the queue
+ */
+ public void upStream(Packet packet) throws Exception {
+ if (isStopped()) {
+ return;
+ }
+ this.dispatchQueue.put(packet);
+ }
+
+ /**
+ * Takes one packet from the dispatch queue, waiting until one is
+ * available, and passes it to the superclass's upStream. Called in a loop
+ * by the dispatch queue thread while the transport is started.
+ * <p>
+ * An interrupt ends the wait quietly. Any other exception is logged and
+ * the transport is stopped via stopInternal().
+ */
+ protected void dequeuePackets() {
+ Packet packet = null;
+ try {
+ // take() blocks until an element is available and never returns
+ // null, so no null check is needed before dispatching
+ packet = this.dispatchQueue.take();
+ super.upStream(packet);
+ } catch (InterruptedException e1) {
+ // we've stopped
+ } catch (Exception e) {
+ LOG.error("Caught an exception processing a packet: " + packet, e);
+ stopInternal();
+ }
+ }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=720505&r1=720504&r2=720505&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Tue Nov 25 06:27:23 2008
@@ -33,13 +33,11 @@
public class MulticastTransport extends BaseTransport {
private MulticastSocket socket;
private String networkInterface;
- private byte[] receiveData;
private InetSocketAddress socketAddress;
public boolean init() throws Exception {
boolean result = super.init();
if (result) {
- this.receiveData = new byte[getMaxPacketSize()];
this.socket = new MulticastSocket(getLocalURI().getPort());
this.socket.setTimeToLive(getTimeToLive());
this.socket.setLoopbackMode(isLoopBack());
@@ -76,7 +74,8 @@
protected void doProcess() throws Exception {
if (isInitialized()) {
- DatagramPacket dp = new DatagramPacket(this.receiveData, this.receiveData.length);
+ byte[] receiveData = new byte[getMaxPacketSize()];
+ DatagramPacket dp = new DatagramPacket(receiveData, receiveData.length);
this.socket.receive(dp);
if (dp.getLength() > 0) {
PacketData data = PacketData.parseFramed(dp.getData());
@@ -96,7 +95,7 @@
this.audit.isDuplicate(packet);
}
byte[] data = packet.getPacketData().toFramedByteArray();
- InetSocketAddress to = packet.getTo();
+ SocketAddress to = packet.getTo();
DatagramPacket dp = new DatagramPacket(data, data.length, to);
this.socket.send(dp);
} else {