You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/02/02 00:35:56 UTC
svn commit: r739885 [2/5] - in /activemq/activemq-blaze/trunk/src:
main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/
main/java/org/apache/activeblaze/impl/transport/
main/java/org/apache/activeblaze/jms/ main/java/org/apache/act...
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import java.util.Enumeration;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.jms.ConnectionMetaData;
+
+/**
+ * A <CODE>ConnectionMetaData</CODE> object provides information describing
+ * the <CODE>Connection</CODE> object.
+ */
+
+public final class BlazeJmsConnectionMetaData implements ConnectionMetaData {
+
+ public static final String PROVIDER_VERSION;
+ public static final int PROVIDER_MAJOR_VERSION;
+ public static final int PROVIDER_MINOR_VERSION;
+
+ public static final BlazeJmsConnectionMetaData INSTANCE = new BlazeJmsConnectionMetaData();
+
+ static {
+ String version = null;
+ int major = 0;
+ int minor = 0;
+ try {
+ Package p = Package.getPackage("org.apache.activeblaze");
+ if (p != null) {
+ version = p.getImplementationVersion();
+ Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+ Matcher m = pattern.matcher(version);
+ if (m.matches()) {
+ major = Integer.parseInt(m.group(1));
+ minor = Integer.parseInt(m.group(2));
+ }
+ }
+ } catch (Throwable e) {
+ }
+ PROVIDER_VERSION = version;
+ PROVIDER_MAJOR_VERSION = major;
+ PROVIDER_MINOR_VERSION = minor;
+ }
+
+ private BlazeJmsConnectionMetaData() {
+ }
+
+ /**
+ * Gets the JMS API version.
+ *
+ * @return the JMS API version
+ */
+
+ public String getJMSVersion() {
+ return "1.1";
+ }
+
+ /**
+ * Gets the JMS major version number.
+ *
+ * @return the JMS API major version number
+ */
+
+ public int getJMSMajorVersion() {
+ return 1;
+ }
+
+ /**
+ * Gets the JMS minor version number.
+ *
+ * @return the JMS API minor version number
+ */
+
+ public int getJMSMinorVersion() {
+ return 1;
+ }
+
+ /**
+ * Gets the JMS provider name.
+ *
+ * @return the JMS provider name
+ */
+
+ public String getJMSProviderName() {
+ return "ActiveBlaze";
+ }
+
+ /**
+ * Gets the JMS provider version.
+ *
+ * @return the JMS provider version
+ */
+
+ public String getProviderVersion() {
+ return PROVIDER_VERSION;
+ }
+
+ /**
+ * Gets the JMS provider major version number.
+ *
+ * @return the JMS provider major version number
+ */
+
+ public int getProviderMajorVersion() {
+ return PROVIDER_MAJOR_VERSION;
+ }
+
+ /**
+ * Gets the JMS provider minor version number.
+ *
+ * @return the JMS provider minor version number
+ */
+
+ public int getProviderMinorVersion() {
+ return PROVIDER_MINOR_VERSION;
+ }
+
+ /**
+ * Gets an enumeration of the JMSX property names.
+ *
+ * @return an Enumeration of JMSX property names
+ */
+
+ public Enumeration<String> getJMSXPropertyNames() {
+ Vector<String> jmxProperties = new Vector<String>();
+ return jmxProperties.elements();
+ }
+
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import java.io.Externalizable;
+import java.util.Map;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import org.apache.activeblaze.Destination;
+import org.apache.activeblaze.jndi.JNDIStorable;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Jms Destination
+ *
+ */
+public class BlazeJmsDestination extends JNDIStorable implements Externalizable, javax.jms.Destination,
+ Comparable<BlazeJmsDestination> {
+ protected transient Destination destination;
+
+ /**
+ * Constructor
+ */
+ public BlazeJmsDestination() {
+ this.destination = new Destination();
+ }
+
+ /**
+ * Constructor
+ *
+ * @param dest
+ */
+ public BlazeJmsDestination(Destination dest) {
+ this.destination = dest;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param name
+ */
+ public BlazeJmsDestination(String name) {
+ this.destination.setName(new Buffer(name));
+ }
+
+ /**
+ * @return the destination
+ */
+ public Destination getDestination() {
+ return this.destination;
+ }
+
+ /**
+ * @return name of destination
+ */
+ public String getName() {
+ return this.destination.getName().toStringUtf8();
+ }
+
+ /**
+ * @return the topic
+ */
+ public boolean isTopic() {
+ return this.destination.isTopic();
+ }
+
+ /**
+ * @return the temporary
+ */
+ public boolean isTemporary() {
+ return this.destination.isTemporary();
+ }
+
+ /**
+ * @return true if a Topic
+ */
+ public boolean isQueue() {
+ return this.destination.isQueue();
+ }
+
+ /**
+ * @param props
+ * @see org.apache.activeblaze.jndi.JNDIStorable#buildFromProperties(java.util.Properties)
+ */
+ @Override
+ protected void buildFromProperties(Map<String,String> props) {
+
+ this.destination.setName(new Buffer(getProperty(props, "name", "")));
+ Boolean bool = Boolean.valueOf(getProperty(props,"topic", Boolean.TRUE.toString()));
+ this.destination.setTopic(bool.booleanValue());
+ bool = Boolean.valueOf(getProperty(props,"temporary", Boolean.FALSE.toString()));
+ this.destination.setTemporary(bool.booleanValue());
+ }
+
+ /**
+ * @param props
+ * @see org.apache.activeblaze.jndi.JNDIStorable#populateProperties(java.util.Properties)
+ */
+ @Override
+ protected void populateProperties(Map<String,String> props) {
+ props.put("name", this.destination.getName().toStringUtf8());
+ props.put("topic", Boolean.toString(this.destination.isTopic()));
+ props.put("temporary", Boolean.toString(this.destination.isTemporary()));
+ }
+
+ /**
+ *
+ * @param other
+ * the Object to be compared.
+ * @return a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than
+ * the specified object.
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo(BlazeJmsDestination other) {
+ if (other != null) {
+ if (this.destination.isTemporary() == other.destination.isTemporary()) {
+ return this.destination.getName().toStringUtf8().compareTo(other.destination.getName().toStringUtf8());
+ }
+ return -1;
+ }
+ return -1;
+ }
+
+ /**
+ * Transform a javax.jms.Destination to a BlazeJmsDestination
+ *
+ * @param dest
+ * @return a BlazeJmsDestination
+ * @throws JMSException
+ */
+ public static BlazeJmsDestination transform(javax.jms.Destination dest) throws JMSException {
+ if (dest == null) {
+ return null;
+ }
+ if (dest instanceof BlazeJmsDestination) {
+ return (BlazeJmsDestination) dest;
+ }
+ if (dest instanceof TemporaryQueue) {
+ return new BlazeJmsTempQueue(((TemporaryQueue) dest).getQueueName());
+ }
+ if (dest instanceof TemporaryTopic) {
+ return new BlazeJmsTempTopic(((TemporaryTopic) dest).getTopicName());
+ }
+ if (dest instanceof Queue) {
+ return new BlazeJmsQueue(((Queue) dest).getQueueName());
+ }
+ if (dest instanceof Topic) {
+ return new BlazeJmsTopic(((Topic) dest).getTopicName());
+ }
+ throw new JMSException("Could not transform the destination into a ActiveMQ destination: " + dest);
+ }
+
+ /**
+ * @param dest
+ * @return a JMS destination
+ *
+ */
+ public static BlazeJmsDestination createJmsDestination(Destination dest) {
+ if (dest.isTopic()) {
+ if (!dest.isTemporary()) {
+ return new BlazeJmsTopic(dest);
+ }
+ return new BlazeJmsTempTopic(dest);
+ }
+ if (!dest.isTemporary()) {
+ return new BlazeJmsQueue(dest);
+ }
+ return new BlazeJmsTempQueue(dest);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+
+/**
+ * Create those nice, old fashioned JMS Exceptions
+ *
+ */
+public final class BlazeJmsExceptionSupport {
+
+ private BlazeJmsExceptionSupport() {
+ }
+
+ public static JMSException create(String msg, Throwable cause) {
+ JMSException exception = new JMSException(msg);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ public static JMSException create(String msg, Exception cause) {
+ JMSException exception = new JMSException(msg);
+ exception.setLinkedException(cause);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ public static JMSException create(Throwable cause) {
+ if (cause instanceof JMSException) {
+ return (JMSException)cause;
+ }
+ String msg = cause.getMessage();
+ if (msg == null || msg.length() == 0) {
+ msg = cause.toString();
+ }
+ JMSException exception = new JMSException(msg);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ public static JMSException create(Exception cause) {
+ if (cause instanceof JMSException) {
+ return (JMSException)cause;
+ }
+ String msg = cause.getMessage();
+ if (msg == null || msg.length() == 0) {
+ msg = cause.toString();
+ }
+ JMSException exception = new JMSException(msg);
+ exception.setLinkedException(cause);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ public static MessageEOFException createMessageEOFException(Exception cause) {
+ String msg = cause.getMessage();
+ if (msg == null || msg.length() == 0) {
+ msg = cause.toString();
+ }
+ MessageEOFException exception = new MessageEOFException(msg);
+ exception.setLinkedException(cause);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ public static MessageFormatException createMessageFormatException(Exception cause) {
+ String msg = cause.getMessage();
+ if (msg == null || msg.length() == 0) {
+ msg = cause.toString();
+ }
+ MessageFormatException exception = new MessageFormatException(msg);
+ exception.setLinkedException(cause);
+ exception.initCause(cause);
+ return exception;
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+/**
+ * implementation of a Jms Message Consumer
+ *
+ */
+public class BlazeJmsMessageConsumer implements MessageConsumer {
+ protected final BlazeJmsSession session;
+ protected final BlazeJmsDestination destination;
+ private boolean closed;
+ private MessageListener messageListener;
+ private String messageSelector = "";
+
+ protected BlazeJmsMessageConsumer(BlazeJmsSession s,BlazeJmsDestination destination) {
+ this.session=s;
+ this.destination=destination;
+ }
+ /**
+ * @see javax.jms.MessageConsumer#close()
+ */
+ public void close() {
+ this.closed=true;
+ this.session.remove(this);
+
+ }
+
+ /**
+ * @return the MessageListener
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#getMessageListener()
+ */
+ public MessageListener getMessageListener() throws JMSException {
+ checkClosed();
+ return this.messageListener;
+ }
+
+ /**
+ * @return the Message Selector
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#getMessageSelector()
+ */
+ public String getMessageSelector() throws JMSException {
+ checkClosed();
+ return this.messageSelector;
+ }
+
+ /**
+ * @return a Message or null if closed during the operation
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#receive()
+ */
+ public Message receive() throws JMSException {
+ checkClosed();
+ return null;
+ }
+
+ /**
+ * @param timeout
+ * @return a MEssage or null
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#receive(long)
+ */
+ public Message receive(long timeout) throws JMSException {
+ checkClosed();
+ return null;
+ }
+
+ /**
+ * @return a Message or null
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#receiveNoWait()
+ */
+ public Message receiveNoWait() throws JMSException {
+ checkClosed();
+ return null;
+ }
+
+ /**
+ * @param listener
+ * @throws JMSException
+ * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
+ */
+ public void setMessageListener(MessageListener listener) throws JMSException {
+ checkClosed();
+ this.messageListener=listener;
+
+ }
+
+ /**
+ * @param messageSelector the messageSelector to set
+ * @throws IllegalStateException
+ */
+ public void setMessageSelector(String messageSelector) throws IllegalStateException {
+ checkClosed();
+ this.messageSelector = messageSelector;
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (this.closed) {
+ throw new IllegalStateException("The MessageProducer is closed");
+ }
+ }
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/**
+ * Implementation of a Jms MessageProducer
+ *
+ */
+public class BlazeJmsMessageProducer implements MessageProducer {
+ protected final BlazeJmsSession session;
+ protected BlazeJmsDestination destination;
+ protected final boolean flexibleDestination;
+ protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
+ protected int priority = Message.DEFAULT_PRIORITY;
+ protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+ protected boolean closed;
+ protected boolean disableMessageId;
+ protected boolean disableTimestamp;
+
+ protected BlazeJmsMessageProducer(BlazeJmsSession s, BlazeJmsDestination dest) {
+ this.session = s;
+ this.destination = dest;
+ this.flexibleDestination = dest == null;
+ }
+
+ /**
+ * Close the producer
+ *
+ * @see javax.jms.MessageProducer#close()
+ */
+ public void close() {
+ this.closed = true;
+ this.session.remove(this);
+ }
+
+ /**
+ * @return the delivery mode
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#getDeliveryMode()
+ */
+ public int getDeliveryMode() throws JMSException {
+ checkClosed();
+ return this.deliveryMode;
+ }
+
+ /**
+ * @return the destination
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#getDestination()
+ */
+ public Destination getDestination() throws JMSException {
+ checkClosed();
+ return this.destination;
+ }
+
+ /**
+ * @return true if disableIds is set
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#getDisableMessageID()
+ */
+ public boolean getDisableMessageID() throws JMSException {
+ checkClosed();
+ return this.disableMessageId;
+ }
+
+ /**
+ * @return true if disable timestamp is set
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#getDisableMessageTimestamp()
+ */
+ public boolean getDisableMessageTimestamp() throws JMSException {
+ checkClosed();
+ return this.disableTimestamp;
+ }
+
+ /**
+ * @return the priority
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#getPriority()
+ */
+ public int getPriority() throws JMSException {
+ checkClosed();
+ return this.priority;
+ }
+
+ /**
+ * @return timeToLive
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#getTimeToLive()
+ */
+ public long getTimeToLive() throws JMSException {
+ checkClosed();
+ return this.timeToLive;
+ }
+
+ /**
+ * @param message
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#send(javax.jms.Message)
+ */
+ public void send(Message message) throws JMSException {
+ send(this.destination, message, this.deliveryMode, this.priority, this.timeToLive);
+ }
+
+ /**
+ * @param destination
+ * @param message
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#send(javax.jms.Destination, javax.jms.Message)
+ */
+ public void send(Destination destination, Message message) throws JMSException {
+ send(destination, message, this.deliveryMode, this.priority, this.timeToLive);
+ }
+
+ /**
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long)
+ */
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ send(this.destination, message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * @param destination
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#send(javax.jms.Destination, javax.jms.Message, int, int, long)
+ */
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ setDestination(destination);
+ this.session.send(destination, message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * @param deliveryMode
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#setDeliveryMode(int)
+ */
+ public void setDeliveryMode(int deliveryMode) throws JMSException {
+ checkClosed();
+ this.deliveryMode = deliveryMode;
+ }
+
+ /**
+ * @param value
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#setDisableMessageID(boolean)
+ */
+ public void setDisableMessageID(boolean value) throws JMSException {
+ checkClosed();
+ this.disableMessageId = value;
+ }
+
+ /**
+ * @param value
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#setDisableMessageTimestamp(boolean)
+ */
+ public void setDisableMessageTimestamp(boolean value) throws JMSException {
+ checkClosed();
+ this.disableTimestamp = value;
+ }
+
+ /**
+ * @param defaultPriority
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#setPriority(int)
+ */
+ public void setPriority(int defaultPriority) throws JMSException {
+ checkClosed();
+ this.priority = defaultPriority;
+ }
+
+ /**
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.MessageProducer#setTimeToLive(long)
+ */
+ public void setTimeToLive(long timeToLive) throws JMSException {
+ checkClosed();
+ this.timeToLive = timeToLive;
+ }
+
+ /**
+ * @param dest
+ * the destination to set
+ * @throws JMSException
+ * @throws InvalidDestinationException
+ */
+ public void setDestination(Destination dest) throws JMSException {
+ BlazeJmsDestination destination = BlazeJmsDestination.transform(dest);
+ if (destination == null) {
+ throw new InvalidDestinationException("Don't understand null destinations");
+ }
+ if (!this.flexibleDestination && !destination.equals(this.destination)) {
+ throw new UnsupportedOperationException("This producer can only send messages to: "
+ + this.destination.getName());
+ }
+ this.destination = destination;
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (this.closed) {
+ throw new IllegalStateException("The MessageProducer is closed");
+ }
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import java.util.Enumeration;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageEOFException;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsBytesMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMapMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsObjectMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsStreamMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsTextMessage;
+
+/**
+ * A helper class for converting normal JMS interfaces into ActiveMQ specific ones.
+ *
+ * @version $Revision: 1.1 $
+ */
+public final class BlazeJmsMessageTransformation {
+ private BlazeJmsMessageTransformation() {
+ }
+
+ /**
+ * @param dest
+ * @return a BlazeJmsDestination
+ * @throws JMSException
+ */
+ private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException {
+ return BlazeJmsDestination.transform(dest);
+ }
+
+ /**
+ * @param message
+ * @return a BlazeJmsDestination
+ * @throws JMSException
+ */
+ public static BlazeJmsMessage transformMessage(Message message) throws JMSException {
+ if (message instanceof BlazeJmsMessage) {
+ return (BlazeJmsMessage) message;
+ }
+ BlazeJmsMessage transformedMessage = null;
+ if (message instanceof BytesMessage) {
+ BytesMessage bytesMsg = (BytesMessage) message;
+ bytesMsg.reset();
+ BlazeJmsBytesMessage msg = new BlazeJmsBytesMessage();
+ try {
+ for (;;) {
+ // Reads a byte from the message stream until the stream
+ // is empty
+ msg.writeByte(bytesMsg.readByte());
+ }
+ } catch (MessageEOFException e) {
+ // if an end of message stream as expected
+ } catch (JMSException e) {
+ }
+ transformedMessage = msg;
+ } else if (message instanceof MapMessage) {
+ MapMessage mapMsg = (MapMessage) message;
+ BlazeJmsMapMessage msg = new BlazeJmsMapMessage();
+ Enumeration iter = mapMsg.getMapNames();
+ while (iter.hasMoreElements()) {
+ String name = iter.nextElement().toString();
+ msg.setObject(name, mapMsg.getObject(name));
+ }
+ transformedMessage = msg;
+ } else if (message instanceof ObjectMessage) {
+ ObjectMessage objMsg = (ObjectMessage) message;
+ BlazeJmsObjectMessage msg = new BlazeJmsObjectMessage();
+ msg.setObject(objMsg.getObject());
+ msg.storeContent();
+ transformedMessage = msg;
+ } else if (message instanceof StreamMessage) {
+ StreamMessage streamMessage = (StreamMessage) message;
+ streamMessage.reset();
+ BlazeJmsStreamMessage msg = new BlazeJmsStreamMessage();
+ Object obj = null;
+ try {
+ while ((obj = streamMessage.readObject()) != null) {
+ msg.writeObject(obj);
+ }
+ } catch (MessageEOFException e) {
+ // if an end of message stream as expected
+ } catch (JMSException e) {
+ }
+ transformedMessage = msg;
+ } else if (message instanceof TextMessage) {
+ TextMessage textMsg = (TextMessage) message;
+ BlazeJmsTextMessage msg = new BlazeJmsTextMessage();
+ msg.setText(textMsg.getText());
+ transformedMessage = msg;
+ } else {
+ transformedMessage = new BlazeJmsMessage();
+ }
+ copyProperties(message, transformedMessage);
+ return transformedMessage;
+ }
+
+ /**
+ * Copies the standard JMS and user defined properties from the givem message to the specified message
+ *
+ * @param fromMessage
+ * the message to take the properties from
+ * @param toMessage
+ * the message to add the properties to
+ * @throws JMSException
+ */
+ public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
+ toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
+ toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
+ toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
+ toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
+ toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
+ toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
+ toMessage.setJMSType(fromMessage.getJMSType());
+ toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
+ toMessage.setJMSPriority(fromMessage.getJMSPriority());
+ toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
+ Enumeration propertyNames = fromMessage.getPropertyNames();
+ while (propertyNames.hasMoreElements()) {
+ String name = propertyNames.nextElement().toString();
+ Object obj = fromMessage.getObjectProperty(name);
+ toMessage.setObjectProperty(name, obj);
+ }
+ }
+
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.Queue;
+import org.apache.activeblaze.Destination;
+
+/**
+ * Queue implementation
+ *
+ */
+public class BlazeJmsQueue extends BlazeJmsDestination implements Queue {
+ /**
+ * Constructor
+ */
+ public BlazeJmsQueue(){
+ this("");
+ }
+
+ /**
+ * Constructor
+ * @param dest
+ */
+ public BlazeJmsQueue(Destination dest) {
+ super(dest);
+ }
+
+ /**
+ * Constructor
+ * @param name
+ */
+ public BlazeJmsQueue(String name){
+ super(name);
+ this.destination.setTopic(false);
+ }
+
+ /**
+ * @return name
+ * @see javax.jms.Queue#getQueueName()
+ */
+ public String getQueueName() {
+ return getName();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+
+/**
+ * Implementation of a Jms QueueReceiver
+ *
+ */
+public class BlazeJmsQueueReceiver extends BlazeJmsMessageConsumer implements QueueReceiver {
+
+ /**
+ * Constructor
+ * @param s
+ */
+ protected BlazeJmsQueueReceiver(BlazeJmsSession s,BlazeJmsDestination d) {
+ super(s,d);
+ }
+
+ /**
+ * @return the Queue
+ * @throws IllegalStateException
+ * @see javax.jms.QueueReceiver#getQueue()
+ */
+ public Queue getQueue() throws IllegalStateException {
+ checkClosed();
+ return (Queue) this.destination;
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+/**
+ * Implementation of a Queue Sender
+ *
+ */
+public class BlazeJmsQueueSender extends BlazeJmsMessageProducer implements QueueSender {
+ /**
+ * Constructor
+ *
+ * @param s
+ * @param dest
+ */
+ protected BlazeJmsQueueSender(BlazeJmsSession s, BlazeJmsDestination dest) {
+ super(s, dest);
+ }
+
+ /**
+ * @return the Queue
+ * @throws IllegalStateException
+ * @see javax.jms.QueueSender#getQueue()
+ */
+ public Queue getQueue() throws IllegalStateException {
+ checkClosed();
+ return (Queue) this.destination;
+ }
+
+ /**
+ * @param queue
+ * @param message
+ * @throws JMSException
+ * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message)
+ */
+ public void send(Queue queue, Message message) throws JMSException {
+ super.send(queue, message);
+ }
+
+ /**
+ * @param queue
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message, int, int, long)
+ */
+ public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ super.send(message, deliveryMode, priority, timeToLive);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Jms QueueSession implementation
+ *
+ */
+public class BlazeJmsQueueSession extends BlazeJmsSession {
+ /**
+ * Constructor
+ * @param connection
+ * @param acknowledgementMode
+ */
+ protected BlazeJmsQueueSession(BlazeJmsConnection connection, int acknowledgementMode) {
+ super(connection, acknowledgementMode);
+ }
+
+ public MessageConsumer createConsumer(Destination destination) throws JMSException {
+ if (destination instanceof Topic) {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+ return super.createConsumer(destination);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String)
+ */
+ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+ if (destination instanceof Topic) {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+ return super.createConsumer(destination, messageSelector);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @param NoLocal
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String, boolean)
+ */
+ public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
+ throws JMSException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+
+ /**
+ * @param topic
+ * @param name
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String)
+ */
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+
+ /**
+ * @param topic
+ * @param name
+ * @param messageSelector
+ * @param noLocal
+ * @return
+ * @throws IllegalStateException
+ * @throws JMSException
+ * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String, java.lang.String, boolean)
+ */
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws IllegalStateException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+
+ /**
+ * @param destination
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createProducer(javax.jms.Destination)
+ */
+ public MessageProducer createProducer(Destination destination) throws JMSException {
+ if (destination instanceof Topic) {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+ return super.createProducer(destination);
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createTemporaryTopic()
+ */
+ public TemporaryTopic createTemporaryTopic() throws JMSException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+
+ /**
+ * @param topicName
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createTopic(java.lang.String)
+ */
+ public Topic createTopic(String topicName) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+
+ /**
+ * @param name
+ * @throws JMSException
+ * @see javax.jms.Session#unsubscribe(java.lang.String)
+ */
+ public void unsubscribe(String name) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+
+ /**
+ * @param topic
+ * @return
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic)
+ */
+ public TopicPublisher createPublisher(Topic topic) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+
+ /**
+ * @param topic
+ * @return
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic)
+ */
+ public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+
+ /**
+ * @param topic
+ * @param messageSelector
+ * @param noLocal
+ * @return
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, java.lang.String, boolean)
+ */
+ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a QueueSession");
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import org.apache.activeblaze.jms.message.BlazeJmsBytesMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMapMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsObjectMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsStreamMessage;
+import org.apache.activeblaze.jms.message.BlazeJmsTextMessage;
+
+/**
+ * JMS Session implementation
+ *
+ */
+public class BlazeJmsSession implements Session, QueueSession, TopicSession {
+ private final BlazeJmsConnection connection;
+ private final int acknowledgementMode;
+ private final List<MessageProducer> producers = new CopyOnWriteArrayList<MessageProducer>();
+ private final List<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
+ private MessageListener messageListener;
+ private boolean closed;
+
+ /**
+ * Constructor
+ *
+ * @param connection
+ * @param acknowledgementMode
+ */
+ protected BlazeJmsSession(BlazeJmsConnection connection, int acknowledgementMode) {
+ this.connection = connection;
+ this.acknowledgementMode = acknowledgementMode;
+ }
+
+ /**
+ * @throws JMSException
+ * @see javax.jms.Session#close()
+ */
+ public void close() throws JMSException {
+ this.closed = true;
+ this.connection.removeSession(this);
+ for (MessageConsumer c : this.consumers) {
+ c.close();
+ }
+ this.consumers.clear();
+ }
+
+ /**
+ * @throws JMSException
+ * @see javax.jms.Session#commit()
+ */
+ public void commit() throws JMSException {
+ checkClosed();
+ }
+
+ /**
+ * @param queue
+ * @return QueueBrowser
+ * @throws JMSException
+ * @see javax.jms.Session#createBrowser(javax.jms.Queue)
+ */
+ public QueueBrowser createBrowser(Queue queue) throws JMSException {
+ checkClosed();
+ return null;
+ }
+
+ /**
+ * @param queue
+ * @param messageSelector
+ * @return QueueBrowser
+ * @throws JMSException
+ * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String)
+ */
+ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+ checkClosed();
+ return null;
+ }
+
+ /**
+ * @return BytesMessage
+ * @throws IllegalStateException
+ * @see javax.jms.Session#createBytesMessage()
+ */
+ public BytesMessage createBytesMessage() throws IllegalStateException {
+ checkClosed();
+ return new BlazeJmsBytesMessage();
+ }
+
+ /**
+ * @param destination
+ * @return a MessageConsumer
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+ */
+ public MessageConsumer createConsumer(Destination destination) throws JMSException {
+ checkClosed();
+ return new BlazeJmsMessageConsumer(this, BlazeJmsDestination.transform(destination));
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @return MessageConsumer
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String)
+ */
+ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+ checkClosed();
+ BlazeJmsMessageConsumer result = new BlazeJmsMessageConsumer(this, BlazeJmsDestination.transform(destination));
+ result.setMessageSelector(messageSelector);
+ return result;
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @param NoLocal
+ * @return the MessageConsumer
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String, boolean)
+ */
+ public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
+ throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(destination);
+ BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, NoLocal);
+ result.setMessageSelector(messageSelector);
+ return result;
+ }
+
+ /**
+ * @param topic
+ * @param name
+ * @return a TopicSubscriber
+ * @throws JMSException
+ * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String)
+ */
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
+ BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, false);
+ return result;
+ }
+
+ /**
+ * @param topic
+ * @param name
+ * @param messageSelector
+ * @param noLocal
+ * @return TopicSubscriber
+ * @throws JMSException
+ * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String, java.lang.String, boolean)
+ */
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
+ BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, noLocal);
+ result.setMessageSelector(messageSelector);
+ return result;
+ }
+
+ /**
+ * @return MapMessage
+ * @throws IllegalStateException
+ * @see javax.jms.Session#createMapMessage()
+ */
+ public MapMessage createMapMessage() throws IllegalStateException {
+ checkClosed();
+ return new BlazeJmsMapMessage();
+ }
+
+ /**
+ * @return Message
+ * @throws IllegalStateException
+ * @see javax.jms.Session#createMessage()
+ */
+ public Message createMessage() throws IllegalStateException {
+ checkClosed();
+ return new BlazeJmsMessage();
+ }
+
+ /**
+ * @return ObjectMessage
+ * @throws IllegalStateException
+ * @see javax.jms.Session#createObjectMessage()
+ */
+ public ObjectMessage createObjectMessage() throws IllegalStateException {
+ checkClosed();
+ return new BlazeJmsObjectMessage();
+ }
+
+ /**
+ * @param object
+ * @return ObjectMessage
+ * @throws JMSException
+ * @see javax.jms.Session#createObjectMessage(java.io.Serializable)
+ */
+ public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+ checkClosed();
+ ObjectMessage result = createObjectMessage();
+ result.setObject(object);
+ return result;
+ }
+
+ /**
+ * @param destination
+ * @return MessageProducer
+ * @throws JMSException
+ * @see javax.jms.Session#createProducer(javax.jms.Destination)
+ */
+ public MessageProducer createProducer(Destination destination) throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(destination);
+ BlazeJmsMessageProducer result = new BlazeJmsMessageProducer(this, dest);
+ return result;
+ }
+
+ /**
+ * @param queueName
+ * @return Queue
+ * @throws JMSException
+ * @see javax.jms.Session#createQueue(java.lang.String)
+ */
+ public Queue createQueue(String queueName) throws JMSException {
+ checkClosed();
+ return new BlazeJmsQueue(queueName);
+ }
+
+ /**
+ * @return StreamMessage
+ * @throws JMSException
+ * @see javax.jms.Session#createStreamMessage()
+ */
+ public StreamMessage createStreamMessage() throws JMSException {
+ checkClosed();
+ return new BlazeJmsStreamMessage();
+ }
+
+ /**
+ * @return TemporaryQueue
+ * @throws JMSException
+ * @see javax.jms.Session#createTemporaryQueue()
+ */
+ public TemporaryQueue createTemporaryQueue() throws JMSException {
+ checkClosed();
+ return new BlazeJmsTempQueue(this.connection.tempDestinationGenerator.generateId());
+ }
+
+ /**
+ * @return TemporaryTopic
+ * @throws JMSException
+ * @see javax.jms.Session#createTemporaryTopic()
+ */
+ public TemporaryTopic createTemporaryTopic() throws JMSException {
+ checkClosed();
+ return new BlazeJmsTempTopic(this.connection.tempDestinationGenerator.generateId());
+ }
+
+ /**
+ * @return TextMessage
+ * @throws JMSException
+ * @see javax.jms.Session#createTextMessage()
+ */
+ public TextMessage createTextMessage() throws JMSException {
+ checkClosed();
+ return new BlazeJmsTextMessage();
+ }
+
+ /**
+ * @param text
+ * @return TextMessage
+ * @throws JMSException
+ * @see javax.jms.Session#createTextMessage(java.lang.String)
+ */
+ public TextMessage createTextMessage(String text) throws JMSException {
+ checkClosed();
+ BlazeJmsTextMessage result = new BlazeJmsTextMessage();
+ result.setText(text);
+ return result;
+ }
+
+ /**
+ * @param topicName
+ * @return Topic
+ * @throws JMSException
+ * @see javax.jms.Session#createTopic(java.lang.String)
+ */
+ public Topic createTopic(String topicName) throws JMSException {
+ checkClosed();
+ return new BlazeJmsTopic(topicName);
+ }
+
+ /**
+ * @return acknowledgeMode
+ * @throws JMSException
+ * @see javax.jms.Session#getAcknowledgeMode()
+ */
+ public int getAcknowledgeMode() throws JMSException {
+ checkClosed();
+ return this.acknowledgementMode;
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#getMessageListener()
+ */
+ public MessageListener getMessageListener() throws JMSException {
+ checkClosed();
+ return this.messageListener;
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#getTransacted()
+ */
+ public boolean getTransacted() throws JMSException {
+ checkClosed();
+ return this.acknowledgementMode == Session.SESSION_TRANSACTED;
+ }
+
+ /**
+ * @throws JMSException
+ * @see javax.jms.Session#recover()
+ */
+ public void recover() throws JMSException {
+ checkClosed();
+ }
+
+ /**
+ * @throws JMSException
+ * @see javax.jms.Session#rollback()
+ */
+ public void rollback() throws JMSException {
+ checkClosed();
+ }
+
+ /**
+ *
+ * @see javax.jms.Session#run()
+ */
+ public void run() {
+ // TODO Auto-generated method stub
+ }
+
+ /**
+ * @param listener
+ * @throws JMSException
+ * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
+ */
+ public void setMessageListener(MessageListener listener) throws JMSException {
+ checkClosed();
+ this.messageListener = listener;
+ }
+
+ /**
+ * @param name
+ * @throws JMSException
+ * @see javax.jms.Session#unsubscribe(java.lang.String)
+ */
+ public void unsubscribe(String name) throws JMSException {
+ checkClosed();
+ }
+
+ /**
+ * @param queue
+ * @return
+ * @throws JMSException
+ * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue)
+ */
+ public QueueReceiver createReceiver(Queue queue) throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(queue);
+ BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest);
+ return result;
+ }
+
+ /**
+ * @param queue
+ * @param messageSelector
+ * @return QueueReceiver
+ * @throws JMSException
+ * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue, java.lang.String)
+ */
+ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(queue);
+ BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest);
+ result.setMessageSelector(messageSelector);
+ return result;
+ }
+
+ /**
+ * @param queue
+ * @return QueueSender
+ * @throws JMSException
+ * @see javax.jms.QueueSession#createSender(javax.jms.Queue)
+ */
+ public QueueSender createSender(Queue queue) throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(queue);
+ BlazeJmsQueueSender result = new BlazeJmsQueueSender(this, dest);
+ return result;
+ }
+
+ /**
+ * @param topic
+ * @return TopicPublisher
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic)
+ */
+ public TopicPublisher createPublisher(Topic topic) throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
+ BlazeJmsTopicPublisher result = new BlazeJmsTopicPublisher(this, dest);
+ return result;
+ }
+
+ /**
+ * @param topic
+ * @return TopicSubscriber
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic)
+ */
+ public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
+ BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, false);
+ return result;
+ }
+
+ /**
+ * @param topic
+ * @param messageSelector
+ * @param noLocal
+ * @return TopicSubscriber
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, java.lang.String, boolean)
+ */
+ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
+ checkClosed();
+ BlazeJmsDestination dest = BlazeJmsDestination.transform(topic);
+ BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, noLocal);
+ result.setMessageSelector(messageSelector);
+ return result;
+ }
+
+ protected void remove(MessageConsumer consumer) {
+ this.consumers.remove(consumer);
+ }
+
+ protected void remove(MessageProducer producer) {
+ this.producers.remove(producer);
+ }
+
+ protected void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ BlazeJmsDestination destination = BlazeJmsDestination.transform(dest);
+ BlazeJmsMessage message = BlazeJmsMessageTransformation.transformMessage(msg);
+ send(destination, message, deliveryMode, priority, timeToLive);
+ }
+
+ private void send(BlazeJmsDestination destination, BlazeJmsMessage message, int deliveryMode, int priority,
+ long timeToLive) throws JMSException {
+ message.setJMSDestination(destination);
+ message.setJMSDeliveryMode(deliveryMode);
+ message.setJMSPriority(priority);
+ if (timeToLive > 0) {
+ long timeStamp = System.currentTimeMillis();
+ message.setTimeStamp(timeStamp);
+ message.setExpiration(System.currentTimeMillis() + timeToLive);
+ }
+ try {
+ if (destination.isTopic()) {
+ this.connection.channel.send(destination.getName(), message);
+ } else {
+ this.connection.channel.broadcast(destination.getName(), message);
+ }
+ } catch (Exception e) {
+ throw BlazeJmsExceptionSupport.create(e);
+ }
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (this.closed) {
+ throw new IllegalStateException("The MessageProducer is closed");
+ }
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import org.apache.activeblaze.Destination;
+
+/**
+ * Temporary Destination
+ *
+ */
+public class BlazeJmsTempDestination extends BlazeJmsDestination{
+
+ /**
+ * Constructor
+ */
+ public BlazeJmsTempDestination(){
+ this("");
+ }
+
+ /**
+ * Constructor
+ * @param dest
+ */
+ public BlazeJmsTempDestination(Destination dest) {
+ super(dest);
+ }
+
+ /**
+ * Constructor
+ * @param name
+ */
+ public BlazeJmsTempDestination(String name){
+ super(name);
+ this.destination.setTemporary(true);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+import org.apache.activeblaze.Destination;
+
+/**
+ * TemporaryQueue
+ *
+ */
+public class BlazeJmsTempQueue extends BlazeJmsTempDestination implements TemporaryQueue {
+ /**
+ * Constructor
+ */
+ public BlazeJmsTempQueue(){
+ this("");
+ }
+
+ /**
+ * Constructor
+ * @param dest
+ */
+ public BlazeJmsTempQueue(Destination dest) {
+ super(dest);
+ }
+
+ /**
+ * Constructor
+ * @param name
+ */
+ public BlazeJmsTempQueue(String name){
+ super(name);
+ this.destination.setTopic(false);
+ }
+
+ /**
+ * @see javax.jms.TemporaryQueue#delete()
+ */
+ public void delete() {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * @return name
+ * @see javax.jms.Queue#getQueueName()
+ */
+ public String getQueueName() {
+ return getName();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryTopic;
+import org.apache.activeblaze.Destination;
+
+/**
+ * Temporary Topic
+ *
+ */
+public class BlazeJmsTempTopic extends BlazeJmsTempDestination implements TemporaryTopic{
+ /**
+ * Constructor
+ */
+ public BlazeJmsTempTopic(){
+ this("");
+ }
+
+ /**
+ * Constructor
+ * @param dest
+ */
+ public BlazeJmsTempTopic(Destination dest) {
+ super(dest);
+ }
+
+ /**
+ * Constructor
+ * @param name
+ */
+ public BlazeJmsTempTopic(String name){
+ super(name);
+ this.destination.setTopic(true);
+ }
+
+ /**
+ * @see javax.jms.TemporaryTopic#delete()
+ */
+ public void delete() {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * @return name
+ * @see javax.jms.Topic#getTopicName()
+ */
+ public String getTopicName() {
+ return getName();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.Topic;
+import org.apache.activeblaze.Destination;
+
+/**
+ * TemporaryQueue
+ *
+ */
+public class BlazeJmsTopic extends BlazeJmsDestination implements Topic {
+ /**
+ * Constructor
+ */
+ public BlazeJmsTopic(){
+ this("");
+ }
+
+ /**
+ * Constructor
+ * @param dest
+ */
+ public BlazeJmsTopic(Destination dest) {
+ super(dest);
+ }
+
+ /**
+ * Constructor
+ * @param name
+ */
+ public BlazeJmsTopic(String name){
+ super(name);
+ this.destination.setTopic(true);
+ }
+
+ /**
+ * @return the name
+ * @see javax.jms.Topic#getTopicName()
+ */
+ public String getTopicName() {
+ return getName();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java?rev=739885&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java Sun Feb 1 23:35:54 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+/**
+ * Implementation of a TopicPublisher
+ *
+ */
+public class BlazeJmsTopicPublisher extends BlazeJmsMessageProducer implements TopicPublisher {
+
+ /**
+ * Constructor
+ * @param s
+ * @param destination
+ */
+ protected BlazeJmsTopicPublisher(BlazeJmsSession s, BlazeJmsDestination destination) {
+ super(s, destination);
+ }
+
+ /**
+ * @return the Topic
+ * @throws IllegalStateException
+ * @see javax.jms.TopicPublisher#getTopic()
+ */
+ public Topic getTopic() throws IllegalStateException {
+ checkClosed();
+ return (Topic) this.destination;
+ }
+
+ /**
+ * @param message
+ * @throws JMSException
+ * @see javax.jms.TopicPublisher#publish(javax.jms.Message)
+ */
+ public void publish(Message message) throws JMSException {
+ super.send(message);
+
+ }
+
+ /**
+ * @param topic
+ * @param message
+ * @throws JMSException
+ * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message)
+ */
+ public void publish(Topic topic, Message message) throws JMSException {
+ super.send(topic,message);
+
+ }
+
+ /**
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.TopicPublisher#publish(javax.jms.Message, int, int, long)
+ */
+ public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ super.send(message, deliveryMode, priority, timeToLive);
+
+ }
+
+ /**
+ * @param topic
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message, int, int, long)
+ */
+ public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ super.send(topic, message, deliveryMode, priority, timeToLive);
+
+ }
+
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java
------------------------------------------------------------------------------
svn:eol-style = native