You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/21 21:44:43 UTC
svn commit: r719706 [2/6] - in /activemq/activemq-blaze: ./ branches/ tags/
trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/
trunk/src/main/java/org/ trunk/src/main/java/org/apache/
trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base implementation of a {@link Service}.
+ * <p>
+ * Life cycle state is held in two atomic flags (initialized and started). Every
+ * transition is a compare-and-set, so a transition method returns
+ * <code>true</code> only for the call that actually changed the state.
+ */
+public class BaseService implements Service {
+
+    AtomicBoolean initialialized = new AtomicBoolean();
+    AtomicBoolean started = new AtomicBoolean();
+
+    /**
+     * Mark the service as initialized.
+     *
+     * @return true if this call changed the state to initialized, false if the
+     *         service was already initialized
+     * @throws Exception
+     */
+    public boolean init() throws Exception {
+        return this.initialialized.compareAndSet(false, true);
+    }
+
+    /**
+     * Stop the service if it is started, then mark it as no longer initialized.
+     *
+     * @return true if this call changed the state from initialized to shut
+     *         down, false if the service was not initialized
+     * @throws Exception
+     */
+    public boolean shutDown() throws Exception {
+        if (isStarted()) {
+            stop();
+        }
+        return this.initialialized.compareAndSet(true, false);
+    }
+
+    /**
+     * Start the service, calling {@link #init()} first if it has not been
+     * initialized yet.
+     *
+     * @return true if this call changed the state to started, false if the
+     *         service was already started
+     * @throws Exception
+     */
+    public boolean start() throws Exception {
+        if (!this.initialialized.get()) {
+            init();
+        }
+        return this.started.compareAndSet(false, true);
+    }
+
+    /**
+     * Stop the service.
+     * <p>
+     * Stopping never initializes the service: init() is overridable, so calling
+     * it from here would make a subclass acquire its resources as a side effect
+     * of stopping a service that was never initialized or is already shut down.
+     *
+     * @return true if this call changed the state from started to stopped,
+     *         false if the service was not started
+     * @throws Exception
+     */
+    public boolean stop() throws Exception {
+        return this.started.compareAndSet(true, false);
+    }
+
+    /**
+     * @return true if the service is currently started
+     */
+    public boolean isStarted() {
+        return this.started.get();
+    }
+
+    /**
+     * @return true if the service is not started
+     */
+    public boolean isStopped() {
+        return !isStarted();
+    }
+
+    /**
+     * @return true if the service is currently initialized
+     */
+    public boolean isInitialized() {
+        return this.initialialized.get();
+    }
+
+    /**
+     * @return true if the service is not initialized
+     */
+    public boolean isShutDown() {
+        return !isInitialized();
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * A <CODE>BlazeChannel</CODE> is a wrapper for reliable multicast communication.
+ */
+public interface BlazeChannel extends Service {
+
+    /**
+     * @return the id of this channel
+     */
+    String getId();
+
+    /**
+     * Broadcast a message.
+     *
+     * @param destination the destination to broadcast to
+     * @param msg the message to broadcast
+     * @throws Exception
+     */
+    void broadcast(String destination, BlazeMessage msg) throws Exception;
+
+    /**
+     * @return the configuration used by this channel
+     */
+    BlazeConfiguration getConfiguration();
+
+    /**
+     * Add a listener for messages.
+     *
+     * @param destination the destination the listener is registered for
+     * @param l the listener to add
+     * @throws Exception
+     */
+    void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception;
+
+    /**
+     * Remove a listener for messages.
+     *
+     * @param destination the destination the listener was registered for
+     * @return the removed listener
+     * @throws Exception
+     */
+    BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception;
+
+    /**
+     * Set the listener for asynchronous exceptions that can be generated by a
+     * Transport thread.
+     *
+     * @param l the exception listener
+     */
+    void setExceptionListener(ExceptionListener l);
+
+}
\ No newline at end of file
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Factory class for creating <CODE>BlazeChannel</CODE> instances.
+ */
+public class BlazeChannelFactory {
+    private BlazeConfiguration configuration;
+
+    /**
+     * Construct a factory that uses a default <CODE>BlazeConfiguration</CODE>.
+     */
+    public BlazeChannelFactory() {
+        this(new BlazeConfiguration());
+    }
+
+    /**
+     * Construct a factory that uses the passed configuration.
+     *
+     * @param config the configuration handed (as a copy) to created channels
+     */
+    public BlazeChannelFactory(BlazeConfiguration config) {
+        this.configuration = config;
+    }
+
+    /**
+     * Create a Channel. The channel receives its own copy of the factory
+     * configuration.
+     *
+     * @return the Channel
+     * @throws Exception
+     */
+    public BlazeChannel createChannel() throws Exception {
+        BlazeChannelImpl channel = new BlazeChannelImpl();
+        BlazeConfiguration channelConfiguration = getConfiguration().copy();
+        channel.setConfiguration(channelConfiguration);
+        return channel;
+    }
+
+    /**
+     * @return the configuration held by this factory
+     */
+    public BlazeConfiguration getConfiguration() {
+        return this.configuration;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.CompressionProcessor;
+import org.apache.activeblaze.impl.processor.FragmentationProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableFactory;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.util.PropertyUtil;
+import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.Message;
+
+/**
+ * <P>
+ * A <CODE>BlazeChannel</CODE> handles all client communication, either unicast, broadcast or multicast with other
+ * peers in the Blaze network
+ *
+ *
+ */
+public class BlazeChannelImpl extends ChainedProcessor implements BlazeChannel, ExceptionListener {
+ protected Map<Buffer, BlazeTopicListener> topicessageListenerMap = new ConcurrentHashMap<Buffer, BlazeTopicListener>();
+ protected final IdGenerator idGenerator = new IdGenerator();
+ protected Buffer producerId;
+ protected AtomicLong sequence = new AtomicLong();
+ protected AtomicLong session = new AtomicLong(1);
+ private Processor broadcast;
+ private BlazeConfiguration configuration = new BlazeConfiguration();
+ private String id;
+ private LinkedBlockingQueue<BlazeMessage> broadcastQueue;
+ private Thread broadcastQueueThread;
+ private Buffer managementURI;
+ private InetSocketAddress toAddress;
+
+ /**
+ * Constructor
+ *
+ * @param prev
+ * @param next
+ */
+ protected BlazeChannelImpl() {
+ this.id = this.idGenerator.generateId();
+ this.producerId = new Buffer(this.id);
+ }
+
+ public String getId() {
+ return this.id;
+ }
+
+ /**
+ * @param destination
+ * @param l
+ * @throws Exception
+ * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
+ * org.apache.activeblaze.BlazeTopicListener)
+ */
+ public void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception {
+ Buffer key = new Buffer(destination);
+ this.topicessageListenerMap.put(key, l);
+ }
+
+ /**
+ * @param destination
+ * @param l
+ * @return
+ * @throws Exception
+ * @see org.apache.activeblaze.BlazeChannel#removeBlazeMessageListener(java.lang.String,
+ * org.apache.activeblaze.BlazeTopicListener)
+ */
+ public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception {
+ Buffer key = new Buffer(destination);
+ return this.topicessageListenerMap.remove(key);
+ }
+
+ public boolean init() throws Exception {
+ boolean result = super.init();
+ if (result) {
+ this.broadcastQueue = new LinkedBlockingQueue<BlazeMessage>(getConfiguration().getMaxDispatchQueueSize());
+ String broadcastURIStr = getConfiguration().getBroadcastURI();
+ broadcastURIStr=PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
+ URI broadcastURI = new URI(broadcastURIStr);
+ this.toAddress = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort());
+ this.managementURI = new Buffer(new URI(getConfiguration().getManagementURI()).toString());
+ BaseTransport transport = TransportFactory.get(broadcastURI);
+ transport.setName(getId() + "-Broadcast");
+ this.broadcast = configureProcess(transport);
+ this.broadcast.init();
+ }
+ return result;
+ }
+
+ protected final void configureTransport(BaseTransport transport) throws Exception {
+ transport.setMaxPacketSize(getConfiguration().getMaxPacketSize());
+ }
+
+ protected Processor configureProcess(BaseTransport transport) throws Exception {
+ int maxPacketSize = getConfiguration().getMaxPacketSize();
+ configureTransport(transport);
+
+ CompressionProcessor result = new CompressionProcessor();
+ result.setPrev(this);
+ result.setExceptionListener(this);
+ FragmentationProcessor fp = new FragmentationProcessor();
+ fp.setMaxPacketSize(maxPacketSize);
+ result.setEnd(fp);
+ ChainedProcessor reliable = ReliableFactory.get(getConfiguration().getReliable());
+ result.setEnd(reliable);
+ result.setEnd(transport);
+ return result;
+ }
+
+ public boolean shutDown() throws Exception {
+ boolean result = super.shutDown();
+ if (result) {
+ this.broadcast.shutDown();
+ }
+ return result;
+ }
+
+ public boolean start() throws Exception {
+ boolean result = super.start();
+ if (result) {
+ if (getConfiguration().isUseDispatchThread()) {
+ Runnable runable = new Runnable() {
+ public void run() {
+ while (isStarted()) {
+ dequeueBroadcastMessages();
+ }
+ }
+ };
+ this.broadcastQueueThread = new Thread(runable, getId() + "-BroadcastQueue");
+ this.broadcastQueueThread.setDaemon(true);
+ this.broadcastQueueThread.start();
+ }
+ this.broadcast.start();
+ }
+ return result;
+ }
+
+ public boolean stop() throws Exception {
+ boolean result = super.stop();
+ if (result) {
+ if (this.broadcastQueueThread != null) {
+ this.broadcastQueueThread.interrupt();
+ try {
+ this.broadcastQueueThread.join(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ this.broadcast.stop();
+ }
+ return result;
+ }
+
+ public synchronized void broadcast(String destination, BlazeMessage msg) throws Exception {
+ msg.storeContent();
+ BlazeData blazeData = msg.getContent();
+ blazeData.setTopic(true);
+ blazeData.setDestination(new Buffer(destination));
+ PacketData packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
+ packetData.setReliable(true);
+ packetData.setFromAddress(this.managementURI);
+ Packet packet = new Packet(packetData);
+ packet.setTo(this.toAddress);
+ this.broadcast.downStream(packet);
+ }
+
+ protected synchronized PacketData getPacketData(MessageType type, Message<?> message) {
+ PacketData packetData = new PacketData();
+ packetData.setFromAddress(this.managementURI);
+ packetData.setType(type.getNumber());
+ packetData.setProducerId(this.producerId);
+ packetData.setSessionId(this.session.get());
+ long sequence = this.sequence.incrementAndGet();
+ packetData.setMessageSequence(sequence);
+ packetData.setPayload(message.toFramedBuffer());
+ StringBuilder builder = new StringBuilder(this.id.length() + 32);
+ builder.append(this.id).append(":").append(sequence);
+ packetData.setMessageId(new Buffer(builder.toString()));
+ return packetData;
+ }
+
+ public void upStream(Packet packet) throws Exception {
+ PacketData data = packet.getPacketData();
+ processData(packet.getId(), data.getCorrelationId(), data);
+ }
+
+ protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+ MessageType type = MessageType.valueOf(data.getType());
+ if (type == MessageType.BLAZE_DATA) {
+ doProcessBlazeData(data);
+ }
+ }
+
+ public BlazeConfiguration getConfiguration() {
+ return this.configuration;
+ }
+
+ /**
+ * @param configuration
+ * @see org.apache.activeblaze.BlazeChannel#setConfiguration(org.apache.activeblaze.BlazeConfiguration)
+ */
+ public void setConfiguration(BlazeConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ /**
+ * @param ex
+ * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+ */
+ public void onException(Exception ex) {
+ doFireException(ex);
+ }
+
+ protected void doProcessBlazeData(PacketData data) throws Exception {
+ BlazeMessage message = buildBlazeMessage(data);
+ processBlazeMessage(message);
+ }
+
+ protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
+ BlazeMessage message = null;
+ if (data != null) {
+ MessageType type = MessageType.BLAZE_DATA;
+ BlazeData blazeData = (BlazeData) type.createMessage();
+ Buffer payload = data.getPayload();
+ blazeData.mergeFramed(payload);
+ String fromId = null;
+ if (data.hasProducerId()) {
+ fromId = data.getProducerId().toStringUtf8();
+ }
+ message = createMessage(fromId);
+ message.setDestination(blazeData.getDestination().toStringUtf8());
+ message.setFromId(fromId);
+ if (data.hasMessageId()) {
+ message.setMessageId(data.getMessageId().toStringUtf8());
+ }
+ if (data.hasCorrelationId()) {
+ message.setCorrelationId(data.getCorrelationId().toStringUtf8());
+ }
+ message.setTimeStamp(blazeData.getTimestamp());
+ message.setContent(blazeData);
+ }
+ return message;
+ }
+
+ protected BlazeMessage createMessage(String fromId) {
+ return new BlazeMessage();
+ }
+
+ protected void processBlazeMessage(BlazeMessage message) {
+ if (this.broadcastQueueThread == null) {
+ dispatch(message);
+ } else {
+ try {
+ this.broadcastQueue.put(message);
+ } catch (InterruptedException e) {
+ // ignore - we are stopping
+ }
+ }
+ }
+
+ protected void dequeueBroadcastMessages() {
+ BlazeMessage message = null;
+ try {
+ message = this.broadcastQueue.take();
+ } catch (InterruptedException e1) {
+ }
+ dispatch(message);
+ }
+
+ protected void dispatch(BlazeMessage message) {
+ if (message != null) {
+ Buffer destination = message.getContent().getDestination();
+ for (Map.Entry<Buffer, BlazeTopicListener> entry : this.topicessageListenerMap.entrySet()) {
+ if (DestinationMatch.isMatch(destination, entry.getKey())) {
+ entry.getValue().onMessage(message);
+ }
+ }
+ }
+ }
+
+ /**
+ * shutdown on gc
+ * @throws Throwable
+ * @see java.lang.Object#finalize()
+ */
+ protected void finalize() throws Throwable {
+ try {
+ shutDown();
+ } finally {
+ super.finalize();
+ }
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.util.Map;
+import org.apache.activeblaze.util.PropertyUtil;
+
+/**
+ * Basic configuration bean for a BlazeChannel.
+ */
+public class BlazeConfiguration {
+    /**
+     * Default maximum size for datagrams (4 * 1024)
+     */
+    public static final int DEFAULT_MAX_PACKET_SIZE = 4 * 1024;
+
+    // transport bindings
+    private int unicastPort;
+    private String unicastURI = "udp://localhost:0";
+    private String broadcastURI = "mcast://224.2.2.2:9999";
+    private String managementURI = "mcast://224.2.2.2:8888";
+
+    // channel internals
+    private boolean useDispatchThread = true;
+    private int maxDispatchQueueSize = 10000;
+    private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
+
+    // reliability
+    private String reliable = "simple";
+
+    /**
+     * @return the unicast port
+     */
+    public int getUnicastPort() {
+        return unicastPort;
+    }
+
+    /**
+     * @param unicastPort
+     *            the unicast port to set
+     */
+    public void setUnicastPort(int unicastPort) {
+        this.unicastPort = unicastPort;
+    }
+
+    /**
+     * @return the unicast URI
+     */
+    public String getUnicastURI() {
+        return unicastURI;
+    }
+
+    /**
+     * @param unicastURI
+     *            the unicast URI to set
+     */
+    public void setUnicastURI(String unicastURI) {
+        this.unicastURI = unicastURI;
+    }
+
+    /**
+     * @return the broadcast URI
+     */
+    public String getBroadcastURI() {
+        return broadcastURI;
+    }
+
+    /**
+     * @param broadcastURL
+     *            the broadcast URI to set
+     */
+    public void setBroadcastURI(String broadcastURL) {
+        broadcastURI = broadcastURL;
+    }
+
+    /**
+     * @return true if a dispatch thread should be used
+     */
+    public boolean isUseDispatchThread() {
+        return useDispatchThread;
+    }
+
+    /**
+     * @param useDispatchThread
+     *            whether a dispatch thread should be used
+     */
+    public void setUseDispatchThread(boolean useDispatchThread) {
+        this.useDispatchThread = useDispatchThread;
+    }
+
+    /**
+     * @return the maximum size of the dispatch queue
+     */
+    public int getMaxDispatchQueueSize() {
+        return maxDispatchQueueSize;
+    }
+
+    /**
+     * @param maxDispatchQueueSize
+     *            the maximum size of the dispatch queue to set
+     */
+    public void setMaxDispatchQueueSize(int maxDispatchQueueSize) {
+        this.maxDispatchQueueSize = maxDispatchQueueSize;
+    }
+
+    /**
+     * @return the management URI
+     */
+    public String getManagementURI() {
+        return managementURI;
+    }
+
+    /**
+     * @param managementURI
+     *            the management URI to set
+     */
+    public void setManagementURI(String managementURI) {
+        this.managementURI = managementURI;
+    }
+
+    /**
+     * @return the maximum packet size
+     */
+    public int getMaxPacketSize() {
+        return maxPacketSize;
+    }
+
+    /**
+     * @param maxPacketSize
+     *            the maximum packet size to set
+     */
+    public void setMaxPacketSize(int maxPacketSize) {
+        this.maxPacketSize = maxPacketSize;
+    }
+
+    /**
+     * Copy the configuration: the properties of this instance are read with
+     * <CODE>PropertyUtil</CODE> and applied to an instance obtained from
+     * {@link #newInstance()}.
+     *
+     * @return a copy of the configuration
+     * @throws Exception
+     */
+    public final BlazeConfiguration copy() throws Exception {
+        Map<String, String> snapshot = PropertyUtil.getProperties(this);
+        BlazeConfiguration duplicate = newInstance();
+        PropertyUtil.setProperties(duplicate, snapshot);
+        return duplicate;
+    }
+
+    /**
+     * Create the empty instance that {@link #copy()} populates; subclasses can
+     * override this to return their own type.
+     *
+     * @return a new configuration instance
+     */
+    protected BlazeConfiguration newInstance() {
+        return new BlazeConfiguration();
+    }
+
+    /**
+     * @return the name of the reliability implementation
+     */
+    public String getReliable() {
+        return reliable;
+    }
+
+    /**
+     * @param reliable
+     *            the name of the reliability implementation to set
+     */
+    public void setReliable(String reliable) {
+        this.reliable = reliable;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Exception raised on internal error.
+ */
+public class BlazeException extends Exception {
+
+    private static final long serialVersionUID = 1064152356749288271L;
+
+    /**
+     * Creates an exception with neither a detail message nor a cause. The cause
+     * may be supplied later through {@link #initCause}.
+     */
+    public BlazeException() {
+    }
+
+    /**
+     * Creates an exception with a detail message and no cause. The cause may be
+     * supplied later through {@link #initCause}.
+     *
+     * @param message the detail message, available afterwards from
+     *            {@link #getMessage()}
+     */
+    public BlazeException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception with a detail message and a cause. The detail
+     * message of {@code cause} is <i>not</i> merged into this exception's own
+     * detail message.
+     *
+     * @param message the detail message, available afterwards from
+     *            {@link #getMessage()}
+     * @param cause the cause, available afterwards from {@link #getCause()};
+     *            {@code null} is permitted and means the cause is nonexistent
+     *            or unknown
+     */
+    public BlazeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception that wraps a cause. The detail message is
+     * {@code (cause == null ? null : cause.toString())}, which typically
+     * contains the class and detail message of {@code cause}. Intended for
+     * exceptions that are little more than wrappers for other throwables.
+     *
+     * @param cause the cause, available afterwards from {@link #getCause()};
+     *            {@code null} is permitted and means the cause is nonexistent
+     *            or unknown
+     */
+    public BlazeException(Throwable cause) {
+        super(cause);
+    }
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,918 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.security.Key;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.BoolType;
+import org.apache.activeblaze.wire.ByteType;
+import org.apache.activeblaze.wire.BytesType;
+import org.apache.activeblaze.wire.CharType;
+import org.apache.activeblaze.wire.DoubleType;
+import org.apache.activeblaze.wire.FloatType;
+import org.apache.activeblaze.wire.IntType;
+import org.apache.activeblaze.wire.LongType;
+import org.apache.activeblaze.wire.MapData;
+import org.apache.activeblaze.wire.ShortType;
+import org.apache.activeblaze.wire.StringType;
+import org.apache.activemq.protobuf.Buffer;
+
+
+/**
+ * A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs. The names are <CODE>String</CODE>
+ * objects, and the values are primitive data types in the Java programming language. The names must have a value that
+ * is not null, and not an empty string. The entries can be accessed sequentially or randomly by name. The order of the
+ * entries is undefined. <CODE>BlazeMessage</CODE> inherits from the <CODE>Message</CODE> interface and adds a
+ * message body that contains a Map.
+ * <P>
+ * The primitive types can be read or written explicitly using methods for each type. They may also be read or written
+ * generically as objects. For instance, a call to <CODE>BlazeMessage.setInt("foo", 6)</CODE> is equivalent to
+ * <CODE> BlazeMessage.setObject("foo", new Integer(6))</CODE>. Both forms are provided, because the explicit form is
+ * convenient for static programming, and the object form is needed when types are not known at compile time.
+ * <P>
+ * <P>
+ * <CODE>BlazeMessage</CODE> objects support the following conversion table. The marked cases must be supported. The
+ * unmarked cases must throw a <CODE>JMSException</CODE>. The <CODE>String</CODE> -to-primitive conversions may
+ * throw a runtime exception if the primitive's <CODE>valueOf()</CODE> method does not accept it as a valid
+ * <CODE> String</CODE> representation of the primitive.
+ * <P>
+ * A value written as the row type can be read as the column type. <p/>
+ *
+ * <PRE>
+ * |        | boolean byte short char int long float double String byte[]
+ * |----------------------------------------------------------------------
+ * |boolean |    X                                            X
+ * |byte    |          X     X         X   X                  X
+ * |short   |                X         X   X                  X
+ * |char    |                     X                           X
+ * |int     |                          X   X                  X
+ * |long    |                              X                  X
+ * |float   |                                    X     X      X
+ * |double  |                                          X      X
+ * |String  |    X     X     X         X   X     X     X      X
+ * |byte[]  |                                                        X
+ * |----------------------------------------------------------------------
+ * <p/>
+ * </PRE>
+ *
+ * <p/>
+ * <P>
+ * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding
+ * <code>valueOf(String)</code> conversion method with a null value. Since <code>char</code> does not support a
+ * <code>String</code> conversion, attempting to read a null value as a <code>char</code> must throw a
+ * <code>NullPointerException</code>.
+ *
+ */
+public class BlazeMessage implements Map<String, Object>{
+ private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
+ private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
+ private transient Map<String, Object> map = new ConcurrentHashMap<String, Object>();
+ private transient String destination;
+ private transient String fromId;
+ private transient String messageId;
+ private transient String correlationId;
+ private transient long timeStamp;
+ private BlazeData content;
+
+ /**
+ * Default Constructor
+ */
+ public BlazeMessage() {
+ }
+
+ /**
+ * Constructor - Utility to construct a message with a text <Code>String</Code> payload
+ * @param text
+ */
+ public BlazeMessage(String text) {
+ setString(DEFAULT_TEXT_PAYLOAD,text);
+ }
+
+ /**
+ * Constructor - Utility to construct a message with a byte[] array payload
+ * @param data
+ */
+ public BlazeMessage(byte[] data) {
+ setBytes(DEFAULT_BYTES_PAYLOAD,data);
+ }
+
+ /**
+ * Utility method for setting a default <Code>String</Code> payload
+ * @param text
+ */
+ public void setText(String text) {
+ setString(DEFAULT_TEXT_PAYLOAD,text);
+ }
+
+ /**
+ * Utility method used for when a BlazeMessage is only carrying a String
+ * @return text the default text
+ * @throws Exception
+ */
+ public String getText() throws Exception {
+ return getString(DEFAULT_TEXT_PAYLOAD);
+ }
+
+ /**
+ * Utility method for setting a default <Code>byte[]</Code> payload
+ * @param payload
+ */
+ public void setBytes(byte[] payload) {
+ setBytes(DEFAULT_BYTES_PAYLOAD,payload);
+ }
+
+ /**
+ * Utility method used for when a BlazeMessage is only carrying a byte[] payload
+ * @return the default byte[] payload
+ * @throws Exception
+ */
+ public byte[] getBytes() throws Exception {
+ return getBytes(DEFAULT_BYTES_PAYLOAD);
+ }
+
+ /**
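+ * Creates a new message that shares this message's marshalled content.
+ * The destination, ids and time stamp are not copied.
+ * @return a copy of this message
+ * @throws BlazeException if the message body cannot be marshalled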
+ */
+ public BlazeMessage copy() throws BlazeException{
+ BlazeMessage copy = new BlazeMessage();
+ copy(copy);
+ return copy;
+ }
+
+ /**
+  * Clears the contents of this message, discarding both the in-memory
+  * entries and any marshalled content.
+  */
+ public void clear(){
+     // Drop the marshalled form as well: loadContent() repopulates an empty
+     // map from it, so leaving it in place would resurrect the cleared
+     // entries on the next read.
+     setContent(null);
+     this.map.clear();
+ }
+ /**
+ * Returns the <CODE>boolean</CODE> value with the specified name.
+ *
+ * @param name the name of the <CODE>boolean</CODE>
+ * @return the <CODE>boolean</CODE> value with the specified name
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
+ */
+ public boolean getBoolean(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     final Object stored = this.map.get(name);
+     if (stored == null) {
+         // a missing entry reads as false
+         return false;
+     } else if (stored instanceof Boolean) {
+         return ((Boolean) stored).booleanValue();
+     } else if (stored instanceof String) {
+         // String-to-boolean conversion
+         return Boolean.parseBoolean(stored.toString());
+     }
+     throw new BlazeMessageFormatException(" cannot read a boolean from " + stored.getClass().getName());
+ }
+ /**
+ * Returns the <CODE>byte</CODE> value with the specified name.
+ *
+ * @param name the name of the <CODE>byte</CODE>
+ * @return the <CODE>byte</CODE> value with the specified name
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
+ */
+ public byte getByte(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     final Object stored = this.map.get(name);
+     if (stored == null) {
+         // a missing entry reads as zero
+         return 0;
+     } else if (stored instanceof Byte) {
+         return ((Byte) stored).byteValue();
+     } else if (stored instanceof String) {
+         // String-to-byte conversion; malformed text raises NumberFormatException
+         return Byte.parseByte(stored.toString());
+     }
+     throw new BlazeMessageFormatException(" cannot read a byte from " + stored.getClass().getName());
+ }
+ /**
+ * Returns the <CODE>short</CODE> value with the specified name.
+ *
+ * @param name the name of the <CODE>short</CODE>
+ * @return the <CODE>short</CODE> value with the specified name
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
+ */
+ public short getShort(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     final Object stored = this.map.get(name);
+     if (stored == null) {
+         // a missing entry reads as zero
+         return 0;
+     }
+     // short itself plus the narrower byte type convert directly
+     if (stored instanceof Short || stored instanceof Byte) {
+         return ((Number) stored).shortValue();
+     }
+     if (stored instanceof String) {
+         // String-to-short conversion; malformed text raises NumberFormatException
+         return Short.parseShort(stored.toString());
+     }
+     throw new BlazeMessageFormatException(" cannot read a short from " + stored.getClass().getName());
+ }
+ /**
+  * Returns the Unicode character value with the specified name.
+  *
+  * @param name the name of the Unicode character
+  * @return the Unicode character value with the specified name
+  * @throws BlazeMessageFormatException if the stored value is not a <CODE>Character</CODE>
+  * @throws NullPointerException if there is no value with the specified name
+  */
+ public char getChar(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     Object value = this.map.get(name);
+     if (value == null) {
+         // char has no String conversion, so a missing value cannot be defaulted
+         throw new NullPointerException();
+     }
+     if (value instanceof Character) {
+         return ((Character) value).charValue();
+     }
+     throw new BlazeMessageFormatException(" cannot read a char from " + value.getClass().getName());
+ }
+ /**
+ * Returns the <CODE>int</CODE> value with the specified name.
+ *
+ * @param name the name of the <CODE>int</CODE>
+ * @return the <CODE>int</CODE> value with the specified name
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
+ */
+ public int getInt(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     final Object stored = this.map.get(name);
+     if (stored == null) {
+         // a missing entry reads as zero
+         return 0;
+     }
+     // int itself plus the narrower integral types convert directly
+     if (stored instanceof Integer || stored instanceof Short || stored instanceof Byte) {
+         return ((Number) stored).intValue();
+     }
+     if (stored instanceof String) {
+         // String-to-int conversion; malformed text raises NumberFormatException
+         return Integer.parseInt(stored.toString());
+     }
+     throw new BlazeMessageFormatException(" cannot read an int from " + stored.getClass().getName());
+ }
+ /**
+ * Returns the <CODE>long</CODE> value with the specified name.
+ *
+ * @param name the name of the <CODE>long</CODE>
+ * @return the <CODE>long</CODE> value with the specified name
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
+ */
+ public long getLong(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     final Object stored = this.map.get(name);
+     if (stored == null) {
+         // a missing entry reads as zero
+         return 0;
+     }
+     // long itself plus every narrower integral type converts directly
+     final boolean integral = stored instanceof Long || stored instanceof Integer
+             || stored instanceof Short || stored instanceof Byte;
+     if (integral) {
+         return ((Number) stored).longValue();
+     }
+     if (stored instanceof String) {
+         // String-to-long conversion; malformed text raises NumberFormatException
+         return Long.parseLong(stored.toString());
+     }
+     throw new BlazeMessageFormatException(" cannot read a long from " + stored.getClass().getName());
+ }
+ /**
+ * Returns the <CODE>float</CODE> value with the specified name.
+ *
+ * @param name the name of the <CODE>float</CODE>
+ * @return the <CODE>float</CODE> value with the specified name
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
+ */
+ public float getFloat(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     final Object stored = this.map.get(name);
+     if (stored == null) {
+         // a missing entry reads as zero
+         return 0;
+     } else if (stored instanceof Float) {
+         return ((Float) stored).floatValue();
+     } else if (stored instanceof String) {
+         // String-to-float conversion; malformed text raises NumberFormatException
+         return Float.parseFloat(stored.toString());
+     }
+     throw new BlazeMessageFormatException(" cannot read a float from " + stored.getClass().getName());
+ }
+ /**
+  * Returns the <CODE>double</CODE> value with the specified name.
+  *
+  * @param name the name of the <CODE>double</CODE>
+  * @return the <CODE>double</CODE> value with the specified name, or 0 if there is no item by this name
+  * @throws BlazeMessageFormatException if this type conversion is invalid.
+  * @throws NumberFormatException if a stored <CODE>String</CODE> is not a parsable double
+  */
+ public double getDouble(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     Object value = this.map.get(name);
+     if (value == null) {
+         return 0;
+     }
+     if (value instanceof Double) {
+         return ((Double) value).doubleValue();
+     }
+     if (value instanceof Float) {
+         // widening float -> double conversion
+         return ((Float) value).floatValue();
+     }
+     if (value instanceof String) {
+         // parse as a double: going through Float would silently truncate
+         // precision and overflow for values outside the float range
+         return Double.parseDouble(value.toString());
+     }
+     throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName());
+ }
+ /**
+ * Returns the <CODE>String</CODE> value with the specified name.
+ *
+ * @param name the name of the <CODE>String</CODE>
+ * @return the <CODE>String</CODE> value with the specified name; if there is no item by this name, a null value
+ * is returned
+ * @throws BlazeMessageFormatException if this type conversion is invalid.
+ */
+ public String getString(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     final Object stored = this.map.get(name);
+     // byte arrays have no String conversion
+     if (stored instanceof byte[]) {
+         throw new BlazeMessageFormatException("Use getBytes to read a byte array");
+     }
+     // every other stored type is rendered through toString(); a missing entry reads as null
+     return stored == null ? null : stored.toString();
+ }
+ /**
+  * Returns the byte array value with the specified name.
+  *
+  * @param name the name of the byte array
+  * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
+  * @throws BlazeMessageFormatException if the stored value is not a byte array
+  */
+ public byte[] getBytes(String name) throws BlazeMessageFormatException{
+     initializeReading();
+     Object value = this.map.get(name);
+     if (value == null) {
+         // honour the documented contract instead of failing on value.getClass()
+         return null;
+     }
+     if (value instanceof byte[]) {
+         return (byte[]) value;
+     }
+     throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
+ }
+ /**
+ * Returns the value of the object with the specified name.
+ * <P>
+ * This method can be used to return, in objectified format, an object in the Java programming language ("Java
+ * object") that had been stored in the Map with the equivalent <CODE>setObject</CODE> method call, or its
+ * equivalent primitive <CODE>set <I>type </I></CODE> method.
+ * <P>
+ * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
+ *
+ * @param name the name of the Java object
+ * @return a copy of the Java object value with the specified name, in objectified format (for example, if the
+ * object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
+ * this name, a null value is returned
+ */
+ public Object getObject(String name){
+ initializeReading();
+ return this.map.get(name);
+ }
+ /**
+ * Returns an <CODE>Enumeration</CODE> of all the names in the <CODE>BlazeMessage</CODE> object.
+ *
+ * @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
+ */
+ public Enumeration<String> getMapNames(){
+ initializeReading();
+ return Collections.enumeration(this.map.keySet());
+ }
+
+ /**
+  * Puts a key/value pair into the message.
+  * @param name the entry name; must not be null or an empty string
+  * @param value must be a supported primitive, or map of supported primitives;
+  *            a null value removes any existing entry for <CODE>name</CODE>
+  * @return the previous value associated with the key
+  * @throws IllegalArgumentException if the name is null or empty, or the value is of an unsupported type
+  */
+ public Object put(String name,Object value){
+     // validate first so that a rejected call leaves the message untouched
+     if (name == null) {
+         throw new IllegalArgumentException("The name of the property cannot be null.");
+     }
+     if (name.length() == 0) {
+         throw new IllegalArgumentException("The name of the property cannot be an empty string.");
+     }
+     checkValidObject(value);
+     initializeWriting();
+     if (value == null) {
+         // checkValidObject accepts null, but the backing ConcurrentHashMap
+         // rejects null values; treat null as "unset" rather than throwing
+         return this.map.remove(name);
+     }
+     return this.map.put(name, value);
+ }
+ /**
+ * Sets a <CODE>boolean</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>boolean</CODE>
+ * @param value the <CODE>boolean</CODE> value to set in the Map
+ */
+ public void setBoolean(String name,boolean value){
+ initializeWriting();
+ put(name, value ? Boolean.TRUE : Boolean.FALSE);
+ }
+
+ /**
+ * Sets a <CODE>byte</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>byte</CODE>
+ * @param value the <CODE>byte</CODE> value to set in the Map
+ */
+ public void setByte(String name,byte value){
+ initializeWriting();
+ put(name, Byte.valueOf(value));
+ }
+ /**
+ * Sets a <CODE>short</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>short</CODE>
+ * @param value the <CODE>short</CODE> value to set in the Map
+ */
+ public void setShort(String name,short value){
+ initializeWriting();
+ put(name, Short.valueOf(value));
+ }
+ /**
+ * Sets a Unicode character value with the specified name into the Map.
+ *
+ * @param name the name of the Unicode character
+ * @param value the Unicode character value to set in the Map
+ */
+ public void setChar(String name,char value){
+ initializeWriting();
+ put(name, Character.valueOf(value));
+ }
+ /**
+ * Sets an <CODE>int</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>int</CODE>
+ * @param value the <CODE>int</CODE> value to set in the Map
+ */
+ public void setInt(String name,int value){
+ initializeWriting();
+ put(name, Integer.valueOf(value));
+ }
+ /**
+ * Sets a <CODE>long</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>long</CODE>
+ * @param value the <CODE>long</CODE> value to set in the Map
+ */
+ public void setLong(String name,long value){
+ initializeWriting();
+ put(name, Long.valueOf(value));
+ }
+ /**
+  * Sets a <CODE>float</CODE> value with the specified name into the Map.
+  *
+  * @param name the name of the <CODE>float</CODE>
+  * @param value the <CODE>float</CODE> value to set in the Map
+  */
+ public void setFloat(String name,float value){
+     initializeWriting();
+     // box via valueOf, as the other typed setters do
+     put(name, Float.valueOf(value));
+ }
+ /**
+  * Sets a <CODE>double</CODE> value with the specified name into the Map.
+  *
+  * @param name the name of the <CODE>double</CODE>
+  * @param value the <CODE>double</CODE> value to set in the Map
+  */
+ public void setDouble(String name,double value){
+     initializeWriting();
+     // box via valueOf, as the other typed setters do
+     put(name, Double.valueOf(value));
+ }
+ /**
+ * Sets a <CODE>String</CODE> value with the specified name into the Map.
+ *
+ * @param name the name of the <CODE>String</CODE>
+ * @param value the <CODE>String</CODE> value to set in the Map
+ */
+ public void setString(String name,String value){
+ initializeWriting();
+ put(name, value);
+ }
+ /**
+  * Sets a byte array value with the specified name into the Map.
+  * <P>
+  * The array is stored by reference, not copied, so later modifications of
+  * <CODE>value</CODE> by the caller are visible through this message.
+  *
+  * @param name the name of the byte array
+  * @param value the byte array value to set in the Map; a null value removes any existing entry for
+  *            <CODE>name</CODE>
+  * @throws IllegalArgumentException if <CODE>value</CODE> is not null and the name is null or an empty string
+  *             (raised by <CODE>put</CODE>)
+  */
+ public void setBytes(String name,byte[] value){
+ initializeWriting();
+ if (value != null) {
+ put(name, value);
+ } else {
+ // null means "unset": the entry is removed directly, bypassing put's name validation
+ this.map.remove(name);
+ }
+ }
+ /**
+ * Sets a portion of the byte array value with the specified name into the Map.
+ *
+ * @param name the name of the byte array
+ * @param value the byte array value to set in the Map
+ * @param offset the initial offset within the byte array
+ * @param length the number of bytes to use
+ */
+ public void setBytes(String name,byte[] value,int offset,int length){
+ initializeWriting();
+ byte[] data = new byte[length];
+ System.arraycopy(value, offset, data, 0, length);
+ put(name, data);
+ }
+
+ /**
+ * Find out if the message contains a key
+ * This isn't recursive
+ * @param key
+ * @return true if the message contains the key
+ *
+ */
+ public boolean containsKey(Object key){
+ initializeReading();
+ return this.map.containsKey(key.toString());
+ }
+
+ /**
+ * Find out if the message contains a value
+ * @param value
+ * @return true if the value exists
+ *
+ */
+ public boolean containsValue(Object value){
+ initializeReading();
+ return this.map.containsValue(value);
+ }
+
+ /**
+ * @return a set of Map.Entry values
+ *
+ */
+ public Set<java.util.Map.Entry<String, Object>> entrySet(){
+ initializeReading();
+ return this.map.entrySet();
+ }
+
+ /**
+ * Retrieve the object associated with the key
+ * @param key
+ * @return the object
+ */
+ public Object get(Object key){
+ initializeReading();
+ return getObject(key.toString());
+ }
+
+ /**
+ * @return true if the message is empty
+ *
+ */
+ public boolean isEmpty(){
+ initializeReading();
+ return this.map.isEmpty();
+ }
+
+ /**
+ * @return a Set of all the keys
+ */
+ public Set<String> keySet(){
+ initializeReading();
+ return this.map.keySet();
+ }
+
+ /**
+ * Add all entries in a Map to the message
+ * @param t the map
+ *
+ */
+ public void putAll(Map<? extends String, ? extends Object> t){
+ for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+
+ }
+
+ /**
+  * Remove a key/value pair from the message
+  * @param key the key whose entry is to be removed; its <CODE>toString()</CODE> form is used as the name
+  * @return the value removed or null
+  *
+  */
+ public Object remove(Object key){
+     // Materialise the entries from the marshalled content BEFORE discarding
+     // it; otherwise removing from a message whose map has not been loaded
+     // yet would silently drop every entry.
+     loadContent();
+     setContent(null);
+     return this.map.remove(key.toString());
+ }
+
+ /**
+ * @return the number of entries in the message
+ */
+ public int size(){
+ initializeReading();
+ return this.map.size();
+ }
+
+ /**
+ * @return a Collection of the values in the message
+ */
+ public Collection<Object> values(){
+ initializeReading();
+ return this.map.values();
+ }
+
+ private void initializeReading(){
+ loadContent();
+ }
+
+ /**
+  * Prepares the message for modification: the in-memory map becomes the
+  * source of truth and the marshalled content is invalidated.
+  */
+ private void initializeWriting(){
+     // Load first: if the entries only exist in marshalled form (map still
+     // empty), dropping the content straight away would lose them.
+     loadContent();
+     setContent(null);
+ }
+
+ /**
+  * Verifies that a value can be carried by a message: null, a boxed
+  * primitive, a <CODE>String</CODE>, a <CODE>byte[]</CODE>, or a
+  * <CODE>Map</CODE> whose values are themselves valid (checked recursively).
+  *
+  * @param value the candidate value
+  * @throws IllegalArgumentException if the value, or any value nested in a map, is of an unsupported type
+  */
+ protected void checkValidObject(Object value) throws IllegalArgumentException{
+     boolean valid = value == null || value instanceof Boolean || value instanceof Byte
+             || value instanceof Short || value instanceof Integer || value instanceof Long
+             || value instanceof Float || value instanceof Double || value instanceof Character
+             || value instanceof String || value instanceof byte[];
+     if (value instanceof Map) {
+         // only the values are validated; a nested failure propagates as an exception
+         for (Object nestedValue : ((Map<?, ?>) value).values()) {
+             checkValidObject(nestedValue);
+         }
+         valid = true;
+     }
+     if (!valid) {
+         throw new IllegalArgumentException("Not a valid message value: "+value);
+     }
+ }
+
+ public String toString(){
+ return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
+ }
+
+ protected void copy(BlazeMessage copy) throws BlazeException{
+ storeContent();
+ copy.content = this.content;
+ }
+
+
+ public BlazeData getContent(){
+ return this.content;
+ }
+
+ public void setContent(BlazeData content){
+ this.content = content;
+ }
+
+ /**
+  * Appends one named value to <CODE>mapData</CODE> using the wire type that
+  * matches the value's runtime class. Null values are skipped. A nested
+  * <CODE>Map</CODE> is written as a single nested <CODE>MapData</CODE>
+  * carrying all of its entries.
+  *
+  * @param mapData the wire structure to append to
+  * @param name the entry name
+  * @param value the entry value
+  * @throws BlazeMessageFormatException if the value's type cannot be serialized
+  */
+ private void marshallMap(MapData mapData,String name,Object value) throws BlazeMessageFormatException{
+     if (value == null) {
+         return;
+     }
+     Class<?> valueClass = value.getClass();
+     if (valueClass == Boolean.class) {
+         BoolType type = new BoolType();
+         type.setName(name);
+         type.setValue(((Boolean) value).booleanValue());
+         mapData.addBoolType(type);
+     } else if (valueClass == Byte.class) {
+         ByteType type = new ByteType();
+         type.setName(name);
+         type.setValue(((Byte) value).byteValue());
+         mapData.addByteType(type);
+     } else if (valueClass == Character.class) {
+         // characters travel as one-character strings
+         CharType type = new CharType();
+         type.setName(name);
+         type.setValue(value.toString());
+         mapData.addCharType(type);
+     } else if (valueClass == Short.class) {
+         ShortType type = new ShortType();
+         type.setName(name);
+         type.setValue(((Short) value).shortValue());
+         mapData.addShortType(type);
+     } else if (valueClass == Integer.class) {
+         IntType type = new IntType();
+         type.setName(name);
+         type.setValue(((Integer) value).intValue());
+         mapData.addIntType(type);
+     } else if (valueClass == Long.class) {
+         LongType type = new LongType();
+         type.setName(name);
+         type.setValue(((Long) value).longValue());
+         mapData.addLongType(type);
+     } else if (valueClass == Float.class) {
+         FloatType type = new FloatType();
+         type.setName(name);
+         type.setValue(((Float) value).floatValue());
+         mapData.addFloatType(type);
+     } else if (valueClass == Double.class) {
+         DoubleType type = new DoubleType();
+         type.setName(name);
+         type.setValue(((Double) value).doubleValue());
+         mapData.addDoubleType(type);
+     } else if (valueClass == byte[].class) {
+         BytesType type = new BytesType();
+         type.setName(name);
+         type.setValue(new Buffer((byte[]) value));
+         mapData.addBytesType(type);
+     } else if (valueClass == String.class) {
+         StringType type = new StringType();
+         type.setName(name);
+         type.setValue(value.toString());
+         mapData.addStringType(type);
+     } else if (value instanceof Map) {
+         // Write the whole nested map into ONE MapData. Emitting a separate
+         // MapData per entry under the same name makes unmarshall() overwrite
+         // the result each time, keeping only the last entry.
+         MapData nested = new MapData();
+         nested.setName(name);
+         for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+             marshallMap(nested, entry.getKey().toString(), entry.getValue());
+         }
+         mapData.addMapType(nested);
+     } else {
+         throw new BlazeMessageFormatException("Cannot serialize type " + value);
+     }
+ }
+
+ /**
+  * Rebuilds a name/value map from its wire representation. Nested
+  * <CODE>MapData</CODE> entries are unmarshalled recursively into nested maps.
+  *
+  * @param mapData the wire structure to read
+  * @return a new map holding every entry found in <CODE>mapData</CODE>
+  */
+ Map<String, Object> unmarshall(MapData mapData){
+     Map<String, Object> result = new ConcurrentHashMap<String, Object>();
+     // Values are boxed with valueOf (as the typed setters do) rather than
+     // with the boxing constructors.
+     if (mapData.hasBoolType()) {
+         for (BoolType type : mapData.getBoolTypeList()) {
+             result.put(type.getName(), Boolean.valueOf(type.getValue()));
+         }
+     }
+     if (mapData.hasCharType()) {
+         for (CharType type : mapData.getCharTypeList()) {
+             // characters travel as strings; the first char is the value
+             result.put(type.getName(), Character.valueOf(type.getValue().charAt(0)));
+         }
+     }
+     if (mapData.hasShortType()) {
+         for (ShortType type : mapData.getShortTypeList()) {
+             result.put(type.getName(), Short.valueOf((short) type.getValue()));
+         }
+     }
+     if (mapData.hasIntType()) {
+         for (IntType type : mapData.getIntTypeList()) {
+             result.put(type.getName(), Integer.valueOf(type.getValue()));
+         }
+     }
+     if (mapData.hasLongType()) {
+         for (LongType type : mapData.getLongTypeList()) {
+             result.put(type.getName(), Long.valueOf(type.getValue()));
+         }
+     }
+     if (mapData.hasFloatType()) {
+         for (FloatType type : mapData.getFloatTypeList()) {
+             result.put(type.getName(), Float.valueOf((float) type.getValue()));
+         }
+     }
+     if (mapData.hasDoubleType()) {
+         for (DoubleType type : mapData.getDoubleTypeList()) {
+             result.put(type.getName(), Double.valueOf(type.getValue()));
+         }
+     }
+     if (mapData.hasByteType()) {
+         for (ByteType type : mapData.getByteTypeList()) {
+             result.put(type.getName(), Byte.valueOf((byte) type.getValue()));
+         }
+     }
+     if (mapData.hasStringType()) {
+         for (StringType type : mapData.getStringTypeList()) {
+             result.put(type.getName(), type.getValue());
+         }
+     }
+     if (mapData.hasBytesType()) {
+         for (BytesType type : mapData.getBytesTypeList()) {
+             result.put(type.getName(), type.getValue().toByteArray());
+         }
+     }
+     if (mapData.hasMapType()) {
+         for (MapData type : mapData.getMapTypeList()) {
+             Map<String, Object> nested = unmarshall(type);
+             result.put(type.getName(), nested);
+         }
+     }
+     return result;
+ }
+
+ public void storeContent() throws BlazeMessageFormatException{
+ if (getContent() == null && !this.map.isEmpty()) {
+ BlazeData bd = new BlazeData();
+ MapData mapData = new MapData();
+ for (Map.Entry<String, Object> entry : this.map.entrySet()) {
+ marshallMap(mapData, entry.getKey().toString(), entry.getValue());
+ }
+ bd.setMapData(mapData);
+ this.content = bd;
+ }
+ }
+
+ /**
+ * Builds the message body from data
+ *
+ */
+ void loadContent() throws BlazeRuntimeException{
+ BlazeData data = getContent();
+ if (data != null && this.map.isEmpty()) {
+ this.map = unmarshall(data.getMapData());
+ }
+ }
+
+ /**
+ * @return the destination
+ */
+ public String getDestination() {
+ return this.destination;
+ }
+
+ /**
+ * @param destination the destination to set
+ */
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * The id of the channel that sent the message
+ * @return the fromId
+ */
+ public String getFromId() {
+ return this.fromId;
+ }
+
+ /**
+ * @param fromId the fromId to set
+ */
+ public void setFromId(String fromId) {
+ this.fromId = fromId;
+ }
+
+ /**
+ * @return the messageId
+ */
+ public String getMessageId() {
+ return this.messageId;
+ }
+
+ /**
+ * @param messageId the messageId to set
+ */
+ public void setMessageId(String messageId) {
+ this.messageId = messageId;
+ }
+
+ /**
+ * @return the correlationId
+ */
+ public String getCorrelationId() {
+ return this.correlationId;
+ }
+
+ /**
+ * @param correlationId the correlationId to set
+ */
+ public void setCorrelationId(String correlationId) {
+ this.correlationId = correlationId;
+ }
+
+ /**
+ * @return the timeStamp
+ */
+ public long getTimeStamp() {
+ return this.timeStamp;
+ }
+
+ /**
+ * @param timeStamp the timeStamp to set
+ */
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+}
\ No newline at end of file
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Exception raised for message format exceptions, for example when a
+ * <CODE>BlazeMessage</CODE> value cannot be read as the requested type.
+ *
+ */
+public class BlazeMessageFormatException extends BlazeException{
+ private static final long serialVersionUID = 1925143462979839452L;
+
+ /**
+ * Constructs the exception with a reason.
+ * @param reason description of the format problem; passed unchanged to the
+ * <CODE>BlazeException(String)</CODE> constructor (presumably as the detail message)
+ */
+ public BlazeMessageFormatException(String reason) {
+ super(reason);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageFormatException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Unchecked exception for Blaze. Every constructor simply delegates to the
+ * matching {@link RuntimeException} constructor.
+ */
+public class BlazeRuntimeException extends RuntimeException {
+
+ /**
+ * Serialization version identifier.
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new exception with <code>null</code> as its detail message.
+ * The cause is not initialized, and may subsequently be initialized by a
+ * call to {@link #initCause}.
+ */
+ public BlazeRuntimeException() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public BlazeRuntimeException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * <code>cause</code> is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <code>null</code> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public BlazeRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause and a detail
+ * message of <code>(cause==null ? null : cause.toString())</code> (which
+ * typically contains the class and detail message of <code>cause</code>).
+ * This constructor is useful for exceptions that are little more than
+ * wrappers for other throwables (for example, {@link
+ * java.security.PrivilegedActionException}).
+ *
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <code>null</code> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public BlazeRuntimeException(Throwable cause) {
+ super(cause);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeRuntimeException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * A listener for BlazeMessages.
+ * Implementations receive each message through {@link #onMessage(BlazeMessage)}.
+ */
+public interface BlazeTopicListener {
+
+ /**
+ * Called when a Message is available to be processed
+ * @param message the message to process
+ */
+ public void onMessage(BlazeMessage message);
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeTopicListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Listener for async exceptions.
+ * Implementations receive each exception through {@link #onException(Exception)}.
+ */
+public interface ExceptionListener {
+ /**
+ * Called when an async exception has been raised
+ * @param ex the exception that was raised
+ */
+ void onException(Exception ex);
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/ExceptionListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activeblaze;
+
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * A {@link Service} that processes {@link Packet}s in two directions,
+ * downstream and upstream, and accepts an {@link ExceptionListener}.
+ */
+public interface Processor extends Service{
+ /** Processes a packet travelling downstream (NOTE(review): presumably outbound - confirm).
+ * @param packet the packet to process
+ * @throws Exception if processing fails
+ */
+ void downStream(Packet packet) throws Exception;
+
+ /** Processes a packet travelling upstream (NOTE(review): presumably inbound - confirm).
+ * @param packet the packet to process
+ * @throws Exception if processing fails
+ */
+ void upStream(Packet packet) throws Exception;
+
+ /**
+ * Set an exception listener
+ * @param l the listener to set (presumably notified of async exceptions - confirm)
+ */
+ void setExceptionListener(ExceptionListener l);
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * Life cycle of an administered object: init, start, stop and shutDown,
+ * each paired with a query method that reports the corresponding state.
+ */
+public interface Service {
+ /**
+ * Initialize the service
+ * @return true if initialized
+ * @throws Exception if initialization fails
+ */
+ public boolean init() throws Exception;
+ /**
+ * Start the service
+ * @return true if started
+ * @throws Exception if the service fails to start
+ */
+ public boolean start() throws Exception;
+ /**
+ * Stop the service
+ * @return true if stopped
+ * @throws Exception if the service fails to stop
+ */
+ public boolean stop() throws Exception;
+ /**
+ * Shutdown the Service
+ * @return true if shutdown, false if already in the shutdown state
+ * @throws Exception if the service fails to shut down
+ */
+ public boolean shutDown() throws Exception;
+ /**
+ * @return true if the service is started
+ */
+ public boolean isStarted();
+ /**
+ * @return true if the service is stopped
+ */
+ public boolean isStopped();
+ /**
+ * @return true if the service is initialized
+ */
+ public boolean isInitialized();
+ /**
+ * @return true if the service is shut down
+ */
+ public boolean isShutDown();
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Service.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import org.apache.activeblaze.group.BlazeGroupChannel;
+import org.apache.activeblaze.group.Member;
+/**
+ * A <CODE>BlazeCoordinatedGroupChannel</CODE> enables peer-based point to point communication and maintains a coordinator
+ * (elected leader) for the group
+ * It extends {@link BlazeGroupChannel} with coordinator queries, coordinator change listeners and an election wait.
+ */
+public interface BlazeCoordinatedGroupChannel extends BlazeGroupChannel{
+ /**
+ * @return true if this Channel is the coordinator of the group
+ * @throws Exception on failure (conditions are implementation-defined)
+ */
+ public boolean isCoordinator() throws Exception;
+ /**
+ * @return the member of the group which is the coordinator
+ * @throws Exception on failure (conditions are implementation-defined)
+ */
+ public Member getCoordinator() throws Exception;
+
+ /**
+ * Add a listener for coordinator changes
+ *
+ * @param l the listener to add
+ * @throws Exception on failure (conditions are implementation-defined)
+ */
+ public void addCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception;
+
+ /**
+ * Remove a listener for coordinator changes
+ *
+ * @param l the listener to remove
+ * @throws Exception on failure (conditions are implementation-defined)
+ */
+ public void removeCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception;
+
+ /**
+ * @return the coordinated group configuration of this channel
+ */
+ public BlazeCoordinatedGroupConfiguration getCoordinatedGroupConfiguration();
+
+ /**
+ * Waits for election in the group to finish
+ * @param timeout time to wait in milliseconds
+ * @return true if election finished (presumably false if the timeout elapsed first - confirm)
+ * @throws Exception on failure (conditions are implementation-defined)
+ */
+ public boolean waitForElection(int timeout) throws Exception;
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import org.apache.activeblaze.group.BlazeGroupChannelFactory;
+
+
+
+/**
+ * Factory class for creating <CODE>BlazeCoordinatedGroupChannel</CODE>
+ */
+public class BlazeCoordinatedGroupChannelFactory extends BlazeGroupChannelFactory {
+
+ /**
+ * Default Constructor; uses a newly created BlazeCoordinatedGroupConfiguration
+ */
+ public BlazeCoordinatedGroupChannelFactory() {
+ super(new BlazeCoordinatedGroupConfiguration());
+ }
+
+ /**
+ * Construct a factory to use the passed Configuration
+ * @param config the configuration handed to the superclass constructor
+ */
+ public BlazeCoordinatedGroupChannelFactory(BlazeCoordinatedGroupConfiguration config){
+ super(config);
+ }
+
+ /**
+ * Create a BlazeCoordinatedGroupChannelImpl and give it the result of getConfiguration().copy()
+ * @param name the name passed to the channel constructor
+ * @return the Channel
+ * @throws Exception if the channel cannot be created or configured
+ */
+ public BlazeCoordinatedGroupChannel createChannel(String name) throws Exception {
+ BlazeCoordinatedGroupChannelImpl result = new BlazeCoordinatedGroupChannelImpl(name);
+ result.setConfiguration(getConfiguration().copy());
+ return result;
+ }
+
+ /**
+ * @return the superclass configuration, downcast; both constructors here supply that type
+ */
+ public BlazeCoordinatedGroupConfiguration getConfiguration() {
+ return (BlazeCoordinatedGroupConfiguration) super.getConfiguration();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelFactory.java
------------------------------------------------------------------------------
svn:eol-style = native