You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/21 22:08:41 UTC
svn commit: r489461 [2/5] - in /incubator/qpid/branches/new_persistence/java:
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/ma...
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Thu Dec 21 13:08:38 2006
@@ -26,25 +26,45 @@
/**
* AMQ implementation of a TemporaryQueue.
*/
-final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
+final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
{
+ private final AMQSession _session;
+ private boolean _deleted;
+
/**
* Create a new instance of an AMQTemporaryQueue
*/
- public AMQTemporaryQueue()
+ public AMQTemporaryQueue(AMQSession session)
{
super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+ _session = session;
}
/**
* @see javax.jms.TemporaryQueue#delete()
*/
- public void delete() throws JMSException
+ public synchronized void delete() throws JMSException
{
- throw new UnsupportedOperationException("Delete not supported, " +
- "will auto-delete when connection closed");
+ if(_session.hasConsumer(this))
+ {
+ throw new JMSException("Temporary Queue has consumers so cannot be deleted");
+ }
+ _deleted = true;
+
+ // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+ // by the server when there are no more subscriptions to that queue. This is probably not
+ // quite right for JMSCompliance.
}
+ public AMQSession getSession()
+ {
+ return _session;
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted;
+ }
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Thu Dec 21 13:08:38 2006
@@ -26,15 +26,18 @@
/**
* AMQ implementation of TemporaryTopic.
*/
-class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic
+class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDestination
{
+ private final AMQSession _session;
+ private boolean _deleted;
/**
* Create new temporary topic.
*/
- public AMQTemporaryTopic()
+ public AMQTemporaryTopic(AMQSession session)
{
super("TempQueue" + Long.toString(System.currentTimeMillis()));
+ _session = session;
}
/**
@@ -42,8 +45,25 @@
*/
public void delete() throws JMSException
{
- throw new UnsupportedOperationException("Delete not supported, " +
- "will auto-delete when connection closed");
+ if(_session.hasConsumer(this))
+ {
+ throw new JMSException("Temporary Topic has consumers so cannot be deleted");
+ }
+
+ _deleted = true;
+ // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+ // by the server when there are no more subscriptions to that queue. This is probably not
+ // quite right for JMSCompliance.
+ }
+
+ public AMQSession getSession()
+ {
+ return _session;
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted;
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Dec 21 13:08:38 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,20 +40,25 @@
public AMQTopic(String name)
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, true, null);
- _isDurable = false;
+ this(name, true, null, false);
}
- /**
- * Constructor for use in creating a topic to represent a durable subscription
- * @param topic
- * @param clientId
- * @param subscriptionName
- */
- public AMQTopic(AMQTopic topic, String clientId, String subscriptionName)
+ public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+ {
+ super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
+ queueName, isDurable);
+ }
+
+ public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ throws JMSException
+ {
+ return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection),
+ true);
+ }
+
+ public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
- super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getDestinationName(), true, false, clientId + ":" + subscriptionName);
- _isDurable = true;
+ return connection.getClientID() + ":" + subscriptionName;
}
public String getTopicName() throws JMSException
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Dec 21 13:08:38 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -214,10 +214,10 @@
{
//handle case where connection has already been started, and the dispatcher is blocked
//doing a put on the _synchronousQueue
- Object msg = _synchronousQueue.poll();
- if (msg != null)
+ AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
+ if (jmsMsg != null)
{
- AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg;
+ _session.setLastDeliveredMessage(jmsMsg);
messageListener.onMessage(jmsMsg);
postDeliver(jmsMsg);
}
@@ -280,7 +280,7 @@
public Message receive(long l) throws JMSException
{
checkPreConditions();
-
+
acquireReceiving();
try
@@ -297,12 +297,15 @@
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
+ _session.setLastDeliveredMessage(m);
postDeliver(m);
}
+
return m;
}
catch (InterruptedException e)
{
+ _logger.warn("Interrupted: " + e, e);
return null;
}
finally
@@ -323,8 +326,10 @@
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
+ _session.setLastDeliveredMessage(m);
postDeliver(m);
}
+
return m;
}
finally
@@ -423,6 +428,7 @@
{
//we do not need a lock around the test above, and the dispatch below as it is invalid
//for an application to alter an installed listener while the session is started
+ _session.setLastDeliveredMessage(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -459,8 +465,9 @@
}
}
- private void postDeliver(AbstractJMSMessage msg)
+ private void postDeliver(AbstractJMSMessage msg) throws JMSException
{
+ msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
case Session.DUPS_OK_ACKNOWLEDGE:
@@ -522,7 +529,7 @@
*/
private void deregisterConsumer()
{
- _session.deregisterConsumer(_consumerTag);
+ _session.deregisterConsumer(this);
}
public String getConsumerTag()
@@ -531,18 +538,18 @@
}
public void setConsumerTag(String consumerTag)
- {
+ {
_consumerTag = consumerTag;
}
public AMQSession getSession() {
return _session;
}
-
+
private void checkPreConditions() throws JMSException{
-
+
this.checkNotClosed();
-
+
if(_session == null || _session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Dec 21 13:08:38 2006
@@ -24,11 +24,13 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import javax.jms.*;
import java.io.UnsupportedEncodingException;
+import java.util.Enumeration;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -140,7 +142,7 @@
public void setDisableMessageID(boolean b) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
checkNotClosed();
// IGNORED
}
@@ -154,7 +156,7 @@
public void setDisableMessageTimestamp(boolean b) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
_disableTimestamps = b;
}
@@ -166,11 +168,11 @@
public void setDeliveryMode(int i) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
- " is illegal");
+ " is illegal");
}
_deliveryMode = i;
}
@@ -183,7 +185,7 @@
public void setPriority(int i) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (i < 0 || i > 9)
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
@@ -199,7 +201,7 @@
public void setTimeToLive(long l) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
if (l < 0)
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
@@ -227,33 +229,36 @@
public void send(Message message) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
+
+
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
public void send(Message message, int deliveryMode) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
+
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
_mandatory, immediate);
}
}
@@ -261,23 +266,23 @@
public void send(Message message, int deliveryMode, int priority,
long timeToLive) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
+ checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
+ sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
_immediate);
}
}
public void send(Destination destination, Message message) throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+ sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
@@ -286,12 +291,12 @@
int priority, long timeToLive)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
_mandatory, _immediate);
}
}
@@ -305,7 +310,7 @@
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, _immediate);
}
}
@@ -314,12 +319,12 @@
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, immediate);
}
}
@@ -329,27 +334,158 @@
boolean immediate, boolean waitUntilSent)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
+ checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, immediate, waitUntilSent);
}
}
+
+ private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
+ {
+ if (message instanceof AbstractJMSMessage)
+ {
+ return (AbstractJMSMessage) message;
+ }
+ else
+ {
+ AbstractJMSMessage newMessage;
+
+ if (message instanceof BytesMessage)
+ {
+ BytesMessage bytesMessage = (BytesMessage) message;
+ bytesMessage.reset();
+
+ JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage();
+
+
+ byte[] buf = new byte[1024];
+
+ int len;
+
+ while ((len = bytesMessage.readBytes(buf)) != -1)
+ {
+ nativeMsg.writeBytes(buf, 0, len);
+ }
+
+ newMessage = nativeMsg;
+ }
+ else if (message instanceof MapMessage)
+ {
+ MapMessage origMessage = (MapMessage) message;
+ MapMessage nativeMessage = _session.createMapMessage();
+
+ Enumeration mapNames = origMessage.getMapNames();
+ while (mapNames.hasMoreElements())
+ {
+ String name = (String) mapNames.nextElement();
+ nativeMessage.setObject(name, origMessage.getObject(name));
+ }
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if (message instanceof ObjectMessage)
+ {
+ ObjectMessage origMessage = (ObjectMessage) message;
+ ObjectMessage nativeMessage = _session.createObjectMessage();
+
+ nativeMessage.setObject(origMessage.getObject());
+
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if (message instanceof TextMessage)
+ {
+ TextMessage origMessage = (TextMessage) message;
+ TextMessage nativeMessage = _session.createTextMessage();
+
+ nativeMessage.setText(origMessage.getText());
+
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if (message instanceof StreamMessage)
+ {
+ StreamMessage origMessage = (StreamMessage) message;
+ StreamMessage nativeMessage = _session.createStreamMessage();
+
+
+ try
+ {
+ origMessage.reset();
+ while (true)
+ {
+ nativeMessage.writeObject(origMessage.readObject());
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ ;//
+ }
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else
+ {
+ newMessage = (AbstractJMSMessage) _session.createMessage();
+
+ }
+
+ Enumeration propertyNames = message.getPropertyNames();
+ while (propertyNames.hasMoreElements())
+ {
+ String propertyName = String.valueOf(propertyNames.nextElement());
+ if (!propertyName.startsWith("JMSX_"))
+ {
+ Object value = message.getObjectProperty(propertyName);
+ newMessage.setObjectProperty(propertyName, value);
+ }
+ }
+
+ newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+
+ int priority = message.getJMSPriority();
+ if (priority < 0)
+ {
+ priority = 0;
+ }
+ else if (priority > 9)
+ {
+ priority = 9;
+ }
+
+ newMessage.setJMSPriority(priority);
+ if (message.getJMSReplyTo() != null)
+ {
+ newMessage.setJMSReplyTo(message.getJMSReplyTo());
+ }
+ newMessage.setJMSType(message.getJMSType());
+
+
+ if (newMessage != null)
+ {
+ return newMessage;
+ }
+ else
+ {
+ throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName());
+ }
+ }
+ }
+
+
private void validateDestination(Destination destination) throws JMSException
{
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: " +
- (destination != null ? destination.getClass() : null));
+ (destination != null ? destination.getClass() : null));
}
- declareDestination((AMQDestination)destination);
+ declareDestination((AMQDestination) destination);
}
- protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+ protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
long timeToLive, boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
@@ -357,8 +493,9 @@
/**
* The caller of this method must hold the failover mutex.
+ *
* @param destination
- * @param message
+ * @param origMessage
* @param deliveryMode
* @param priority
* @param timeToLive
@@ -366,9 +503,12 @@
* @param immediate
* @throws JMSException
*/
- protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+ protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
+ checkTemporaryDestination(destination);
+
+ AbstractJMSMessage message = convertToNativeMessage(origMessage);
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
destination.getRoutingKey(), mandatory, immediate);
@@ -424,11 +564,42 @@
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
_protocolHandler.writeFrame(compositeFrame, wait);
+
+
+ if (message != origMessage)
+ {
+ _logger.warn("Updating original message");
+ origMessage.setJMSPriority(message.getJMSPriority());
+ origMessage.setJMSTimestamp(message.getJMSTimestamp());
+ _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration());
+ origMessage.setJMSExpiration(message.getJMSExpiration());
+ origMessage.setJMSMessageID(message.getJMSMessageID());
+ }
+ }
+
+ private void checkTemporaryDestination(AMQDestination destination) throws JMSException
+ {
+ if(destination instanceof TemporaryDestination)
+ {
+ _logger.debug("destination is temporary destination");
+ TemporaryDestination tempDest = (TemporaryDestination) destination;
+ if(tempDest.getSession().isClosed())
+ {
+ _logger.debug("session is closed");
+ throw new JMSException("Session for temporary destination has been closed");
+ }
+ if(tempDest.isDeleted())
+ {
+ _logger.debug("destination is deleted");
+ throw new JMSException("Cannot send to a deleted temporary destination");
+ }
+ }
}
/**
* Create content bodies. This will split a large message into numerous bodies depending on the negotiated
* maximum frame size.
+ *
* @param payload
* @return the array of content bodies
*/
@@ -458,8 +629,8 @@
for (int i = 0; i < bodies.length; i++)
{
bodies[i] = new ContentBody();
- payload.position((int)framePayloadMax * i);
- int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining;
+ payload.position((int) framePayloadMax * i);
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
bodies[i].payload = payload.slice();
remaining -= length;
@@ -480,32 +651,42 @@
_encoding = encoding;
}
- private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
- checkNotClosed();
+ private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+ {
+ checkNotClosed();
+
+ if (_session == null || _session.isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
+
+ private void checkInitialDestination()
+ {
+ if (_destination == null)
+ {
+ throw new UnsupportedOperationException("Destination is null");
+ }
+ }
+
+ private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+ {
+ if (_destination != null && suppliedDestination != null)
+ {
+ throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ }
+
+ if (suppliedDestination == null)
+ {
+ throw new InvalidDestinationException("Supplied Destination was invalid");
+ }
+
+
+ }
+
- if(_session == null || _session.isClosed()){
- throw new javax.jms.IllegalStateException("Invalid Session");
- }
- }
-
- private void checkInitialDestination(){
- if(_destination == null){
- throw new UnsupportedOperationException("Destination is null");
- }
- }
-
- private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
- if (_destination != null && suppliedDestination != null){
- throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
- }
-
- if (suppliedDestination == null){
- throw new InvalidDestinationException("Supplied Destination was invalid");
- }
- }
-
-
- public AMQSession getSession() {
- return _session;
- }
+ public AMQSession getSession()
+ {
+ return _session;
+ }
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java Thu Dec 21 13:08:38 2006
@@ -10,119 +10,124 @@
public class QueueSenderAdapter implements QueueSender {
- private MessageProducer delegate;
- private Queue queue;
+ private MessageProducer _delegate;
+ private Queue _queue;
private boolean closed = false;
public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){
- delegate = msgProducer;
- this.queue = queue;
+ _delegate = msgProducer;
+ _queue = queue;
}
public Queue getQueue() throws JMSException {
checkPreConditions();
- return queue;
+ return _queue;
}
public void send(Message msg) throws JMSException {
checkPreConditions();
- delegate.send(msg);
+ _delegate.send(msg);
}
public void send(Queue queue, Message msg) throws JMSException {
- checkPreConditions();
- delegate.send(queue, msg);
+ checkPreConditions(queue);
+ _delegate.send(queue, msg);
}
public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
throws JMSException {
checkPreConditions();
- delegate.send(msg, deliveryMode,priority,timeToLive);
+ _delegate.send(msg, deliveryMode,priority,timeToLive);
}
public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive)
throws JMSException {
- checkPreConditions();
- delegate.send(queue,msg, deliveryMode,priority,timeToLive);
+ checkPreConditions(queue);
+ _delegate.send(queue,msg, deliveryMode,priority,timeToLive);
}
public void close() throws JMSException {
- delegate.close();
+ _delegate.close();
closed = true;
}
public int getDeliveryMode() throws JMSException {
checkPreConditions();
- return delegate.getDeliveryMode();
+ return _delegate.getDeliveryMode();
}
public Destination getDestination() throws JMSException {
checkPreConditions();
- return delegate.getDestination();
+ return _delegate.getDestination();
}
public boolean getDisableMessageID() throws JMSException {
checkPreConditions();
- return delegate.getDisableMessageID();
+ return _delegate.getDisableMessageID();
}
public boolean getDisableMessageTimestamp() throws JMSException {
checkPreConditions();
- return delegate.getDisableMessageTimestamp();
+ return _delegate.getDisableMessageTimestamp();
}
public int getPriority() throws JMSException {
checkPreConditions();
- return delegate.getPriority();
+ return _delegate.getPriority();
}
public long getTimeToLive() throws JMSException {
checkPreConditions();
- return delegate.getTimeToLive();
+ return _delegate.getTimeToLive();
}
public void send(Destination dest, Message msg) throws JMSException {
- checkPreConditions();
- delegate.send(dest,msg);
+ checkPreConditions((Queue)dest);
+ _delegate.send(dest,msg);
}
public void send(Message msg, int deliveryMode, int priority, long timeToLive)
throws JMSException {
checkPreConditions();
- delegate.send(msg, deliveryMode,priority,timeToLive);
+ _delegate.send(msg, deliveryMode,priority,timeToLive);
}
public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException {
- checkPreConditions();
- delegate.send(dest,msg, deliveryMode,priority,timeToLive);
+ checkPreConditions((Queue)dest);
+ _delegate.send(dest,msg, deliveryMode,priority,timeToLive);
}
public void setDeliveryMode(int deliveryMode) throws JMSException {
checkPreConditions();
- delegate.setDeliveryMode(deliveryMode);
+ _delegate.setDeliveryMode(deliveryMode);
}
public void setDisableMessageID(boolean disableMessageID) throws JMSException {
checkPreConditions();
- delegate.setDisableMessageID(disableMessageID);
+ _delegate.setDisableMessageID(disableMessageID);
}
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
checkPreConditions();
- delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+ _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
}
public void setPriority(int priority) throws JMSException {
checkPreConditions();
- delegate.setPriority(priority);
+ _delegate.setPriority(priority);
}
public void setTimeToLive(long timeToLive) throws JMSException {
checkPreConditions();
- delegate.setTimeToLive(timeToLive);
+ _delegate.setTimeToLive(timeToLive);
}
-
- private void checkPreConditions() throws IllegalStateException, IllegalStateException {
+
+ private void checkPreConditions() throws IllegalStateException, IllegalStateException
+ {
+ checkPreConditions(_queue);
+ }
+
+ private void checkPreConditions(Queue queue) throws IllegalStateException, IllegalStateException {
if (closed){
throw new javax.jms.IllegalStateException("Publisher is closed");
}
@@ -131,7 +136,7 @@
throw new UnsupportedOperationException("Queue is null");
}
- AMQSession session = ((BasicMessageProducer)delegate).getSession();
+ AMQSession session = ((BasicMessageProducer) _delegate).getSession();
if(session == null || session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Thu Dec 21 13:08:38 2006
@@ -35,10 +35,10 @@
class TopicSubscriberAdaptor implements TopicSubscriber
{
private final Topic _topic;
- private final MessageConsumer _consumer;
+ private final BasicMessageConsumer _consumer;
private final boolean _noLocal;
- TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal)
+ TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
{
_topic = topic;
_consumer = consumer;
@@ -119,4 +119,10 @@
throw new javax.jms.IllegalStateException("Invalid Session");
}
}
+
+ BasicMessageConsumer getMessageConsumer()
+ {
+ return _consumer;
+ }
+
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Dec 21 13:08:38 2006
@@ -32,16 +32,18 @@
import org.apache.qpid.client.JmsNotImplementedException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.JMSPropertyFieldTable;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageFormatException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
-public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
+public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
@@ -50,7 +52,8 @@
protected ByteBuffer _data;
private boolean _readableProperties = false;
private boolean _readableMessage = false;
-
+ private Destination _destination;
+
protected AbstractJMSMessage(ByteBuffer data)
{
super(new BasicContentHeaderProperties());
@@ -174,12 +177,12 @@
public Destination getJMSDestination() throws JMSException
{
// TODO: implement this once we have sorted out how to figure out the exchange class
- throw new JmsNotImplementedException();
+ return _destination;
}
public void setJMSDestination(Destination destination) throws JMSException
{
- throw new JmsNotImplementedException();
+ _destination = destination;
}
public int getJMSDeliveryMode() throws JMSException
@@ -234,7 +237,7 @@
public void clearProperties() throws JMSException
{
- getJmsContentHeaderProperties().getHeaders().clear();
+ getJmsContentHeaderProperties().getJMSHeaders().clear();
_readableProperties = false;
}
@@ -249,138 +252,139 @@
public boolean propertyExists(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().propertyExists(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().propertyExists(propertyName);
}
public boolean getBooleanProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName);
+
+ return getJmsContentHeaderProperties().getJMSHeaders().getBoolean(propertyName);
}
public byte getByteProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getByte(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getByte(propertyName);
}
public short getShortProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getShort(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getShort(propertyName);
}
public int getIntProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getInteger(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getInteger(propertyName);
}
public long getLongProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getLong(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getLong(propertyName);
}
public float getFloatProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getFloat(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getFloat(propertyName);
}
public double getDoubleProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getDouble(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getDouble(propertyName);
}
public String getStringProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getString(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getString(propertyName);
}
public Object getObjectProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- return getJmsContentHeaderProperties().getHeaders().getObject(propertyName);
+ return getJmsContentHeaderProperties().getJMSHeaders().getObject(propertyName);
}
public Enumeration getPropertyNames() throws JMSException
{
- return getJmsContentHeaderProperties().getHeaders().getPropertyNames();
+ return getJmsContentHeaderProperties().getJMSHeaders().getPropertyNames();
}
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setBoolean(propertyName, b);
+ getJmsContentHeaderProperties().getJMSHeaders().setBoolean(propertyName, b);
}
public void setByteProperty(String propertyName, byte b) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setByte(propertyName, new Byte(b));
+ getJmsContentHeaderProperties().getJMSHeaders().setByte(propertyName, new Byte(b));
}
public void setShortProperty(String propertyName, short i) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setShort(propertyName, new Short(i));
+ getJmsContentHeaderProperties().getJMSHeaders().setShort(propertyName, new Short(i));
}
public void setIntProperty(String propertyName, int i) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setInteger(propertyName, new Integer(i));
+ getJmsContentHeaderProperties().getJMSHeaders().setInteger(propertyName, new Integer(i));
}
public void setLongProperty(String propertyName, long l) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setLong(propertyName, new Long(l));
+ getJmsContentHeaderProperties().getJMSHeaders().setLong(propertyName, new Long(l));
}
public void setFloatProperty(String propertyName, float f) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setFloat(propertyName, new Float(f));
+ getJmsContentHeaderProperties().getJMSHeaders().setFloat(propertyName, new Float(f));
}
public void setDoubleProperty(String propertyName, double v) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setDouble(propertyName, new Double(v));
+ getJmsContentHeaderProperties().getJMSHeaders().setDouble(propertyName, new Double(v));
}
public void setStringProperty(String propertyName, String value) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setString(propertyName, value);
+ getJmsContentHeaderProperties().getJMSHeaders().setString(propertyName, value);
}
public void setObjectProperty(String propertyName, Object object) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object);
+ getJmsContentHeaderProperties().getJMSHeaders().setObject(propertyName, object);
}
protected void removeProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
- getJmsContentHeaderProperties().getHeaders().remove(propertyName);
+ getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName);
}
- public void acknowledge() throws JMSException
+ public void acknowledgeThis() throws JMSException
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
@@ -397,6 +401,14 @@
}
}
+ public void acknowledge() throws JMSException
+ {
+ if(_session != null)
+ {
+ _session.acknowledge();
+ }
+ }
+
/**
* This forces concrete classes to implement clearBody()
@@ -426,13 +438,13 @@
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
buf.append("\nAMQ message number: ").append(_deliveryTag);
buf.append("\nProperties:");
- if (getJmsContentHeaderProperties().getHeaders().isEmpty())
+ if (getJmsContentHeaderProperties().getJMSHeaders().isEmpty())
{
buf.append("<NONE>");
}
else
{
- buf.append('\n').append(getJmsContentHeaderProperties().getHeaders());
+ buf.append('\n').append(getJmsContentHeaderProperties().getJMSHeaders());
}
return buf.toString();
}
@@ -462,9 +474,6 @@
{
throw new IllegalArgumentException("Property name must not be the empty string");
}
-
- // Call to ensure that the it has been set.
- getJmsContentHeaderProperties().getHeaders();
}
public BasicContentHeaderProperties getJmsContentHeaderProperties()
@@ -478,14 +487,7 @@
// position beyond the start
if (_data != null)
{
- if (!_readableMessage)
- {
- _data.flip();
- }
- else
- {
- _data.rewind();
- }
+ reset();
}
return _data;
}
@@ -524,7 +526,7 @@
return !_readableMessage;
}
- public void reset() throws JMSException
+ public void reset()
{
if (_readableMessage)
{
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Thu Dec 21 13:08:38 2006
@@ -30,6 +30,9 @@
import javax.jms.MessageEOFException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CharsetDecoder;
+import java.nio.CharBuffer;
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
@@ -149,10 +152,27 @@
checkReadable();
// we check only for one byte since theoretically the string could be only a
// single byte when using UTF-8 encoding
- checkAvailable(1);
+
try
{
- return _data.getString(Charset.forName("UTF-8").newDecoder());
+ short length = readShort();
+ if(length == 0)
+ {
+ return "";
+ }
+ else
+ {
+ CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+ ByteBuffer encodedString = _data.slice();
+ encodedString.limit(length);
+ _data.position(_data.position()+length);
+ CharBuffer string = decoder.decode(encodedString.buf());
+
+ return string.toString();
+ }
+
+
+
}
catch (CharacterCodingException e)
{
@@ -257,9 +277,15 @@
checkWritable();
try
{
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
+ java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+
+ _data.putShort((short)encodedString.limit());
+ _data.put(encodedString);
+
+ //_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must add the null terminator manually
- _data.put((byte)0);
+ //_data.put((byte)0);
}
catch (CharacterCodingException e)
{
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Thu Dec 21 13:08:38 2006
@@ -25,6 +25,8 @@
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.JMSPropertyFieldTable;
+import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -39,7 +41,7 @@
public static final String MIME_TYPE = "jms/map-message";
- private PropertyFieldTable _map;
+ private JMSPropertyFieldTable _properties;
JMSMapMessage() throws JMSException
{
@@ -49,10 +51,9 @@
JMSMapMessage(ByteBuffer data) throws JMSException
{
super(data); // this instantiates a content header
- _map = new PropertyFieldTable();
+ _properties = new JMSPropertyFieldTable();
}
-
JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
throws AMQException
{
@@ -62,19 +63,33 @@
{
long tableSize = EncodingUtils.readInteger(_data);
- _map = (PropertyFieldTable) FieldTableFactory.newFieldTable(_data, tableSize);
-
+ try
+ {
+ _properties = new JMSPropertyFieldTable(_data, tableSize);
+ }
+ catch (JMSException e)
+ {
+ Exception error = e.getLinkedException();
+ if (error instanceof AMQFrameDecodingException)
+ {
+ throw(AMQFrameDecodingException) error;
+ }
+ else
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
+ }
}
else
{
- _map = (PropertyFieldTable) FieldTableFactory.newFieldTable();
+ _properties = new JMSPropertyFieldTable();
}
}
public String toBodyString() throws JMSException
{
- return "MapSize:" + _map.getEncodedSize() + "\nMapData:\n" + _map.toString();
+ return _properties.toString();
}
public String getMimeType()
@@ -82,85 +97,43 @@
return MIME_TYPE;
}
- // MapMessage Interface
- public boolean getBoolean(String string) throws JMSException
+ public ByteBuffer getData()
{
- Boolean b = _map.getBoolean(string);
-
- if (b == null)
- {
- if (_map.containsKey(string))
- {
- Object str = _map.getObject(string);
+ //What if _data is null?
+ _properties.writeToBuffer(_data);
+ return super.getData();
+ }
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf((String) str);
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
- }
+ @Override
+ public void clearBodyImpl() throws JMSException
+ {
+ super.clearBodyImpl();
+ _properties.clear();
+ }
- return b;
+ public boolean getBoolean(String string) throws JMSException
+ {
+ return _properties.getBoolean(string);
}
public byte getByte(String string) throws JMSException
{
- Byte b = _map.getByte(string);
- if (b == null)
- {
- if (_map.containsKey(string))
- {
- Object str = _map.getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getByte can't use " + string + " item.");
- }
- else
- {
- return Byte.valueOf((String) str);
- }
- }
- else
- {
- b = Byte.valueOf(null);
- }
- }
-
- return b;
+ return _properties.getByte(string);
}
public short getShort(String string) throws JMSException
{
- {
- Short s = _map.getShort(string);
-
- if (s == null)
- {
- s = Short.valueOf(getByte(string));
- }
-
- return s;
- }
+ return _properties.getShort(string);
}
public char getChar(String string) throws JMSException
{
-
- Character result = _map.getCharacter(string);
+ Character result = _properties.getCharacter(string);
if (result == null)
{
- throw new MessageFormatException("getChar couldn't find " + string + " item.");
+ throw new NullPointerException("getChar couldn't find " + string + " item.");
}
else
{
@@ -170,179 +143,97 @@
public int getInt(String string) throws JMSException
{
- Integer i = _map.getInteger(string);
-
- if (i == null)
- {
- i = Integer.valueOf(getShort(string));
- }
-
- return i;
+ return _properties.getInteger(string);
}
public long getLong(String string) throws JMSException
{
-
- Long l = _map.getLong(string);
-
- if (l == null)
- {
- l = Long.valueOf(getInt(string));
- }
-
- return l;
-
+ return _properties.getLong(string);
}
public float getFloat(String string) throws JMSException
{
-
- Float f = _map.getFloat(string);
-
- if (f == null)
- {
- if (_map.containsKey(string))
- {
- Object str = _map.getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getFloat can't use " + string + " item.");
- }
- else
- {
- return Float.valueOf((String) str);
- }
- }
- else
- {
- f = Float.valueOf(null);
- }
-
- }
-
- return f;
-
+ return _properties.getFloat(string);
}
public double getDouble(String string) throws JMSException
{
- Double d = _map.getDouble(string);
-
- if (d == null)
- {
- d = Double.valueOf(getFloat(string));
- }
-
- return d;
+ return _properties.getDouble(string);
}
public String getString(String string) throws JMSException
{
- String s = _map.getString(string);
-
- if (s == null)
- {
- if (_map.containsKey(string))
- {
- Object o = _map.getObject(string);
- if (o instanceof byte[])
- {
- throw new MessageFormatException("getObject couldn't find " + string + " item.");
- }
- else
- {
- if (o == null)
- {
- return null;
- }
- else
- {
- s = String.valueOf(o);
- }
- }
- }
- }
-
- return s;
+ return _properties.getString(string);
}
public byte[] getBytes(String string) throws JMSException
{
-
- byte[] result = _map.getBytes(string);
-
- if (result == null)
- {
- throw new MessageFormatException("getBytes couldn't find " + string + " item.");
- }
-
- return result;
-
+ return _properties.getBytes(string);
}
public Object getObject(String string) throws JMSException
{
- return _map.getObject(string);
+ return _properties.getObject(string);
}
public Enumeration getMapNames() throws JMSException
{
- return _map.getPropertyNames();
+ return _properties.getMapNames();
}
+
public void setBoolean(String string, boolean b) throws JMSException
{
checkWritable();
- _map.setBoolean(string, b);
+ _properties.setBoolean(string, b);
}
public void setByte(String string, byte b) throws JMSException
{
checkWritable();
- _map.setByte(string, b);
+ _properties.setByte(string, b);
}
public void setShort(String string, short i) throws JMSException
{
checkWritable();
- _map.setShort(string, i);
+ _properties.setShort(string, i);
}
public void setChar(String string, char c) throws JMSException
{
checkWritable();
- _map.setChar(string, c);
+ _properties.setChar(string, c);
}
public void setInt(String string, int i) throws JMSException
{
checkWritable();
- _map.setInteger(string, i);
+ _properties.setInteger(string, i);
}
public void setLong(String string, long l) throws JMSException
{
checkWritable();
- _map.setLong(string, l);
+ _properties.setLong(string, l);
}
public void setFloat(String string, float v) throws JMSException
{
checkWritable();
- _map.setFloat(string, v);
+ _properties.setFloat(string, v);
}
public void setDouble(String string, double v) throws JMSException
{
checkWritable();
- _map.setDouble(string, v);
+ _properties.setDouble(string, v);
}
public void setString(String string, String string1) throws JMSException
{
checkWritable();
- _map.setString(string, string1);
+ _properties.setString(string, string1);
}
public void setBytes(String string, byte[] bytes) throws JMSException
@@ -353,25 +244,18 @@
public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException
{
checkWritable();
- _map.setBytes(string, bytes, i, i1);
+ _properties.setBytes(string, bytes, i, i1);
}
public void setObject(String string, Object object) throws JMSException
{
checkWritable();
- _map.setObject(string, object);
+ _properties.setObject(string, object);
}
public boolean itemExists(String string) throws JMSException
{
- return _map.itemExists(string);
- }
-
- public ByteBuffer getData()
- {
- //What if _data is null?
- _map.writeToBuffer(_data);
- return super.getData();
+ return _properties.itemExists(string);
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Thu Dec 21 13:08:38 2006
@@ -72,6 +72,7 @@
_data.release();
}
_data = null;
+
}
public String toBodyString() throws JMSException
@@ -97,6 +98,7 @@
{
_data.rewind();
}
+
try
{
ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
@@ -108,6 +110,7 @@
{
throw new MessageFormatException("Message not serializable: " + e);
}
+
}
public Serializable getObject() throws JMSException
@@ -120,15 +123,18 @@
try
{
+ _data.rewind();
in = new ObjectInputStream(_data.asInputStream());
return (Serializable) in.readObject();
}
catch (IOException e)
- {
- throw new MessageFormatException("Could not deserialize message: " + e);
+ {
+ e.printStackTrace();
+ throw new MessageFormatException("Could not deserialize message: " + e);
}
catch (ClassNotFoundException e)
{
+ e.printStackTrace();
throw new MessageFormatException("Could not deserialize message: " + e);
}
finally
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Dec 21 13:08:38 2006
@@ -226,6 +226,10 @@
byte wireType = readWireType();
try
{
+ if(wireType == NULL_STRING_TYPE){
+ throw new NullPointerException();
+ }
+
if (wireType != CHAR_TYPE)
{
_data.position(position);
@@ -428,7 +432,7 @@
break;
case NULL_STRING_TYPE:
result = null;
- break;
+ throw new NullPointerException("data is null");
case BOOLEAN_TYPE:
checkAvailable(1);
result = String.valueOf(readBooleanImpl());
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Dec 21 13:08:38 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -104,6 +104,8 @@
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
+ frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
}
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.config.AbstractConfig;
+import org.apache.qpid.config.Connector;
+
+import javax.jms.Connection;
+
+class Config extends AbstractConfig implements ConnectorConfig
+{
+ private String host = "localhost";
+ private int port = 5672;
+ private String factory;
+ private boolean echo;
+ private int batch = 100;
+ private boolean persistent = true;
+
+ Config(String[] argv)
+ {
+ setOptions(argv);
+ }
+
+ Connection createConnection() throws Exception
+ {
+ return new Connector().createConnection(this);
+ }
+
+ public boolean isEchoOn()
+ {
+ return echo;
+ }
+
+ public boolean usePersistentMessages()
+ {
+ return persistent;
+ }
+
+ public int getBatchSize()
+ {
+ return batch;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public String getFactory()
+ {
+ return factory;
+ }
+
+ public void setOption(String key, String value)
+ {
+ if("-host".equalsIgnoreCase(key))
+ {
+ host = value;
+ }
+ else if("-port".equalsIgnoreCase(key))
+ {
+ port = parseInt("Bad port number", value);
+ }
+ else if("-factory".equalsIgnoreCase(key))
+ {
+ factory = value;
+ }
+ else if("-echo".equalsIgnoreCase(key))
+ {
+ echo = "true".equalsIgnoreCase(value);
+ }
+ else if("-persistent".equalsIgnoreCase(key))
+ {
+ persistent = "true".equalsIgnoreCase(value);
+ }
+ else if("-batch".equalsIgnoreCase(key))
+ {
+ batch = parseInt("Bad batch size", value);
+ }
+ else
+ {
+ System.out.println("Ignoring nrecognised option " + key);
+ }
+ }
+
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import java.util.Arrays;
+
+public class Ping
+{
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config(argv);
+ Connection con = config.createConnection();
+ con.setClientID("ping");
+ new Relay(new AMQQueue("ping"), new AMQQueue("pong"), con,
+ config.isEchoOn(),
+ config.getBatchSize(),
+ config.usePersistentMessages()).start();
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+public class Pong
+{
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config(argv);
+ Connection con = config.createConnection();
+ con.setClientID("pong");
+ new Relay(new AMQQueue("pong"), new AMQQueue("ping"), con,
+ config.isEchoOn(),
+ config.getBatchSize(),
+ config.usePersistentMessages()).start();
+
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Destination;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.DeliveryMode;
+
+class Relay implements Runnable
+{
+ private final Connection _con;
+ private final Session _session;
+ private final MessageConsumer _src;
+ private final MessageProducer _dest;
+ private final int _batch;
+ private final boolean _echo;
+ private int _counter;
+ private long start;
+ private boolean _running;
+
+ Relay(Destination src, Destination dest, Connection con) throws JMSException
+ {
+ this(src, dest, con, false, 100, true);
+ }
+
+ Relay(Destination src, Destination dest, Connection con, boolean echo, int batch, boolean persistent) throws JMSException
+ {
+ _echo = echo;
+ _batch = batch;
+ _con = con;
+ _session = con.createSession(true, AMQSession.NO_ACKNOWLEDGE);
+ _src = _session.createConsumer(src);
+ _dest = _session.createProducer(dest);
+ _dest.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ }
+
+ public void run()
+ {
+ start = System.currentTimeMillis();
+ try{
+ while(true) relay();
+ }
+ catch(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ try
+ {
+ _session.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ void relay() throws JMSException
+ {
+ _dest.send(relay(_src.receive()));
+ _session.commit();
+ }
+
+ Message relay(Message in) throws JMSException
+ {
+ if(!_running)
+ {
+ System.out.println(_con.getClientID() + " started.");
+ _running = true;
+ }
+ if(++_counter % _batch == 0)
+ {
+ long time = System.currentTimeMillis() - start;
+ System.out.println(_batch + " iterations performed in " + time + " ms");
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ start = System.currentTimeMillis();
+ }
+ if(_echo)
+ {
+ System.out.println("Received: " + ((TextMessage) in).getText());
+ }
+ return _session.createTextMessage(_con.getClientID() + _counter);
+ }
+
+ void start() throws InterruptedException, JMSException
+ {
+ Thread runner = new Thread(this);
+ runner.start();
+ _con.start();
+ System.out.println(_con.getClientID() + " waiting...");
+ runner.join();
+ _con.close();
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public class Start
+{
+ public static void main(String[] argv) throws Exception
+ {
+ Connection con = new Config(argv).createConnection();
+ AMQQueue ping = new AMQQueue("ping");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createProducer(ping).send(session.createTextMessage("start"));
+ session.close();
+ con.close();
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.weblogic;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.Context;
+import java.net.InetAddress;
+import java.util.Hashtable;
+
+public class ServiceProvider
+{
+ private static final String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
+ private static final String JMS_FACTORY = "transientJMSConnectionFactory";
+
+ private static final Logger _logger = Logger.getLogger(ServiceProvider.class);
+
+ private static MessageProducer _destinationProducer;
+
+ private static Queue _destinationQ;
+
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ if (args.length != 2)
+ {
+ System.out.println("Usage: <WLS URI> <service queue>");
+ System.exit(1);
+ }
+ try
+ {
+ String url = args[0];
+ String receiveQueue = args[1];
+
+ final InitialContext ctx = getInitialContext(url);
+
+ QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
+ QueueConnection qcon = qconFactory.createQueueConnection();
+ final QueueSession qsession = qcon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue receiveQ = (Queue) ctx.lookup(receiveQueue);
+
+ _logger.info("Service (queue) name is '" + receiveQ + "'...");
+
+ String selector = (args.length > 2 && args[2] != null && args[2].length() > 1) ? args[2] : null;
+
+ _logger.info("Message selector is <" + selector + ">...");
+
+ MessageConsumer consumer = qsession.createConsumer(receiveQ, selector);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int _messageCount;
+
+ public void onMessage(javax.jms.Message message)
+ {
+ //_logger.info("Got message '" + message + "'");
+
+ TextMessage tm = (TextMessage) message;
+
+ try
+ {
+ Queue responseQueue = (Queue)tm.getJMSReplyTo();
+ if (!responseQueue.equals(_destinationQ))
+ {
+ _destinationQ = responseQueue;
+ _logger.info("Creating destination for " + responseQueue);
+
+ try
+ {
+ _destinationProducer = qsession.createProducer(_destinationQ);
+ _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+ catch (JMSException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ _messageCount++;
+ if (_messageCount % 1000 == 0)
+ {
+ _logger.info("Received message total: " + _messageCount);
+ _logger.info("Sending response to '" + responseQueue + "'");
+ }
+
+ String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
+ TextMessage msg = qsession.createTextMessage(payload);
+ if (tm.propertyExists("timeSent"))
+ {
+ _logger.info("timeSent property set on message");
+ final long timeSent = tm.getLongProperty("timeSent");
+ msg.setLongProperty("timeSent", timeSent);
+ _logger.info("time taken to go from service request to provider is: " + (System.currentTimeMillis() - timeSent));
+ }
+ _destinationProducer.send(msg);
+ if (_messageCount % 1000 == 0)
+ {
+ tm.acknowledge();
+ _logger.info("Sent response to '" + responseQueue + "'");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error sending message: " + e, e);
+ }
+ }
+ });
+ qcon.start();
+ }
+ catch (Throwable t)
+ {
+ System.err.println("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+
+ System.out.println("Waiting...");
+ }
+
+ private static InitialContext getInitialContext(String url) throws NamingException
+ {
+ Hashtable env = new Hashtable();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
+ env.put(Context.PROVIDER_URL, url);
+ return new InitialContext(env);
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java
------------------------------------------------------------------------------
svn:eol-style = native