You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/21 21:44:43 UTC

svn commit: r719706 [2/6] - in /activemq/activemq-blaze: ./ branches/ tags/ trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/ trunk/src/main/java/org/ trunk/src/main/java/org/apache/ trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implementation of a Service
+ *
+ */
+public class BaseService implements Service {
+
+    AtomicBoolean initialialized = new AtomicBoolean();
+    AtomicBoolean started = new AtomicBoolean();
+   
+    public boolean init() throws Exception {
+        return this.initialialized.compareAndSet(false, true);
+    }
+
+    
+    public boolean shutDown() throws Exception {
+        if (isStarted()) {
+            stop();
+        }
+        return this.initialialized.compareAndSet(true, false);
+    }
+
+    
+    public boolean start() throws Exception {
+        if (!this.initialialized.get()) {
+            init();
+        }
+        return this.started.compareAndSet(false, true); 
+    }
+
+    
+    public boolean stop() throws Exception {
+        if (!isInitialized()) {
+            init();
+        }
+        return this.started.compareAndSet(true, false);
+    }
+    
+    public boolean isStarted() {
+        return this.started.get();
+    }
+    
+    public boolean isStopped() {
+        return !isStarted();
+    }
+   
+    public boolean isInitialized() {
+        return this.initialialized.get();
+    }
+
+    public boolean isShutDown() {
+       return !isInitialized();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * <P>
+ * A <CODE>BlazeChannel</CODE> is a wrapper for reliable multicast communication
+ * 
+ */
+public interface BlazeChannel extends Service {
+    /**
+     * @return the id
+     */    
+    public String getId();
+
+    /**
+     * broadcast as message
+     * @param destination
+     * @param msg
+     * @throws Exception 
+     */
+    public void broadcast(String destination, BlazeMessage msg) throws Exception;
+
+    /**
+     * @return the configuration
+     */
+    public BlazeConfiguration getConfiguration();
+    
+    /**
+     * Add a listener for messages
+     * @param destination
+     * @param l
+     * @throws Exception 
+     */
+    public void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception;
+    
+    /**
+     * Remove a listener for messages
+     * @param destination
+     * @return the removed listener
+     * @throws Exception 
+     */
+    public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception;
+    
+    /**
+     * Set an exception listener for async exceptions that can be generated
+     * by a Transport thread
+     * @param l
+     */
+    public void setExceptionListener(ExceptionListener l);
+
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Factory class for creating <Code>BlazeChannel</CODE>
+ */
+public class BlazeChannelFactory {
+    private BlazeConfiguration configuration;
+    
+    /**
+     * Default Constructor
+     */
+    public BlazeChannelFactory() {
+        this.configuration = new BlazeConfiguration();
+    }
+    
+    /**
+     * Construct a factory to use the passed Configuration
+     * @param config
+     */
+    public BlazeChannelFactory(BlazeConfiguration config){
+        this.configuration=config;
+    }
+    
+    /**
+     * Create a Channel
+     * @return the Channel
+     * @throws Exception 
+     */
+    public BlazeChannel createChannel() throws Exception {
+        BlazeChannelImpl result = new BlazeChannelImpl();
+        result.setConfiguration(getConfiguration().copy());
+        return result;
+    }
+
+    /**
+     * @return the configuration
+     */
+    public BlazeConfiguration getConfiguration() {
+        return this.configuration;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.CompressionProcessor;
+import org.apache.activeblaze.impl.processor.FragmentationProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableFactory;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.util.PropertyUtil;
+import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.Message;
+
+/**
+ * <P>
+ * A <CODE>BlazeChannel</CODE> handles all client communication, either unicast, broadcast or multicast with other
+ * peers in the Blaze network
+ * 
+ * 
+ */
+public class BlazeChannelImpl extends ChainedProcessor implements BlazeChannel, ExceptionListener {
+    protected Map<Buffer, BlazeTopicListener> topicessageListenerMap = new ConcurrentHashMap<Buffer, BlazeTopicListener>();
+    protected final IdGenerator idGenerator = new IdGenerator();
+    protected Buffer producerId;
+    protected AtomicLong sequence = new AtomicLong();
+    protected AtomicLong session = new AtomicLong(1);
+    private Processor broadcast;
+    private BlazeConfiguration configuration = new BlazeConfiguration();
+    private String id;
+    private LinkedBlockingQueue<BlazeMessage> broadcastQueue;
+    private Thread broadcastQueueThread;
+    private Buffer managementURI;
+    private InetSocketAddress toAddress;
+
+    /**
+     * Constructor
+     * 
+     * @param prev
+     * @param next
+     */
+    protected BlazeChannelImpl() {
+        this.id = this.idGenerator.generateId();
+        this.producerId = new Buffer(this.id);
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * @param destination
+     * @param l
+     * @throws Exception 
+     * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
+     *      org.apache.activeblaze.BlazeTopicListener)
+     */
+    public void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception {
+        Buffer key = new Buffer(destination);
+        this.topicessageListenerMap.put(key, l);
+    }
+
+    /**
+     * @param destination
+     * @param l
+     * @return
+     * @throws Exception 
+     * @see org.apache.activeblaze.BlazeChannel#removeBlazeMessageListener(java.lang.String,
+     *      org.apache.activeblaze.BlazeTopicListener)
+     */
+    public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception {
+        Buffer key = new Buffer(destination);
+        return this.topicessageListenerMap.remove(key);
+    }
+
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            this.broadcastQueue = new LinkedBlockingQueue<BlazeMessage>(getConfiguration().getMaxDispatchQueueSize());
+            String broadcastURIStr = getConfiguration().getBroadcastURI();
+            broadcastURIStr=PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
+            URI broadcastURI = new URI(broadcastURIStr);
+            this.toAddress = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort());
+            this.managementURI = new Buffer(new URI(getConfiguration().getManagementURI()).toString());
+            BaseTransport transport = TransportFactory.get(broadcastURI);
+            transport.setName(getId() + "-Broadcast");
+            this.broadcast = configureProcess(transport);
+            this.broadcast.init();
+        }
+        return result;
+    }
+
+    protected final void configureTransport(BaseTransport transport) throws Exception {
+        transport.setMaxPacketSize(getConfiguration().getMaxPacketSize());
+    }
+
+    protected Processor configureProcess(BaseTransport transport) throws Exception {
+        int maxPacketSize = getConfiguration().getMaxPacketSize();
+        configureTransport(transport);
+        
+        CompressionProcessor result = new CompressionProcessor();
+        result.setPrev(this);
+        result.setExceptionListener(this);
+        FragmentationProcessor fp = new FragmentationProcessor();
+        fp.setMaxPacketSize(maxPacketSize);
+        result.setEnd(fp);
+        ChainedProcessor reliable = ReliableFactory.get(getConfiguration().getReliable());
+        result.setEnd(reliable);
+        result.setEnd(transport);
+        return result;
+    }
+
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (result) {
+            this.broadcast.shutDown();
+        }
+        return result;
+    }
+
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            if (getConfiguration().isUseDispatchThread()) {
+                Runnable runable = new Runnable() {
+                    public void run() {
+                        while (isStarted()) {
+                            dequeueBroadcastMessages();
+                        }
+                    }
+                };
+                this.broadcastQueueThread = new Thread(runable, getId() + "-BroadcastQueue");
+                this.broadcastQueueThread.setDaemon(true);
+                this.broadcastQueueThread.start();
+            }
+            this.broadcast.start();
+        }
+        return result;
+    }
+
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result) {
+            if (this.broadcastQueueThread != null) {
+                this.broadcastQueueThread.interrupt();
+                try {
+                    this.broadcastQueueThread.join(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+            this.broadcast.stop();
+        }
+        return result;
+    }
+
+    public synchronized void broadcast(String destination, BlazeMessage msg) throws Exception {
+        msg.storeContent();
+        BlazeData blazeData = msg.getContent();
+        blazeData.setTopic(true);
+        blazeData.setDestination(new Buffer(destination));
+        PacketData packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
+        packetData.setReliable(true);
+        packetData.setFromAddress(this.managementURI);
+        Packet packet = new Packet(packetData);
+        packet.setTo(this.toAddress);
+        this.broadcast.downStream(packet);
+    }
+
+    protected synchronized PacketData getPacketData(MessageType type, Message<?> message) {
+        PacketData packetData = new PacketData();
+        packetData.setFromAddress(this.managementURI);
+        packetData.setType(type.getNumber());
+        packetData.setProducerId(this.producerId);
+        packetData.setSessionId(this.session.get());
+        long sequence = this.sequence.incrementAndGet();
+        packetData.setMessageSequence(sequence);
+        packetData.setPayload(message.toFramedBuffer());
+        StringBuilder builder = new StringBuilder(this.id.length() + 32);
+        builder.append(this.id).append(":").append(sequence);
+        packetData.setMessageId(new Buffer(builder.toString()));
+        return packetData;
+    }
+
+    public void upStream(Packet packet) throws Exception {
+        PacketData data = packet.getPacketData();
+        processData(packet.getId(), data.getCorrelationId(), data);
+    }
+
+    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+        MessageType type = MessageType.valueOf(data.getType());
+        if (type == MessageType.BLAZE_DATA) {
+            doProcessBlazeData(data);
+        }
+    }
+
+    public BlazeConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * @param configuration
+     * @see org.apache.activeblaze.BlazeChannel#setConfiguration(org.apache.activeblaze.BlazeConfiguration)
+     */
+    public void setConfiguration(BlazeConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * @param ex
+     * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+     */
+    public void onException(Exception ex) {
+        doFireException(ex);
+    }
+
+    protected void doProcessBlazeData(PacketData data) throws Exception {
+        BlazeMessage message = buildBlazeMessage(data);
+        processBlazeMessage(message);
+    }
+
+    protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
+        BlazeMessage message = null;
+        if (data != null) {
+            MessageType type = MessageType.BLAZE_DATA;
+            BlazeData blazeData = (BlazeData) type.createMessage();
+            Buffer payload = data.getPayload();
+            blazeData.mergeFramed(payload);
+            String fromId = null;
+            if (data.hasProducerId()) {
+                fromId = data.getProducerId().toStringUtf8();
+            }
+            message = createMessage(fromId);
+            message.setDestination(blazeData.getDestination().toStringUtf8());
+            message.setFromId(fromId);
+            if (data.hasMessageId()) {
+                message.setMessageId(data.getMessageId().toStringUtf8());
+            }
+            if (data.hasCorrelationId()) {
+                message.setCorrelationId(data.getCorrelationId().toStringUtf8());
+            }
+            message.setTimeStamp(blazeData.getTimestamp());
+            message.setContent(blazeData);
+        }
+        return message;
+    }
+
+    protected BlazeMessage createMessage(String fromId) {
+        return new BlazeMessage();
+    }
+
+    protected void processBlazeMessage(BlazeMessage message) {
+        if (this.broadcastQueueThread == null) {
+            dispatch(message);
+        } else {
+            try {
+                this.broadcastQueue.put(message);
+            } catch (InterruptedException e) {
+                // ignore - we are stopping
+            }
+        }
+    }
+
+    protected void dequeueBroadcastMessages() {
+        BlazeMessage message = null;
+        try {
+            message = this.broadcastQueue.take();
+        } catch (InterruptedException e1) {
+        }
+        dispatch(message);
+    }
+
+    protected void dispatch(BlazeMessage message) {
+        if (message != null) {
+            Buffer destination = message.getContent().getDestination();
+            for (Map.Entry<Buffer, BlazeTopicListener> entry : this.topicessageListenerMap.entrySet()) {
+                if (DestinationMatch.isMatch(destination, entry.getKey())) {
+                    entry.getValue().onMessage(message);
+                }
+            }
+        }
+    }
+    
+    /**
+     * shutdown on gc
+     * @throws Throwable
+     * @see java.lang.Object#finalize()
+     */
+    protected void finalize() throws Throwable {
+        try {
+            shutDown();
+        } finally {
+            super.finalize();
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.util.Map;
+import org.apache.activeblaze.util.PropertyUtil;
+
+/**
+ * Basic Configuration for a BlazeChannel
+ * 
+ */
+public class BlazeConfiguration {
+    /**
+     * Max size for datagrams
+     */
+    public static final int DEFAULT_MAX_PACKET_SIZE = 4 * 1024;
+    // transport bindings
+    private int unicastPort = 0;
+    private String unicastURI = "udp://localhost:0";
+    private String broadcastURI = "mcast://224.2.2.2:9999";
+    private String managementURI = "mcast://224.2.2.2:8888";
+    // Channel internals
+    private boolean useDispatchThread = true;
+    private int maxDispatchQueueSize = 10000;
+    private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
+    //reliability
+    private String reliable = "simple";
+
+    /**
+     * @return the unicastPort
+     */
+    public int getUnicastPort() {
+        return this.unicastPort;
+    }
+
+    /**
+     * @param unicastPort
+     *            the unicastPort to set
+     */
+    public void setUnicastPort(int unicastPort) {
+        this.unicastPort = unicastPort;
+    }
+
+    /**
+     * @return the unicastURL
+     */
+    public String getUnicastURI() {
+        return this.unicastURI;
+    }
+
+    /**
+     * @param unicastURI
+     *            the unicastURI to set
+     */
+    public void setUnicastURI(String unicastURI) {
+        this.unicastURI = unicastURI;
+    }
+
+    /**
+     * @return the broadcastURL
+     */
+    public String getBroadcastURI() {
+        return this.broadcastURI;
+    }
+
+    /**
+     * @param broadcastURL
+     *            the broadcastURL to set
+     */
+    public void setBroadcastURI(String broadcastURL) {
+        this.broadcastURI = broadcastURL;
+    }
+
+    /**
+     * @return the useDispatchThread
+     */
+    public boolean isUseDispatchThread() {
+        return this.useDispatchThread;
+    }
+
+    /**
+     * @param useDispatchThread
+     *            the useDispatchThread to set
+     */
+    public void setUseDispatchThread(boolean useDispatchThread) {
+        this.useDispatchThread = useDispatchThread;
+    }
+
+    /**
+     * @return the maxDispatchQueueSize
+     */
+    public int getMaxDispatchQueueSize() {
+        return this.maxDispatchQueueSize;
+    }
+
+    /**
+     * @param maxDispatchQueueSize
+     *            the maxDispatchQueueSize to set
+     */
+    public void setMaxDispatchQueueSize(int maxDispatchQueueSize) {
+        this.maxDispatchQueueSize = maxDispatchQueueSize;
+    }
+
+    /**
+     * @return the managmentURI
+     */
+    public String getManagementURI() {
+        return this.managementURI;
+    }
+
+    /**
+     * @param managementURI
+     *            the managementURI to set
+     */
+    public void setManagementURI(String managementURI) {
+        this.managementURI = managementURI;
+    }
+
+    /**
+     * @return the maxPacketSize
+     */
+    public int getMaxPacketSize() {
+        return this.maxPacketSize;
+    }
+
+    /**
+     * @param maxPacketSize
+     *            the maxPacketSize to set
+     */
+    public void setMaxPacketSize(int maxPacketSize) {
+        this.maxPacketSize = maxPacketSize;
+    }
+
+    /**
+     * Copy the configuration
+     * @return a deep copy of the configuration
+     * @throws Exception
+     */
+    public final BlazeConfiguration copy() throws Exception {
+        Map<String,String>props = PropertyUtil.getProperties(this);
+        BlazeConfiguration result = newInstance();
+        PropertyUtil.setProperties(result, props);
+        return result;
+    }
+
+    protected BlazeConfiguration newInstance() {
+        return new BlazeConfiguration();
+    }
+
+    /**
+     * @return the reliable
+     */
+    public String getReliable() {
+        return this.reliable;
+    }
+
+    /**
+     * @param reliable the reliable to set
+     */
+    public void setReliable(String reliable) {
+        this.reliable = reliable;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Exception raised on internal error
+ *
+ */
+public class BlazeException extends Exception  {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1064152356749288271L;
+
+    /**
+     * Constructs a new exception with <code>null</code> as its detail message.
+     * The cause is not initialized, and may subsequently be initialized by a
+     * call to {@link #initCause}.
+     */
+    public BlazeException() {
+    super();
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message.  The
+     * cause is not initialized, and may subsequently be initialized by
+     * a call to {@link #initCause}.
+     *
+     * @param   message   the detail message. The detail message is saved for 
+     *          later retrieval by the {@link #getMessage()} method.
+     */
+    public BlazeException(String message) {
+    super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and
+     * cause.  <p>Note that the detail message associated with
+     * <code>cause</code> is <i>not</i> automatically incorporated in
+     * this exception's detail message.
+     *
+     * @param  message the detail message (which is saved for later retrieval
+     *         by the {@link #getMessage()} method).
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A <tt>null</tt> value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     */
+    public BlazeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause and a detail
+     * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+     * typically contains the class and detail message of <tt>cause</tt>).
+     * This constructor is useful for exceptions that are little more than
+     * wrappers for other throwables (for example, {@link
+     * java.security.PrivilegedActionException}).
+     *
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A <tt>null</tt> value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     */
+    public BlazeException(Throwable cause) {
+        super(cause);
+    }
+
+    
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,918 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.security.Key;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.BoolType;
+import org.apache.activeblaze.wire.ByteType;
+import org.apache.activeblaze.wire.BytesType;
+import org.apache.activeblaze.wire.CharType;
+import org.apache.activeblaze.wire.DoubleType;
+import org.apache.activeblaze.wire.FloatType;
+import org.apache.activeblaze.wire.IntType;
+import org.apache.activeblaze.wire.LongType;
+import org.apache.activeblaze.wire.MapData;
+import org.apache.activeblaze.wire.ShortType;
+import org.apache.activeblaze.wire.StringType;
+import org.apache.activemq.protobuf.Buffer;
+
+
+/**
+ * A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs. The names are <CODE>String</CODE>
+ * objects, and the values are primitive data types in the Java programming language. The names must have a value that
+ * is not null, and not an empty string. The entries can be accessed sequentially or randomly by name. The order of the
+ * entries is undefined. <CODE>BlazeMessage</CODE> inherits from the <CODE>Message</CODE> interface and adds a
+ * message body that contains a Map.
+ * <P>
+ * The primitive types can be read or written explicitly using methods for each type. They may also be read or written
+ * generically as objects. For instance, a call to <CODE>BlazeMessage.setInt("foo", 6)</CODE> is equivalent to
+ * <CODE> BlazeMessage.setObject("foo", new Integer(6))</CODE>. Both forms are provided, because the explicit form is
+ * convenient for static programming, and the object form is needed when types are not known at compile time.
+ * <P>
+ * <P>
+ * <CODE>BlazeMessage</CODE> objects support the following conversion table. The marked cases must be supported. The
+ * unmarked cases must throw a <CODE>JMSException</CODE>. The <CODE>String</CODE> -to-primitive conversions may
+ * throw a runtime exception if the primitive's <CODE>valueOf()</CODE> method does not accept it as a valid
+ * <CODE> String</CODE> representation of the primitive.
+ * <P>
+ * A value written as the row type can be read as the column type. <p/>
+ * 
+ * <PRE>
+ * | | boolean byte short char int long float double String byte[] |----------------------------------------------------------------------
+ * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X |long | X X |float | X X X |double | X X
+ * |String | X X X X X X X X |byte[] | X |----------------------------------------------------------------------
+ * &lt;p/&gt;
+ * </PRE>
+ * 
+ * <p/>
+ * <P>
+ * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding
+ * <code>valueOf(String)</code> conversion method with a null value. Since <code>char</code> does not support a
+ * <code>String</code> conversion, attempting to read a null value as a <code>char</code> must throw a
+ * <code>NullPointerException</code>.
+ * 
+ */
+public class BlazeMessage implements Map<String, Object>{
+    private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
+    private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
+    private transient Map<String, Object> map = new ConcurrentHashMap<String, Object>();
+    private transient String destination;
+    private transient String fromId;
+    private transient String messageId;
+    private transient String correlationId;
+    private transient long timeStamp;
+    private BlazeData content;
+    
+    /**
+     * Default Constructor
+     */
+    public BlazeMessage() {
+    }
+    
+    /**
+     * Constructor - Utility to construct a message with a text <Code>String</Code> payload
+     * @param text
+     */
+    public BlazeMessage(String text) {
+        setString(DEFAULT_TEXT_PAYLOAD,text);
+    }
+    
+    /**
+     * Constructor - Utility to construct a message with a byte[] array payload
+     * @param data
+     */
+    public BlazeMessage(byte[] data) {
+        setBytes(DEFAULT_BYTES_PAYLOAD,data);
+    }
+    
+    /**
+     * Utility method for setting a default <Code>String</Code> payload
+     * @param text
+     */
+    public void setText(String text) {
+        setString(DEFAULT_TEXT_PAYLOAD,text);
+    }
+    
+    /**
+     * Utility method used for when a BlazeMessage is only carrying a String
+     * @return text the default text
+     * @throws Exception
+     */
+    public String getText() throws Exception {
+        return getString(DEFAULT_TEXT_PAYLOAD);
+    }
+    
+    /**
+     * Utility method for setting a default <Code>String</Code> payload
+     * @param payload 
+     */
+    public void setBytes(byte[] payload) {
+        setBytes(DEFAULT_BYTES_PAYLOAD,payload);
+    }
+    
+    /**
+     * Utility method used for when a BlazeMessage is only carrying a String
+     * @return text the default text
+     * @throws Exception
+     */
+    public byte[] getBytes() throws Exception {
+        return getBytes(DEFAULT_BYTES_PAYLOAD);
+    }
+    
+    /**
+     * @param copy2 
+     * @return a copy of this message
+     * @throws BlazeException
+     */
+    public BlazeMessage copy() throws BlazeException{
+        BlazeMessage copy = new BlazeMessage();
+        copy(copy);
+        return copy;
+    }
+    
+    /**
+     * clear the contents of this message
+     */
+    public void clear(){
+        this.map.clear();
+    }
+    /**
+     * Returns the <CODE>boolean</CODE> value with the specified name.
+     * 
+     * @param name the name of the <CODE>boolean</CODE>
+     * @return the <CODE>boolean</CODE> value with the specified name
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public boolean getBoolean(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            return false;
+        }
+        if (value instanceof Boolean) {
+            return ((Boolean) value).booleanValue();
+        }
+        if (value instanceof String) {
+            return Boolean.valueOf(value.toString()).booleanValue();
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a boolean from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the <CODE>byte</CODE> value with the specified name.
+     * 
+     * @param name the name of the <CODE>byte</CODE>
+     * @return the <CODE>byte</CODE> value with the specified name
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public byte getByte(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Byte) {
+            return ((Byte) value).byteValue();
+        }
+        if (value instanceof String) {
+            return Byte.valueOf(value.toString()).byteValue();
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a byte from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the <CODE>short</CODE> value with the specified name.
+     * 
+     * @param name the name of the <CODE>short</CODE>
+     * @return the <CODE>short</CODE> value with the specified name
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public short getShort(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Short) {
+            return ((Short) value).shortValue();
+        }
+        if (value instanceof Byte) {
+            return ((Byte) value).shortValue();
+        }
+        if (value instanceof String) {
+            return Short.valueOf(value.toString()).shortValue();
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the Unicode character value with the specified name.
+     * 
+     * @param name the name of the Unicode character
+     * @return the Unicode character value with the specified name
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public char getChar(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            throw new NullPointerException();
+        }
+        if (value instanceof Character) {
+            return ((Character) value).charValue();
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the <CODE>int</CODE> value with the specified name.
+     * 
+     * @param name the name of the <CODE>int</CODE>
+     * @return the <CODE>int</CODE> value with the specified name
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public int getInt(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Integer) {
+            return ((Integer) value).intValue();
+        }
+        if (value instanceof Short) {
+            return ((Short) value).intValue();
+        }
+        if (value instanceof Byte) {
+            return ((Byte) value).intValue();
+        }
+        if (value instanceof String) {
+            return Integer.valueOf(value.toString()).intValue();
+        } else {
+            throw new BlazeMessageFormatException(" cannot read an int from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the <CODE>long</CODE> value with the specified name.
+     * 
+     * @param name the name of the <CODE>long</CODE>
+     * @return the <CODE>long</CODE> value with the specified name
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public long getLong(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Long) {
+            return ((Long) value).longValue();
+        }
+        if (value instanceof Integer) {
+            return ((Integer) value).longValue();
+        }
+        if (value instanceof Short) {
+            return ((Short) value).longValue();
+        }
+        if (value instanceof Byte) {
+            return ((Byte) value).longValue();
+        }
+        if (value instanceof String) {
+            return Long.valueOf(value.toString()).longValue();
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a long from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the <CODE>float</CODE> value with the specified name.
+     * 
+     * @param name the name of the <CODE>float</CODE>
+     * @return the <CODE>float</CODE> value with the specified name
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public float getFloat(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Float) {
+            return ((Float) value).floatValue();
+        }
+        if (value instanceof String) {
+            return Float.valueOf(value.toString()).floatValue();
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a float from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the <CODE>double</CODE> value with the specified name.
+     * 
+     * @param name the name of the <CODE>double</CODE>
+     * @return the <CODE>double</CODE> value with the specified name
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public double getDouble(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Double) {
+            return ((Double) value).doubleValue();
+        }
+        if (value instanceof Float) {
+            return ((Float) value).floatValue();
+        }
+        if (value instanceof String) {
+            return Float.valueOf(value.toString()).floatValue();
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the <CODE>String</CODE> value with the specified name.
+     * 
+     * @param name the name of the <CODE>String</CODE>
+     * @return the <CODE>String</CODE> value with the specified name; if there is no item by this name, a null value
+     *         is returned
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public String getString(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof byte[]) {
+            throw new BlazeMessageFormatException("Use getBytes to read a byte array");
+        } else {
+            return value.toString();
+        }
+    }
+    /**
+     * Returns the byte array value with the specified name.
+     * 
+     * @param name the name of the byte array
+     * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
+     */
+    public byte[] getBytes(String name) throws BlazeMessageFormatException{
+        initializeReading();
+        Object value = this.map.get(name);
+        if (value instanceof byte[]) {
+            return (byte[]) value;
+        } else {
+            throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
+        }
+    }
+    /**
+     * Returns the value of the object with the specified name.
+     * <P>
+     * This method can be used to return, in objectified format, an object in the Java programming language ("Java
+     * object") that had been stored in the Map with the equivalent <CODE>setObject</CODE> method call, or its
+     * equivalent primitive <CODE>set <I>type </I></CODE> method.
+     * <P>
+     * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
+     * 
+     * @param name the name of the Java object
+     * @return a copy of the Java object value with the specified name, in objectified format (for example, if the
+     *         object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
+     *         this name, a null value is returned
+     */
+    public Object getObject(String name){
+        initializeReading();
+        return this.map.get(name);
+    }
+    /**
+     * Returns an <CODE>Enumeration</CODE> of all the names in the <CODE>BlazeMessage</CODE> object.
+     * 
+     * @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
+     */
+    public Enumeration<String> getMapNames(){
+        initializeReading();
+        return Collections.enumeration(this.map.keySet());
+    }
+    
+    /**
+     * put a key,value pair into the message
+     * @param name 
+     * @param value must be a supported primitive, or map of supported primitives
+     * @return the previous value associated with the key
+     */
+    public Object put(String name,Object value){
+        initializeWriting();  
+        if (name == null) {
+            throw new IllegalArgumentException("The name of the property cannot be null.");
+        }
+        if (name.length() == 0) {
+            throw new IllegalArgumentException("The name of the property cannot be an emprty string.");
+        }
+        checkValidObject(value);
+        return this.map.put(name, value);
+    }
+    /**
+     * Sets a <CODE>boolean</CODE> value with the specified name into the Map.
+     * 
+     * @param name the name of the <CODE>boolean</CODE>
+     * @param value the <CODE>boolean</CODE> value to set in the Map
+     */
+    public void setBoolean(String name,boolean value){
+        initializeWriting();
+        put(name, value ? Boolean.TRUE : Boolean.FALSE);
+    }
+    
+    /**
+     * Sets a <CODE>byte</CODE> value with the specified name into the Map.
+     * 
+     * @param name the name of the <CODE>byte</CODE>
+     * @param value the <CODE>byte</CODE> value to set in the Map
+     */
+    public void setByte(String name,byte value){
+        initializeWriting();
+        put(name, Byte.valueOf(value));
+    }
+    /**
+     * Sets a <CODE>short</CODE> value with the specified name into the Map.
+     * 
+     * @param name the name of the <CODE>short</CODE>
+     * @param value the <CODE>short</CODE> value to set in the Map
+     */
+    public void setShort(String name,short value){
+        initializeWriting();
+        put(name, Short.valueOf(value));
+    }
+    /**
+     * Sets a Unicode character value with the specified name into the Map.
+     * 
+     * @param name the name of the Unicode character
+     * @param value the Unicode character value to set in the Map
+     */
+    public void setChar(String name,char value){
+        initializeWriting();
+        put(name, Character.valueOf(value));
+    }
+    /**
+     * Sets an <CODE>int</CODE> value with the specified name into the Map.
+     * 
+     * @param name the name of the <CODE>int</CODE>
+     * @param value the <CODE>int</CODE> value to set in the Map
+     */
+    public void setInt(String name,int value){
+        initializeWriting();
+        put(name, Integer.valueOf(value));
+    }
+    /**
+     * Sets a <CODE>long</CODE> value with the specified name into the Map.
+     * 
+     * @param name the name of the <CODE>long</CODE>
+     * @param value the <CODE>long</CODE> value to set in the Map
+     */
+    public void setLong(String name,long value){
+        initializeWriting();
+        put(name, Long.valueOf(value));
+    }
+    /**
+     * Sets a <CODE>float</CODE> value with the specified name into the Map.
+     * 
+     * @param name the name of the <CODE>float</CODE>
+     * @param value the <CODE>float</CODE> value to set in the Map
+     */
+    public void setFloat(String name,float value){
+        initializeWriting();
+        put(name, new Float(value));
+    }
+    /**
+     * Sets a <CODE>double</CODE> value with the specified name into the Map.
+     * 
+     * @param name the name of the <CODE>double</CODE>
+     * @param value the <CODE>double</CODE> value to set in the Map
+     */
+    public void setDouble(String name,double value){
+        initializeWriting();
+        put(name, new Double(value));
+    }
+    /**
+     * Sets a <CODE>String</CODE> value with the specified name into the Map.
+     * 
+     * @param name the name of the <CODE>String</CODE>
+     * @param value the <CODE>String</CODE> value to set in the Map
+     */
+    public void setString(String name,String value){
+        initializeWriting();
+        put(name, value);
+    }
+    /**
+     * Sets a byte array value with the specified name into the Map.
+     * 
+     * @param name the name of the byte array
+     * @param value the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
+     *            will not be altered by future modifications
+     * @throws NullPointerException if the name is null, or if the name is an empty string.
+     */
+    public void setBytes(String name,byte[] value){
+        initializeWriting();
+        if (value != null) {
+            put(name, value);
+        } else {
+            this.map.remove(name);
+        }
+    }
+    /**
+     * Sets a portion of the byte array value with the specified name into the Map.
+     * 
+     * @param name the name of the byte array
+     * @param value the byte array value to set in the Map
+     * @param offset the initial offset within the byte array
+     * @param length the number of bytes to use
+     */
+    public void setBytes(String name,byte[] value,int offset,int length){
+        initializeWriting();
+        byte[] data = new byte[length];
+        System.arraycopy(value, offset, data, 0, length);
+        put(name, data);
+    }
+    
+    /**
+     * Find out if the message contains a key
+     * This isn't recursive
+     * @param key 
+     * @return true if the message contains the key
+     * 
+     */    
+    public boolean containsKey(Object key){
+        initializeReading();
+        return this.map.containsKey(key.toString());
+    }
+    
+    /**
+     * Find out if the message contains a value
+     * @param value 
+     * @return true if the value exists
+     * 
+     */
+    public boolean containsValue(Object value){
+        initializeReading();
+        return this.map.containsValue(value);
+    }
+    
+    /**
+     * @return a set of Map.Entry values
+     * 
+     */
+    public Set<java.util.Map.Entry<String, Object>> entrySet(){
+        initializeReading();
+        return this.map.entrySet();
+    }
+    
+    /**
+     * Retrieve the object associated with the key
+     * @param key 
+     * @return the object
+     */
+    public Object get(Object key){
+        initializeReading();
+        return getObject(key.toString());
+    }
+    
+    /**
+     * @return true if the message is empty
+     * 
+     */
+    public boolean isEmpty(){
+        initializeReading();
+        return this.map.isEmpty();
+    }
+    
+    /**
+     * @return a Set of all the keys
+     */
+    public Set<String> keySet(){
+        initializeReading();
+        return this.map.keySet();
+    }
+    
+    /**
+     * Add all entries in a Map to the message
+     * @param t the map
+     * 
+     */
+    public void putAll(Map<? extends String, ? extends Object> t){
+        for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
+            put(entry.getKey(), entry.getValue());
+        }
+        
+    }
+    
+    /**
+     * Remove a key/value pair from the message
+     * @param key 
+     * @return the value removed or null
+     * 
+     */
+    public Object remove(Object key){
+        setContent(null);
+        return this.map.remove(key.toString());
+    }
+    
+    /**
+     * @return the number of entries in the message
+     */
+    public int size(){
+        initializeReading();
+        return this.map.size();
+    }
+    
+    /**
+     * @return a Collection of the values in the message
+     */
+    public Collection<Object> values(){
+        initializeReading();
+        return this.map.values();
+    }
+    
+    private void initializeReading(){
+        loadContent();
+    }
+    
+    private void initializeWriting(){
+        setContent(null);
+    }
+    
+    protected void checkValidObject(Object value) throws IllegalArgumentException{
+        boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short
+                || value instanceof Integer || value instanceof Long;
+        valid = valid || value instanceof Float || value instanceof Double || value instanceof Character
+                || value instanceof String || value == null || value instanceof byte[];
+        if (value instanceof Map) {
+            Map map =  (Map) value;
+            for(Object v:map.values()) {
+                checkValidObject(v);
+            }
+            valid = true;
+        }
+        if (!valid) {
+            throw new IllegalArgumentException("Not a valid message value: "+value);
+        }
+    }
+    
+    public String toString(){
+        return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
+    }
+    
+    protected void copy(BlazeMessage copy) throws BlazeException{
+        storeContent();
+        copy.content = this.content;
+    }
+    
+    
+    public BlazeData getContent(){
+        return this.content;
+    }
+    
+    public void setContent(BlazeData content){
+        this.content = content;
+    }
+    
+    private void marshallMap(MapData mapData,String name,Object value) throws BlazeMessageFormatException{
+        if (value != null) {
+            if (value.getClass() == Boolean.class) {
+                BoolType type = new BoolType();
+                type.setName(name);
+                type.setValue(((Boolean) value).booleanValue());
+                mapData.addBoolType(type);
+            } else if (value.getClass() == Byte.class) {
+                ByteType type = new ByteType();
+                type.setName(name);
+                type.setValue(((Byte) value).byteValue());
+                mapData.addByteType(type);
+            } else if (value.getClass() == Character.class) {
+                CharType type = new CharType();
+                type.setName(name);
+                type.setValue(value.toString());
+                mapData.addCharType(type);
+            } else if (value.getClass() == Short.class) {
+                ShortType type = new ShortType();
+                type.setName(name);
+                type.setValue(((Short) value).shortValue());
+                mapData.addShortType(type);
+            } else if (value.getClass() == Integer.class) {
+                IntType type = new IntType();
+                type.setName(name);
+                type.setValue(((Integer) value).intValue());
+                mapData.addIntType(type);
+            } else if (value.getClass() == Long.class) {
+                LongType type = new LongType();
+                type.setName(name);
+                type.setValue(((Long) value).longValue());
+                mapData.addLongType(type);
+            } else if (value.getClass() == Float.class) {
+                FloatType type = new FloatType();
+                type.setName(name);
+                type.setValue(((Float) value).floatValue());
+                mapData.addFloatType(type);
+            } else if (value.getClass() == Double.class) {
+                DoubleType type = new DoubleType();
+                type.setName(name);
+                type.setValue(((Double) value).doubleValue());
+                mapData.addDoubleType(type);
+            } else if (value.getClass() == byte[].class) {
+                BytesType type = new BytesType();
+                type.setName(name);
+                type.setValue(new Buffer((byte[]) value));
+                mapData.addBytesType(type);
+            } else if (value.getClass() == String.class) {
+                StringType type = new StringType();
+                type.setName(name);
+                type.setValue(value.toString());
+                mapData.addStringType(type);
+            } else if (value instanceof Map) {
+                Map<String, Key> subMap = (Map<String, Key>) value;
+                for (Map.Entry<String, Key> entry : subMap.entrySet()) {
+                    MapData md = new MapData();
+                    md.setName(name);
+                    marshallMap(md, entry.getKey().toString(), entry.getValue());
+                    mapData.addMapType(md);
+                }
+            } else {
+                throw new BlazeMessageFormatException("Cannot seralize type " + value);
+            }
+        }
+    }
+    
+    Map<String, Object> unmarshall(MapData mapData){
+        Map<String, Object> result = new ConcurrentHashMap<String, Object>();
+        if (mapData.hasBoolType()) {
+            for (BoolType type : mapData.getBoolTypeList()) {
+                result.put(type.getName(), new Boolean(type.getValue()));
+            }
+        }
+        if (mapData.hasCharType()) {
+            for (CharType type : mapData.getCharTypeList()) {
+                result.put(type.getName(), new Character(type.getValue().charAt(0)));
+            }
+        }
+        if (mapData.hasShortType()) {
+            for (ShortType type : mapData.getShortTypeList()) {
+                result.put(type.getName(), new Short((short) type.getValue()));
+            }
+        }
+        if (mapData.hasIntType()) {
+            for (IntType type : mapData.getIntTypeList()) {
+                result.put(type.getName(), new Integer(type.getValue()));
+            }
+        }
+        if (mapData.hasLongType()) {
+            for (LongType type : mapData.getLongTypeList()) {
+                result.put(type.getName(), new Long(type.getValue()));
+            }
+        }
+        if (mapData.hasFloatType()) {
+            for (FloatType type : mapData.getFloatTypeList()) {
+                result.put(type.getName(), new Float(type.getValue()));
+            }
+        }
+        if (mapData.hasDoubleType()) {
+            for (DoubleType type : mapData.getDoubleTypeList()) {
+                result.put(type.getName(), new Double(type.getValue()));
+            }
+        }
+        if (mapData.hasByteType()) {
+            for (ByteType type : mapData.getByteTypeList()) {
+                result.put(type.getName(), new Byte((byte) type.getValue()));
+            }
+        }
+        if (mapData.hasStringType()) {
+            for (StringType type : mapData.getStringTypeList()) {
+                result.put(type.getName(), type.getValue());
+            }
+        }
+        if (mapData.hasBytesType()) {
+            for (BytesType type : mapData.getBytesTypeList()) {
+                result.put(type.getName(), type.getValue().toByteArray());
+            }
+        }
+        if (mapData.hasMapType()) {
+            for (MapData type : mapData.getMapTypeList()) {
+                Map<String, Object> map = unmarshall(type);
+                result.put(type.getName(), map);
+            }
+        }
+        return result;
+    }
+    
+    public void storeContent() throws BlazeMessageFormatException{
+        if (getContent() == null && !this.map.isEmpty()) {
+            BlazeData bd = new BlazeData();
+            MapData mapData = new MapData();
+            for (Map.Entry<String, Object> entry : this.map.entrySet()) {
+                marshallMap(mapData, entry.getKey().toString(), entry.getValue());
+            }
+            bd.setMapData(mapData);
+            this.content = bd;
+        }
+    }
+    
+    /**
+     * Builds the message body from data
+     * 
+     */
+    void loadContent() throws BlazeRuntimeException{
+        BlazeData data = getContent();
+        if (data != null && this.map.isEmpty()) {
+            this.map = unmarshall(data.getMapData());
+        }
+    }
+
+    /**
+     * @return the destination
+     */
+    public String getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * @param destination the destination to set
+     */
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * The id of the channel that sent the message
+     * @return the fromId
+     */
+    public String getFromId() {
+        return this.fromId;
+    }
+
+    /**
+     * @param fromId the fromId to set
+     */
+    public void setFromId(String fromId) {
+        this.fromId = fromId;
+    }
+
+    /**
+     * @return the messageId
+     */
+    public String getMessageId() {
+        return this.messageId;
+    }
+
+    /**
+     * @param messageId the messageId to set
+     */
+    public void setMessageId(String messageId) {
+        this.messageId = messageId;
+    }
+
+    /**
+     * @return the correlationId
+     */
+    public String getCorrelationId() {
+        return this.correlationId;
+    }
+
+    /**
+     * @param correlationId the correlationId to set
+     */
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+    }
+
+    /**
+     * @return the timeStamp
+     */
+    public long getTimeStamp() {
+        return this.timeStamp;
+    }
+
+    /**
+     * @param timeStamp the timeStamp to set
+     */
+    public void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Exception raised for message format exceptions
+ *
+ */
+public class BlazeMessageFormatException extends BlazeException{
+    private static final long serialVersionUID = 1925143462979839452L;
+
+    /**
+     * Constructor
+     * @param reason
+     */
+    public BlazeMessageFormatException(String reason) {
+        super(reason);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Blaze RuntimeException
+ *
+ */
+public class BlazeRuntimeException extends RuntimeException {
+
+	/**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs a new exception with <code>null</code> as its detail message.
+     * The cause is not initialized, and may subsequently be initialized by a
+     * call to {@link #initCause}.
+     */
+    public BlazeRuntimeException() {
+    super();
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message.  The
+     * cause is not initialized, and may subsequently be initialized by
+     * a call to {@link #initCause}.
+     *
+     * @param   message   the detail message. The detail message is saved for 
+     *          later retrieval by the {@link #getMessage()} method.
+     */
+    public BlazeRuntimeException(String message) {
+    super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and
+     * cause.  <p>Note that the detail message associated with
+     * <code>cause</code> is <i>not</i> automatically incorporated in
+     * this exception's detail message.
+     *
+     * @param  message the detail message (which is saved for later retrieval
+     *         by the {@link #getMessage()} method).
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A <tt>null</tt> value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     */
+    public BlazeRuntimeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause and a detail
+     * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+     * typically contains the class and detail message of <tt>cause</tt>).
+     * This constructor is useful for exceptions that are little more than
+     * wrappers for other throwables (for example, {@link
+     * java.security.PrivilegedActionException}).
+     *
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A <tt>null</tt> value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     */
+    public BlazeRuntimeException(Throwable cause) {
+        super(cause);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * A listener for BlazeMessages
+ *
+ */
+public interface BlazeTopicListener {
+    
+    /**
+     * Called when a Message is available to be processes
+     * @param message
+     */
+    public void onMessage(BlazeMessage message);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Listener for async exceptions
+ *
+ */
+public interface ExceptionListener {
+    /**
+     * Called when an Async exception has been raised
+     * @param ex
+     */
+    void onException(Exception ex);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeblaze;
+
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * Processes a Packet
+ *
+ */
+public interface Processor  extends Service{
+	/**
+	 * @param packet
+	 * @throws Exception
+	 */
+	void downStream(Packet packet) throws Exception;
+	
+	/**
+	 * @param packet
+	 * @throws Exception
+	 */
+	void upStream(Packet packet) throws Exception;
+	
+	/**
+	 * Set An exception Listener
+	 * @param l
+	 */
+	void setExceptionListener(ExceptionListener l);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * LifeCycle for an administrated object
+ *
+ */
+public interface Service {
+    /**
+     * initialize the service
+     * @return true if initialized
+     * @throws Exception
+     */
+    public boolean init() throws Exception;
+    /**
+     * Start the service
+     * @return true if started
+     * @throws Exception
+     */
+    public boolean start() throws Exception;
+    /**
+     * Stop the service
+     * @return true if stopped
+     * @throws Exception
+     */
+    public boolean stop() throws Exception;
+    /**
+     * Shutdown the Service
+     * @return true if shutdown, false if already in the shutdown state
+     * @throws Exception
+     */
+    public boolean shutDown() throws Exception;
+    /**
+     * @return true if started
+     */
+    public boolean isStarted();
+    /**
+     * @return true if stopped
+     */
+    public boolean isStopped();
+    /**
+     * @return true if initialized
+     */
+    public boolean isInitialized();
+    /**
+     * @return true if shutDown
+     */
+    public boolean isShutDown();
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import org.apache.activeblaze.group.BlazeGroupChannel;
+import org.apache.activeblaze.group.Member;
+/**
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication and maintains a coordinator
+ * (elected leader) for the group
+ * 
+ */
+public interface BlazeCoordinatedGroupChannel  extends BlazeGroupChannel{
+    /**
+     * @return true if this Channel is the coordinator of the group
+     * @throws Exception 
+     */
+    public boolean isCoordinator() throws Exception;
+    /**
+     * @return the member of the group which is the coordinator
+     * @throws Exception 
+     */
+    public Member getCoordinator() throws Exception;
+    
+    /**
+     * Add a listener for membership changes
+     * 
+     * @param l
+     * @throws Exception 
+     */
+    public void addCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception;
+
+    /**
+     * Remove a listener for membership changes
+     * 
+     * @param l
+     * @throws Exception 
+     */
+    public void removeCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception;
+    
+    /**
+     * @return the configuration
+     */
+    public BlazeCoordinatedGroupConfiguration getCoordinatedGroupConfiguration();
+    
+    /**
+     * waits for election in the group to finish
+     * @param timeout time to wait in milliseconds
+     * @return true if election finished
+     * @throws Exception
+     */
+    public boolean waitForElection(int timeout) throws Exception;
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import org.apache.activeblaze.group.BlazeGroupChannelFactory;
+
+
+
+/**
+ * Factory class for creating <Code>BlazeGroupChannel</CODE>
+ */
+public class BlazeCoordinatedGroupChannelFactory extends BlazeGroupChannelFactory {
+    
+    /**
+     * Default Constructor
+     */
+    public BlazeCoordinatedGroupChannelFactory() {
+        super(new BlazeCoordinatedGroupConfiguration());
+    }
+    
+    /**
+     * Construct a factory to use the passed Configuration
+     * @param config
+     */
+    public BlazeCoordinatedGroupChannelFactory(BlazeCoordinatedGroupConfiguration config){
+        super(config);
+    }
+    
+    /**
+     * Create a GroupChannel
+     * @param name 
+     * @return the Channel
+     * @throws Exception 
+     */
+    public BlazeCoordinatedGroupChannel createChannel(String name) throws Exception {
+        BlazeCoordinatedGroupChannelImpl result = new BlazeCoordinatedGroupChannelImpl(name);
+        result.setConfiguration(getConfiguration().copy());
+        return result;
+    }
+    
+    /**
+     * @return the configuration
+     */
+    public BlazeCoordinatedGroupConfiguration getConfiguration() {
+        return (BlazeCoordinatedGroupConfiguration) super.getConfiguration();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native