You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/11 13:11:49 UTC
svn commit: r620468 [3/4] - in /incubator/qpid/branches/thegreatmerge: ./
qpid/bin/ qpid/cpp/ qpid/cpp/examples/ qpid/cpp/examples/examples/direct/
qpid/cpp/examples/examples/fanout/ qpid/cpp/examples/examples/pub-sub/
qpid/cpp/examples/examples/reques...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start Mon Feb 11 04:11:03 2008
@@ -1,21 +1,21 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-exec qpid-server -run:debug "$@"
\ No newline at end of file
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+qpid-server -run:debug "$@"
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.start
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.stop
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.stop
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.stopall
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid.stopall
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/runAll
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/runAll
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java Mon Feb 11 04:11:03 2008
@@ -25,6 +25,8 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -79,6 +81,14 @@
ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");
// create the connection
Connection connection = conFac.createConnection();
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ });
// Create a session on the connection
// This session is a default choice of non-transacted and uses the auto acknowledge feature of a session.
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Feb 11 04:11:03 2008
@@ -116,7 +116,7 @@
private String _virtualHost;
- private ExceptionListener _exceptionListener;
+ protected ExceptionListener _exceptionListener;
private ConnectionListener _connectionListener;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Feb 11 04:11:03 2008
@@ -11,11 +11,13 @@
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.ClosedListener;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
+public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener
{
/**
* This class logger.
@@ -109,6 +111,7 @@
}
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword());
+ _qpidConnection.setClosedListener(this);
}
catch (QpidException e)
{
@@ -137,5 +140,23 @@
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e);
}
+ }
+
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode());
+ }
+ if (_conn._exceptionListener != null)
+ {
+ JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode()));
+ if (t != null)
+ {
+ ex.initCause(t);
+ }
+
+ _conn._exceptionListener.onException(ex);
+ }
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Feb 11 04:11:03 2008
@@ -20,6 +20,43 @@
*/
package org.apache.qpid.client;
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
@@ -44,7 +81,6 @@
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -53,44 +89,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
*
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -185,14 +183,14 @@
* keeps a record of subscriptions which have been created in the current instance. It does not remember
* subscriptions between executions of the client.
*/
- private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
+ protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
/**
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
+ protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
@@ -200,7 +198,7 @@
*
* @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
*/
- private final FlowControllingBlockingQueue _queue;
+ protected final FlowControllingBlockingQueue _queue;
/**
* Holds the highest received delivery tag.
@@ -279,10 +277,10 @@
protected final boolean _immediatePrefetch;
/** Indicates that warnings should be generated on violations of the strict AMQP. */
- private final boolean _strictAMQP;
+ protected final boolean _strictAMQP;
/** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
- private final boolean _strictAMQPFATAL;
+ protected final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
@@ -518,8 +516,8 @@
if (_logger.isInfoEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- _logger.info("Closing session: " + this + ":"
- + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ _logger.info("Closing session: " + this); // + ":"
+ // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
synchronized (_connection.getFailoverMutex())
@@ -781,6 +779,14 @@
false, false);
}
+ public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ false, false);
+ }
+
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -831,70 +837,7 @@
false);
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- }
- else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
+ public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
/** Note, currently this does not handle reuse of the same name with different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
@@ -1387,7 +1330,7 @@
{
// in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
// in 0-9 we used the cleaner addition of a new sync recover method with its own ok
- if(getProtocolVersion().equals(ProtocolVersion.v8_0))
+ if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
@@ -1985,7 +1928,7 @@
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private AMQTopic checkValidTopic(Topic topic) throws JMSException
+ protected AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
@@ -2353,7 +2296,7 @@
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
- private void deleteQueue(final AMQShortString queueName) throws JMSException
+ protected void deleteQueue(final AMQShortString queueName) throws JMSException
{
try
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Feb 11 04:11:03 2008
@@ -27,6 +27,7 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -38,13 +39,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
-import java.util.HashMap;
+import java.util.Iterator;
/**
* This is a 0.10 Session
@@ -146,6 +146,25 @@
//------- overwritten methods of class AMQSession
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkNotClosed();
+ checkValidTopic(topic);
+ if( _subscriptions.containsKey(name))
+ {
+ _subscriptions.get(name).close();
+ }
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
/**
* Acknowledge one or many messages.
*
@@ -223,6 +242,25 @@
}
/**
+ * We need to release messages that may be pre-fetched in the local queue
+ *
+ * @throws JMSException
+ */
+ public void close() throws JMSException
+ {
+ super.close();
+ // We need to release pre-fetched messages
+ Iterator messages=_queue.iterator();
+ while (messages.hasNext())
+ {
+ UnprocessedMessage message=(UnprocessedMessage) messages.next();
+ messages.remove();
+ rejectMessage(message, true);
+ }
+ }
+
+
+ /**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
*/
public void sendCommit() throws AMQException, FailoverException
@@ -359,9 +397,17 @@
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
- getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
+ if(consumer.isStrated())
+ {
+ // set the flow
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+
+ }
getQpidSession().sync();
getCurrentException();
}
@@ -462,11 +508,11 @@
//only set if msg list is null
try
{
- if (consumer.getMessageListener() != null)
- {
+ // if (consumer.getMessageListener() != null)
+ // {
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
MAX_PREFETCH);
- }
+ // }
getQpidSession()
.messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
}
@@ -546,7 +592,7 @@
*/
private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
{
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
synchronized (this)
{
@@ -579,8 +625,7 @@
void start() throws AMQException
{
-
- super.suspendChannel(false);
+ suspendChannel(false);
for(BasicMessageConsumer c: _consumers.values())
{
c.start();
@@ -592,16 +637,19 @@
}
}
- void stop() throws AMQException
+
+
+
+ void stop() throws AMQException
{
super.stop();
- for(BasicMessageConsumer c: _consumers.values())
+ for(BasicMessageConsumer c: _consumers.values())
{
c.stop();
}
}
- synchronized void startDistpatcherIfNecessary()
+ synchronized void startDistpatcherIfNecessary()
{
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
if (!_immediatePrefetch)
@@ -621,5 +669,72 @@
}
startDistpatcherIfNecessary(false);
+ }
+
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+
+ checkNotClosed();
+ AMQTopic origTopic=checkValidTopic(topic);
+ AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
+
+ TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName=((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName=new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Feb 11 04:11:03 2008
@@ -21,9 +21,8 @@
package org.apache.qpid.client;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
@@ -132,7 +131,7 @@
{
// We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
- BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+ BasicRecoverBody body = getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
_connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
_logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
@@ -142,12 +141,12 @@
// in 0-9 we used the cleaner addition of a new sync recover method with its own ok
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+ BasicRecoverBody body = getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
}
else if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v0_9))
{
- BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
+ BasicRecoverSyncBody body = ((MethodRegistry_0_9)getProtocolHandler().getMethodRegistry()).createBasicRecoverSyncBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
}
else
@@ -166,7 +165,7 @@
_logger.debug("Rejecting delivery tag:" + deliveryTag);
}
- AMQFrame basicRejectBody = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId);
+ AMQFrame basicRejectBody = getProtocolHandler().getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId);
_connection.getProtocolHandler().writeFrame(basicRejectBody);
}
@@ -182,7 +181,7 @@
{
public AMQMethodEvent execute() throws AMQException, FailoverException
{
- AMQFrame boundFrame = getMethodRegistry().createExchangeBoundBody
+ AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
(exchangeName, routingKey, queueName).generateFrame(_channelId);
return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
@@ -225,7 +224,7 @@
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
// TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame jmsConsume = getMethodRegistry().createBasicConsumeBody(getTicket(),
+ AMQFrame jmsConsume = getProtocolHandler().getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
tag,
consumer.isNoLocal(),
@@ -247,7 +246,7 @@
public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
final boolean nowait) throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
+ AMQFrame exchangeDeclare = getProtocolHandler().getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
generateFrame(_channelId);
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
@@ -255,14 +254,14 @@
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
{
- AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
+ AMQFrame queueDeclare = getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
{
- QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
+ QueueDeleteBody body = getProtocolHandler().getMethodRegistry().createQueueDeleteBody(getTicket(),
queueName,
false,
false,
@@ -311,4 +310,70 @@
return new AMQTemporaryQueue(this);
}
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+
+ checkNotClosed();
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Mon Feb 11 04:11:03 2008
@@ -71,12 +71,26 @@
queueName, isDurable);
}
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ {
+ super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
+ }
+
+
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
getDurableTopicQueueName(subscriptionName, connection),
true);
+ }
+
+ public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ throws JMSException
+ {
+ return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false,
+ getDurableTopicQueueName(subscriptionName, connection), false);
}
public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Feb 11 04:11:03 2008
@@ -85,7 +85,7 @@
protected MessageFactoryRegistry _messageFactory;
- private final AMQSession _session;
+ protected final AMQSession _session;
protected AMQProtocolHandler _protocolHandler;
@@ -434,7 +434,23 @@
}
}
- public abstract Object getMessageFromQueue(long l) throws InterruptedException;
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else if (l < 0)
+ {
+ o = _synchronousQueue.poll();
+ }
+ else
+ {
+ o = _synchronousQueue.take();
+ }
+ return o;
+ }
private boolean closeOnAutoClose() throws JMSException
{
@@ -1105,6 +1121,12 @@
public void stop()
{
// do nothing as this is a 0_10 feature
+ }
+
+ public boolean isStrated()
+ {
+ // always reports false here, as start/stop is a 0_10 feature
+ return false;
}
public AMQShortString getQueuename()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Feb 11 04:11:03 2008
@@ -19,10 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
@@ -41,8 +38,8 @@
import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.Iterator;
/**
* This is a 0.10 message consumer.
@@ -50,15 +47,6 @@
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
implements org.apache.qpidity.nclient.util.MessageListener
{
- /**
- * A counter for keeping the number of available messages for this consumer
- */
- private final AtomicLong _messageCounter = new AtomicLong(0);
-
- /**
- * Number of received message so far
- */
- private final AtomicLong _messagesReceived = new AtomicLong(0);
/**
* This class logger
@@ -117,6 +105,11 @@
// ----- Interface org.apache.qpidity.client.util.MessageListener
/**
+ *
+ * This is invoked by the session thread when emptying the session message queue.
+ * We first check that the message is valid (i.e. that it matches the selector) and then deliver it to the
+ * message listener or to the sync consumer queue.
+ *
* @param jmsMessage this message has already been processed so can't redo preDeliver
* @param channelId
*/
@@ -136,12 +129,6 @@
}
catch (Exception e1)
{
- // the receiver may be waiting for a message
- if (_messageCounter.get() >= 0)
- {
- _messageCounter.decrementAndGet();
- _synchronousQueue.add(new NullTocken());
- }
// we should silently log thie exception as it only hanppens when the connection is closed
_logger.error("Exception when receiving message", e1);
}
@@ -152,20 +139,15 @@
}
}
+
+
+ /**
+ * This method is invoked by the transport layer when a message is delivered for this
+ * consumer. The message is transformed and passed to the session.
+ * @param message an 0.10 message
+ */
public void onMessage(Message message)
{
- if (isMessageListenerSet())
- {
- _messagesReceived.incrementAndGet();
- if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
- {
- // require more credit
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _messagesReceived.set(0);
- }
- }
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -207,8 +189,6 @@
newMessage.setReplyToURL(replyToUrl);
}
newMessage.setContentHeader(headers);
- // increase the counter of messages
- _messageCounter.incrementAndGet();
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -246,6 +226,8 @@
//{
super.postDeliver(msg);
//}
+
+
}
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
@@ -351,50 +333,9 @@
}
messageOk = acquireMessage(message);
}
- if (!messageOk)
- {
- requestCreditIfCreditMode();
- }
return messageOk;
}
- private void requestCreditIfCreditMode()
- {
- try
- {
- // the current message received is not good, so we need to get a message.
- if (getMessageListener() == null)
- {
- int oldval = _messageCounter.intValue();
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if (_messageCounter.intValue() <= oldval)
- {
- // we haven't received a message so tell the receiver to return null
- _synchronousQueue.add(new NullTocken());
- }
- else
- {
- _messageCounter.decrementAndGet();
- }
- }
- // we now need to check if we have received a message
-
- }
- catch (Exception e)
- {
- _logger.error(
- "Error getting message listener, couldn't request credit after releasing a message that failed the selector test",
- e);
- }
- }
/**
* Acknowledge a message
@@ -469,16 +410,18 @@
super.setMessageListener(messageListener);
if (messageListener == null)
{
- _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+ /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
_0_10session.getQpidSession()
.messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
+ */
}
else
{
+ //TODO: empty the list of sync messages.
if (_connection.started())
{
_0_10session.getQpidSession()
@@ -490,66 +433,13 @@
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
- _messagesReceived.set(0);
- ;
- }
- }
- }
-
- public Object getMessageFromQueue(long l) throws InterruptedException
- {
- if (!_isStarted)
- {
- return null;
- }
- Object o;
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
- if (l == 0)
- {
- o = _synchronousQueue.take();
- }
- else
- {
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else
- {
- o = _synchronousQueue.poll();
- }
- if (o == null)
- {
- _logger.debug("Message Didn't arrive in time, checking if one is inflight");
- // checking if one is inflight
- _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if (_messageCounter.get() > 0)
- {
- o = _synchronousQueue.take();
- }
}
}
- if (o instanceof NullTocken)
- {
- o = null;
- }
- return o;
}
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ public boolean isStrated()
{
- _messageCounter.decrementAndGet();
- super.preApplicationProcessing(jmsMsg);
- }
-
- private class NullTocken
- {
-
+ return _isStarted;
}
public void start()
@@ -560,5 +450,18 @@
public void stop()
{
_isStarted = false;
+ }
+
+ public void close() throws JMSException
+ {
+ super.close();
+ // release any messages that may still be staged in the synchronous queue
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Mon Feb 11 04:11:03 2008
@@ -86,22 +86,5 @@
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
}
-
- public Object getMessageFromQueue(long l) throws InterruptedException
- {
- Object o;
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else if (l < 0)
- {
- o = _synchronousQueue.poll();
- }
- else
- {
- o = _synchronousQueue.take();
- }
- return o;
- }
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Feb 11 04:11:03 2008
@@ -739,4 +739,9 @@
_consumer = basicMessageConsumer;
}
+ public void receivedFromServer()
+ {
+ _changedData = false;
+ }
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Mon Feb 11 04:11:03 2008
@@ -140,7 +140,9 @@
props.setType(mprop.getType());
props.setUserId(mprop.getUserId());
props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));
- return createMessage(messageNbr, data, exchange, routingKey, props);
+ AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
+ message.receivedFromServer();
+ return message;
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java Mon Feb 11 04:11:03 2008
@@ -49,11 +49,33 @@
ConnectionDelegate connectionDelegate = new ConnectionDelegate()
{
+ private boolean receivedClose = false;
+
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
}
+ public void exception(Throwable t)
+ {
+ if (_closedListner != null)
+ {
+ _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t);
+ }
+ else
+ {
+ throw new RuntimeException("connection closed",t);
+ }
+ }
+
+ public void closed()
+ {
+ if (_closedListner != null && !this.receivedClose)
+ {
+ _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null);
+ }
+ }
+
@Override public void connectionClose(Channel context, ConnectionClose connectionClose)
{
ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode());
@@ -67,8 +89,10 @@
}
else
{
- _closedListner.onClosed(errorCode, connectionClose.getReplyText());
+ _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null);
}
+
+ this.receivedClose = true;
}
};
@@ -79,6 +103,7 @@
if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
{
+ System.out.println("Using NIO");
if( _logger.isDebugEnabled())
{
_logger.debug("using NIO");
@@ -180,6 +205,7 @@
public void setClosedListener(ClosedListener closedListner)
{
+
_closedListner = closedListner;
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java Mon Feb 11 04:11:03 2008
@@ -32,8 +32,8 @@
* informs the connection's ExceptionListener
* @param errorCode TODO
* @param reason TODO
- *
+ * @param t TODO
* @see Connection
*/
- public void onClosed(ErrorCode errorCode, String reason);
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java Mon Feb 11 04:11:03 2008
@@ -1,5 +1,7 @@
package org.apache.qpidity.nclient;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -16,17 +18,17 @@
try
{
- javax.jms.Connection con = new AMQConnection("qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672");
+ javax.jms.Connection con = new AMQConnection("qpid:password=pass;username=name@tcp:localhost:5672");
con.start();
javax.jms.Session ssn = con.createSession(false, 1);
javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test");
javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
- javax.jms.MessageProducer prod = ssn.createProducer(dest);
+ //javax.jms.MessageProducer prod = ssn.createProducer(dest);
- //javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive();
- /* cons.setMessageListener(new MessageListener()
+ javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive();
+ cons.setMessageListener(new MessageListener()
{
public void onMessage(Message m)
{
@@ -41,9 +43,25 @@
}
}
- });*/
+ });
- javax.jms.TextMessage msg = ssn.createTextMessage();
+ con.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ });
+
+ System.out.println("Waiting");
+ while (m == null)
+ {
+
+ }
+
+ System.out.println("Exiting");
+
+ /*javax.jms.TextMessage msg = ssn.createTextMessage();
msg.setText("This is a test message");
msg.setBooleanProperty("targetMessage", false);
prod.send(msg);
@@ -60,7 +78,7 @@
else
{
System.out.println("message is not null" + m);
- }
+ }*/
}
catch(Exception e)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Mon Feb 11 04:11:03 2008
@@ -189,7 +189,7 @@
void notifyException(QpidException ex)
{
- _exceptionListner.onClosed(null, null);
+ _exceptionListner.onClosed(null, null, null);
}
Map<String,MessagePartListener> getMessageListerners()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java Mon Feb 11 04:11:03 2008
@@ -40,7 +40,7 @@
Session ssn = conn.createSession(50000);
ssn.setClosedListener(new ClosedListener()
{
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java Mon Feb 11 04:11:03 2008
@@ -43,7 +43,7 @@
Session ssn = conn.createSession(50000);
ssn.setClosedListener(new ClosedListener()
{
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java Mon Feb 11 04:11:03 2008
@@ -124,7 +124,7 @@
session.sync();
}
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
System.out.println("------- Broker Notified an error --------");
System.out.println("------- " + errorCode + " --------");
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java Mon Feb 11 04:11:03 2008
@@ -1163,7 +1163,7 @@
*/
private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
{
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
synchronized (this)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java Mon Feb 11 04:11:03 2008
@@ -82,8 +82,9 @@
protected void tearDown() throws Exception
{
_connection.close();
+ super.tearDown();
}
-
+
public void testSimpleReceiveConnection()
{
try
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java Mon Feb 11 04:11:03 2008
@@ -98,7 +98,7 @@
if (_count < _expected)
{
- wait(1000000000);
+ wait(60000);
}
if (_count < _expected)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Mon Feb 11 04:11:03 2008
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.test.unit.close;
-import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.testutil.QpidTestCase;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
@@ -41,7 +40,7 @@
import java.util.concurrent.atomic.AtomicInteger;
-public class MessageRequeueTest extends TestCase
+public class MessageRequeueTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class);
@@ -64,7 +63,8 @@
protected void setUp() throws Exception
{
super.setUp();
- conn = new QpidClientConnection(BROKER);
+
+ conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
@@ -78,7 +78,6 @@
protected void tearDown() throws Exception
{
- super.tearDown();
if (!passed) // clean up
{
@@ -91,6 +90,7 @@
conn.disconnect();
}
+ super.tearDown();
}
/**
@@ -125,7 +125,7 @@
if (messageLog[msgindex] != 0)
{
_logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag()
- + ") more than once.");
+ + ") more than once.");
}
if (_logger.isInfoEnabled())
@@ -144,16 +144,18 @@
msg = consumer.receive(1000);
}
- _logger.info("consuming done.");
+ _logger.info("consuming done.");
conn.getSession().commit();
consumer.close();
- assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
int index = 0;
StringBuilder list = new StringBuilder();
list.append("Failed to receive:");
int failed = 0;
+ _logger.info("consumed: " + messagesReceived);
+
+ assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
// wit 0_10 we can have a delivery tag of 0
if (conn.isBroker08())
{
@@ -174,7 +176,7 @@
assertEquals(list.toString(), 0, failed);
}
- _logger.info("consumed: " + messagesReceived);
+
conn.disconnect();
passed = true;
}
@@ -208,7 +210,7 @@
}
catch (InterruptedException e)
{
- fail("Uanble to join to Consumer theads");
+ fail("Unable to join to Consumer theads");
}
_logger.info("consumer 1 count is " + c1.getCount());
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Mon Feb 11 04:11:03 2008
@@ -106,7 +106,7 @@
AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
- TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(null);
@@ -144,11 +144,11 @@
AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con1, "MyTopic3");
- TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Mon Feb 11 04:11:03 2008
@@ -467,6 +467,18 @@
}
result = _consumer.receive(1000);
+
+ if (isBroker08())
+ {
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ // assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+ }
+ else
+ {
+ assertNull("test message was consumed and not rolled back, but is redelivered", result);
+ }
+
+ result = _consumer.receive(1000);
assertNull("test message should be null:" + result, result);
_session.commit();
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Mon Feb 11 04:11:03 2008
@@ -89,7 +89,7 @@
prepCon = (AMQConnection) getConnection("guest", "guest");
_logger.info("Create prep session");
- prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ prepSession = prepCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
_logger.info("Create prep producer to Q1");
prepProducer1 = prepSession.createProducer(queue1);
@@ -100,7 +100,7 @@
_logger.info("Create test connection");
testCon = (AMQConnection) getConnection("guest", "guest");
_logger.info("Create test session");
- testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ testSession = testCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
_logger.info("Create test consumer of q2");
testConsumer2 = testSession.createConsumer(queue2);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java Mon Feb 11 04:11:03 2008
@@ -81,11 +81,8 @@
String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
try
{
- AMQConnectionFactory factory = new AMQConnectionFactory(brokerUrl);
_logger.info("connecting to Qpid :" + brokerUrl);
- //connection = factory.createConnection();
- setUp();
- connection = getConnection("guest", "guest") ;
+ connection = getConnection("guest", "guest") ;
// register exception listener
connection.setExceptionListener(this);
@@ -112,7 +109,6 @@
connection.close();
connected = false;
_logger.info("disconnected");
- tearDown();
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java Mon Feb 11 04:11:03 2008
@@ -21,8 +21,8 @@
import javax.jms.Connection;
import javax.naming.InitialContext;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.IOException;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.AMQConnection;
@@ -39,117 +39,101 @@
public class QpidTestCase extends TestCase
{
- /* this clas logger */
private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
- /* Test properties */
- private static final String SHEL = "broker_shel";
- private static final String BROKER_PATH = "broker_path";
- private static final String BROKER_PARAM = "broker_param";
- private static final String BROKER_VERSION = "broker_version";
- public static final String BROKER_08 = "08";
- private static final String BROKER_VM = "vm";
- private static final String EXT_BROKER = "ext" ;
- /**
- * The process where the remote broker is running.
- */
- private Process _brokerProcess;
+ // system properties
+ private static final String BROKER = "broker";
+ private static final String BROKER_VERSION = "broker.version";
+
+ // values
+ private static final String VM = "vm";
+ private static final String EXTERNAL = "external";
+ private static final String VERSION_08 = "0-8";
+ private static final String VERSION_010 = "0-10";
- /* The test property values */
- // The default broker is an in-VM one
- private String _shel = BROKER_VM;
- private String _brokerPath = "";
- private String _brokerParams = "";
- private String _brokerVersion = "08" ;
+ private String _broker = System.getProperty(BROKER, VM);
+ private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
+
+ private Process _brokerProcess;
- /* The broker communication objects */
private InitialContext _initialContext;
private AMQConnectionFactory _connectionFactory;
- //--------- JUnit support
-
protected void setUp() throws Exception
{
super.setUp();
- // get the propeties if they are set
- if (System.getProperties().containsKey(BROKER_VERSION ))
- {
- _brokerVersion = System.getProperties().getProperty(BROKER_VERSION );
- }
- if (System.getProperties().containsKey(SHEL))
- {
- _shel = System.getProperties().getProperty(SHEL);
- }
- if (System.getProperties().containsKey(BROKER_PATH))
- {
- _brokerPath = System.getProperties().getProperty(BROKER_PATH);
- }
- if (System.getProperties().containsKey(BROKER_PARAM))
- {
- _brokerParams = System.getProperties().getProperty(BROKER_PARAM);
- }
- if (!_shel.equals(BROKER_VM) && ! _shel.equals(EXT_BROKER) )
- {
- // start a new broker
- startBroker();
- }
- else if ( ! _shel.equals(EXT_BROKER) )
- {
- // create an in_VM broker
- TransportConnection.createVMBroker(1);
- }
- _logger.info("=========================================");
- _logger.info("broker version " + _brokerVersion + " ==== " + _shel + " " + _brokerPath + " " + _brokerParams);
+ startBroker();
}
- /**
- * This method _is invoked after each test case.
- *
- * @throws Exception
- */
protected void tearDown() throws Exception
{
- killBroker();
- super.tearDown();
+ stopBroker();
+ super.tearDown();
}
- public void killBroker()
+ public void startBroker() throws Exception
{
- _logger.info("Kill broker");
- if (_brokerProcess != null)
+ if (_broker.equals(VM))
{
- // destroy the currently running broker
- _brokerProcess.destroy();
- _brokerProcess = null;
+ // create an in_VM broker
+ TransportConnection.createVMBroker(1);
}
- else if ( ! _shel.equals(EXT_BROKER))
+ else if (!_broker.equals(EXTERNAL))
{
- TransportConnection.killAllVMBrokers();
+ _logger.info("starting broker: " + _broker);
+ ProcessBuilder pb = new ProcessBuilder(_broker.split("\\s+"));
+ pb.redirectErrorStream(true);
+ _brokerProcess = pb.start();
+
+ new Thread()
+ {
+ private InputStream in = _brokerProcess.getInputStream();
+
+ public void run()
+ {
+ try
+ {
+ byte[] buf = new byte[4*1024];
+ int n;
+ while ((n = in.read(buf)) != -1)
+ {
+ System.out.write(buf, 0, n);
+ }
+ }
+ catch (IOException e)
+ {
+ _logger.info("redirector", e);
+ }
+ }
+ }.start();
+
+ Thread.sleep(1000);
+
+ try
+ {
+ int exit = _brokerProcess.exitValue();
+ throw new RuntimeException("broker aborted: " + exit);
+ }
+ catch (IllegalThreadStateException e)
+ {
+ // this is expected if the broker started successfully
+ }
}
}
- //--------- Util method
-
- /**
- * This method starts a remote server by spawning an external process.
- *
- * @throws Exception If the broker cannot be started
- */
- public void startBroker() throws Exception
+ public void stopBroker() throws Exception
{
- _logger.info("Starting broker: " + _shel + " " + _brokerPath + " " + _brokerParams + "");
- Runtime rt = Runtime.getRuntime();
- _brokerProcess = rt.exec(_shel + " " + _brokerPath + " " + _brokerParams + "");
- BufferedReader reader = new BufferedReader(new InputStreamReader(_brokerProcess.getInputStream()));
- if (reader.ready())
+ _logger.info("stopping broker: " + _broker);
+ if (_brokerProcess != null)
{
- //bad, we had an error starting the broker
- throw new Exception("Problem when starting the broker: " + reader.readLine());
+ _brokerProcess.destroy();
+ _brokerProcess.waitFor();
+ _logger.info("broker exited: " + _brokerProcess.exitValue());
+ _brokerProcess = null;
}
- // We need to wait for th ebroker to start ideally we would need to ping it
- synchronized(this)
+ else if (_broker.equals(VM))
{
- this.wait(1000);
+ TransportConnection.killAllVMBrokers();
}
}
@@ -159,28 +143,18 @@
*/
public boolean isBroker08()
{
- return _brokerVersion.equals(BROKER_08);
+ return _brokerVersion.equals(VERSION_08);
}
- /**
- * Stop the currently running broker.
- */
- public void stopBroker()
+ public boolean isBroker010()
{
- _logger.info("Stopping broker");
- // stooping the broker
- if (_brokerProcess != null)
- {
- _brokerProcess.destroy();
- }
- _initialContext = null;
- _connectionFactory = null;
+ return _brokerVersion.equals(VERSION_010);
}
- public void shutdownServer() throws Exception
+ public void shutdownServer() throws Exception
{
- killBroker();
- setUp();
+ stopBroker();
+ startBroker();
}
/**
* we assume that the environment is correctly set
@@ -228,7 +202,7 @@
{
_logger.info("get Connection");
Connection con;
- if (_shel.equals(BROKER_VM))
+ if (_broker.equals(VM))
{
con = new AMQConnection("vm://:1", username, password, "Test", "test");
}
@@ -243,7 +217,7 @@
{
_logger.info("get Connection");
Connection con;
- if (_shel.equals(BROKER_VM))
+ if (_broker.equals(VM))
{
con = new AMQConnection("vm://:1", username, password, id, "test");
}
@@ -254,8 +228,4 @@
return con;
}
- public void testfoo()
- {
- //do nothing, just to avoid maven to report an error
- }
}
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/common/bin/qpid-run
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/common/bin/qpid-run
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/BrokerDetailsImpl.java Mon Feb 11 04:11:03 2008
@@ -116,7 +116,7 @@
{
if (getProtocol().equals(BrokerDetails.PROTOCOL_TCP))
{
- _port = 1234;
+ _port = 5672;
}
else if (getProtocol().equals(BrokerDetails.PROTOCOL_TLS))
{