You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC
svn commit: r507672 [8/16] - in /incubator/qpid/branches/qpid.0-9:
gentools/src/org/apache/qpid/gentools/ gentools/templ.java/
gentools/templ.net/ java/ java/broker/ java/broker/bin/
java/broker/distribution/ java/broker/distribution/src/ java/broker/d...
Modified: incubator/qpid/branches/qpid.0-9/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/pom.xml?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/pom.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/pom.xml Wed Feb 14 12:02:03 2007
@@ -35,40 +35,45 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
+ <java.source.version>1.5</java.source.version>
+ <qpid.version>${pom.version}</qpid.version>
+ <qpid.targetDir>${project.build.directory}</qpid.targetDir>
</properties>
<dependencies>
+
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-common</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker</artifactId>
- </dependency>
-
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
+
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
+
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
+
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-filter-ssl</artifactId>
</dependency>
+ <!-- Test Dependencies -->
+ <dependency> <!-- for inVm Broker -->
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>jmscts</groupId>
<artifactId>jmscts</artifactId>
@@ -85,12 +90,14 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymockclassextension</artifactId>
+ <scope>test</scope>
</dependency>
-
</dependencies>
<build>
@@ -115,11 +122,67 @@
</property>
<property>
<name>log4j.configuration</name>
- <value>file:///${basedir}/src/main/java/log4j.properties</value>
+ <value>file:///${basedir}/src/main/java/client.log4j</value>
</property>
</systemProperties>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
+
+<!-- The inclusion of this resource causes the build to hang. -->
+ <!--resources>
+ <resource>
+ <targetPath>META-INF/</targetPath>
+ <filtering>false</filtering>
+ <directory>../resources/META-INF</directory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </resource>
+ </resources-->
+
+ <testResources>
+ <testResource>
+ <targetPath>META-INF/</targetPath>
+ <filtering>false</filtering>
+ <directory>../resources/META-INF</directory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </testResource>
+ <testResource>
+ <targetPath>src/</targetPath>
+ <filtering>false</filtering>
+ <directory>src/test/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </testResource>
+
+ <testResource>
+ <targetPath></targetPath>
+ <filtering>false</filtering>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>client.log4j</include>
+ </includes>
+ </testResource>
+ </testResources>
+
</build>
+
</project>
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Wed Feb 14 12:02:03 2007
@@ -308,7 +308,7 @@
}
}
- //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+ //remove the trailing DEFAULT_OPTION_SEPERATOR, or the '?' if there are no options
optionsURL.deleteCharAt(optionsURL.length() - 1);
return optionsURL.toString();
@@ -334,4 +334,15 @@
}
+ public static String checkTransport(String broker)
+ {
+ if ((!broker.contains("://")))
+ {
+ return "tcp://" + broker;
+ }
+ else
+ {
+ return broker;
+ }
+ }
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Feb 14 12:02:03 2007
@@ -30,6 +30,7 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -156,21 +157,30 @@
// Keeps a tally of connections for logging and debugging
- private static AtomicInteger _ConnectionId;
- static { _ConnectionId = new AtomicInteger(0); }
+ private static AtomicLong _ConnectionId;
+ static { _ConnectionId = new AtomicLong(0); }
/*
* The connection meta data
*/
private QpidConnectionMetaData _connectionMetaData;
+ /**
+ * @param broker brokerdetails
+ * @param username username
+ * @param password password
+ * @param clientName clientid
+ * @param virtualHost virtualhost
+ * @throws AMQException
+ * @throws URLSyntaxException
+ */
public AMQConnection(String broker, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
- virtualHost + "?brokerlist='" + broker + "'"));
+ (clientName == null ? "" : clientName) + "/" +
+ virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
}
public AMQConnection(String host, int port, String username, String password,
@@ -185,12 +195,12 @@
this(new AMQConnectionURL(useSSL ?
ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
+ (clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='true'" :
ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
+ (clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='false'"
));
@@ -215,7 +225,7 @@
_clientName = connectionURL.getClientName();
_username = connectionURL.getUsername();
_password = connectionURL.getPassword();
- _virtualHost = connectionURL.getVirtualHost();
+ setVirtualHost(connectionURL.getVirtualHost());
_failoverPolicy = new FailoverPolicy(connectionURL);
@@ -326,6 +336,15 @@
_clientName = clientName;
_username = username;
_password = password;
+ setVirtualHost(virtualHost);
+ }
+
+ private void setVirtualHost(String virtualHost)
+ {
+ if(virtualHost.startsWith("/"))
+ {
+ virtualHost = virtualHost.substring(1);
+ }
_virtualHost = virtualHost;
}
@@ -410,6 +429,12 @@
return _failoverPolicy.failoverAllowed();
}
+ // Test purposes only - used for testing refs, which cannot be done using JMS interfaces
+ public AMQSession createAMQSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ return (AMQSession)createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
+ }
+
public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
@@ -484,20 +509,21 @@
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(channelId,
- ChannelOpenBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
+ ChannelOpenBody.createMethodBody(
+ _protocolHandler.getProtocolMajorVersion(), // AMQP major version
+ _protocolHandler.getProtocolMinorVersion(), // AMQP minor version
null), // outOfBand
ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+
// Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(channelId,
- MessageQosBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
+ MessageQosBody.createMethodBody(
+ _protocolHandler.getProtocolMajorVersion(), // AMQP major version
+ _protocolHandler.getProtocolMinorVersion(), // AMQP minor version
false, // global
prefetchHigh, // prefetchCount
0), // prefetchSize
@@ -509,10 +535,11 @@
{
_logger.debug("Issuing TxSelect for " + channelId);
}
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(channelId, TxSelectBody.createMethodBody((byte)0, (byte)9), TxSelectOkBody.class);
+ _protocolHandler.syncWrite(channelId, TxSelectBody.createMethodBody(
+ _protocolHandler.getProtocolMajorVersion(), // AMQP major version
+ _protocolHandler.getProtocolMinorVersion()), // AMQP minor version
+ TxSelectOkBody.class);
}
}
@@ -544,6 +571,7 @@
/**
* Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
* where specified in the JMS spec
+ *
* @param transacted
* @param acknowledgeMode
* @return QueueSession
@@ -557,6 +585,7 @@
/**
* Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
* where specified in the JMS spec
+ *
* @param transacted
* @param acknowledgeMode
* @return TopicSession
@@ -591,7 +620,7 @@
{
checkNotClosed();
return _connectionMetaData;
-
+
}
public ExceptionListener getExceptionListener() throws JMSException
@@ -642,13 +671,18 @@
public void close() throws JMSException
{
- synchronized(getFailoverMutex())
+ close(-1);
+ }
+
+ public void close(long timeout) throws JMSException
+ {
+ synchronized (getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
try
{
- closeAllSessions(null);
+ closeAllSessions(null, timeout);
_protocolHandler.closeConnection();
}
catch (AMQException e)
@@ -686,7 +720,7 @@
* <p/>
* The caller must hold the failover mutex before calling this method.
*/
- private void closeAllSessions(Throwable cause) throws JMSException
+ private void closeAllSessions(Throwable cause, long timeout) throws JMSException
{
final LinkedList sessionCopy = new LinkedList(_sessions.values());
final Iterator it = sessionCopy.iterator();
@@ -702,7 +736,7 @@
{
try
{
- session.close();
+ session.close(timeout);
}
catch (JMSException e)
{
@@ -920,7 +954,7 @@
{
if (cause instanceof AMQException)
{
- je = new JMSException(Integer.toString(((AMQException)cause).getErrorCode()) ,"Exception thrown against " + toString() + ": " + cause);
+ je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause);
}
else
{
@@ -951,7 +985,7 @@
{
_logger.info("Closing AMQConnection due to :" + cause.getMessage());
_closed.set(true);
- closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
catch (JMSException e)
{
@@ -973,8 +1007,8 @@
void deregisterSession(int channelId)
{
_sessions.remove(channelId);
- }
-
+ }
+
/**
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
@@ -982,7 +1016,7 @@
public void resubscribeSessions() throws JMSException, AMQException
{
ArrayList sessions = new ArrayList(_sessions.values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: remove?
+ _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
@@ -1025,7 +1059,7 @@
null); // factory location
}
- public int getConnectionId()
+ public long getConnectionId()
{
return _ConnectionId.get();
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Wed Feb 14 12:02:03 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.naming.Reference;
import javax.naming.NamingException;
@@ -35,19 +36,31 @@
public abstract class AMQDestination implements Destination, Referenceable
{
- protected final String _exchangeName;
+ protected final AMQShortString _exchangeName;
- protected final String _exchangeClass;
+ protected final AMQShortString _exchangeClass;
- protected final String _destinationName;
+ protected final AMQShortString _destinationName;
- protected boolean _isDurable;
+ protected final boolean _isDurable;
protected final boolean _isExclusive;
protected final boolean _isAutoDelete;
- protected String _queueName;
+ private AMQShortString _queueName;
+
+ private String _url;
+ private AMQShortString _urlAsShortString;
+
+ private byte[] _byteEncoding;
+ private static final int IS_DURABLE_MASK = 0x1;
+ private static final int IS_EXCLUSIVE_MASK = 0x2;
+ private static final int IS_AUTODELETE_MASK = 0x4;
+
+ public static final byte QUEUE_TYPE = 1;
+ public static final byte TOPIC_TYPE = 2;
+ public static final byte UNKNOWN_TYPE = 3;
protected AMQDestination(String url) throws URLSyntaxException
{
@@ -63,27 +76,27 @@
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
- _queueName = binding.getQueueName();
+ _queueName = new AMQShortString(binding.getQueueName());
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, String queueName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName)
{
this(exchangeName, exchangeClass, destinationName, false, false, queueName);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
{
this(exchangeName, exchangeClass, destinationName, false, false, null);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
- boolean isAutoDelete, String queueName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName)
{
this(exchangeName, exchangeClass, destinationName, isExclusive, isAutoDelete, queueName, false);
}
- protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive,
- boolean isAutoDelete, String queueName, boolean isDurable)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
if (destinationName == null)
{
@@ -106,9 +119,13 @@
_isDurable = isDurable;
}
- public String getEncodedName()
+ public AMQShortString getEncodedName()
{
- return toURL();
+ if(_urlAsShortString == null)
+ {
+ toURL();
+ }
+ return _urlAsShortString;
}
public boolean isDurable()
@@ -116,12 +133,12 @@
return _isDurable;
}
- public String getExchangeName()
+ public AMQShortString getExchangeName()
{
return _exchangeName;
}
- public String getExchangeClass()
+ public AMQShortString getExchangeClass()
{
return _exchangeClass;
}
@@ -136,22 +153,34 @@
return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName);
}
- public String getDestinationName()
+ public AMQShortString getDestinationName()
{
return _destinationName;
}
public String getQueueName()
{
+ return _queueName == null ? null : _queueName.toString();
+ }
+
+ public AMQShortString getAMQQueueName()
+ {
return _queueName;
}
- public void setQueueName(String queueName)
+
+
+ public void setQueueName(AMQShortString queueName)
{
+
_queueName = queueName;
+ // calculated URL now out of date
+ _url = null;
+ _urlAsShortString = null;
+ _byteEncoding = null;
}
- public abstract String getRoutingKey();
+ public abstract AMQShortString getRoutingKey();
public boolean isExclusive()
{
@@ -179,53 +208,114 @@
public String toURL()
{
- StringBuffer sb = new StringBuffer();
-
- sb.append(_exchangeClass);
- sb.append("://");
- sb.append(_exchangeName);
-
- sb.append("/");
-
- if (_destinationName != null)
+ String url = _url;
+ if(url == null)
{
- sb.append(_destinationName);
- }
- sb.append("/");
- if (_queueName != null)
- {
- sb.append(_queueName);
- }
+ StringBuffer sb = new StringBuffer();
- sb.append("?");
+ sb.append(_exchangeClass);
+ sb.append("://");
+ sb.append(_exchangeName);
+
+ sb.append('/');
+
+ if (_destinationName != null)
+ {
+ sb.append(_destinationName);
+ }
+
+ sb.append('/');
+
+ if (_queueName != null)
+ {
+ sb.append(_queueName);
+ }
+
+ sb.append('?');
+
+ if (_isDurable)
+ {
+ sb.append(BindingURL.OPTION_DURABLE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_isExclusive)
+ {
+ sb.append(BindingURL.OPTION_EXCLUSIVE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_isAutoDelete)
+ {
+ sb.append(BindingURL.OPTION_AUTODELETE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ //remove the last char: '?' if there are no options, ',' if there are.
+ sb.deleteCharAt(sb.length() - 1);
+ url = sb.toString();
+ _url = url;
+ _urlAsShortString = new AMQShortString(url);
+ }
+ return url;
+ }
+
+ public byte[] toByteEncoding()
+ {
+ byte[] encoding = _byteEncoding;
+ if(encoding == null)
+ {
+ int size = _exchangeClass.length() + 1 +
+ _exchangeName.length() + 1 +
+ (_destinationName == null ? 0 : _destinationName.length()) + 1 +
+ (_queueName == null ? 0 : _queueName.length()) + 1 +
+ 1;
+ encoding = new byte[size];
+ int pos = 0;
+
+ pos = _exchangeClass.writeToByteArray(encoding, pos);
+ pos = _exchangeName.writeToByteArray(encoding, pos);
+ if(_destinationName == null)
+ {
+ encoding[pos++] = (byte)0;
+ }
+ else
+ {
+ pos = _destinationName.writeToByteArray(encoding,pos);
+ }
+ if(_queueName == null)
+ {
+ encoding[pos++] = (byte)0;
+ }
+ else
+ {
+ pos = _queueName.writeToByteArray(encoding,pos);
+ }
+ byte options = 0;
+ if(_isDurable)
+ {
+ options |= IS_DURABLE_MASK;
+ }
+ if(_isExclusive)
+ {
+ options |= IS_EXCLUSIVE_MASK;
+ }
+ if(_isAutoDelete)
+ {
+ options |= IS_AUTODELETE_MASK;
+ }
+ encoding[pos] = options;
- if (_isDurable)
- {
- sb.append(BindingURL.OPTION_DURABLE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
- }
- if (_isExclusive)
- {
- sb.append(BindingURL.OPTION_EXCLUSIVE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
- }
+ _byteEncoding = encoding;
- if (_isAutoDelete)
- {
- sb.append(BindingURL.OPTION_AUTODELETE);
- sb.append("='true'");
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
-
- //remove the last char '?' if there is no options , ',' if there are.
- sb.deleteCharAt(sb.length() - 1);
-
- return sb.toString();
+ return encoding;
}
public boolean equals(Object o)
@@ -258,7 +348,7 @@
{
return false;
}
- if (_isExclusive != that._isExclusive)
+ /* if (_isExclusive != that._isExclusive)
{
return false;
}
@@ -266,6 +356,7 @@
{
return false;
}
+ */
return true;
}
@@ -279,8 +370,8 @@
{
result = 29 * result + _queueName.hashCode();
}
- result = result * (_isExclusive ? 13 : 7);
- result = result * (_isAutoDelete ? 13 : 7);
+// result = result * (_isExclusive ? 13 : 7);
+// result = result * (_isAutoDelete ? 13 : 7);
return result;
}
@@ -293,9 +384,55 @@
null); // factory location
}
+
+ public static Destination createDestination(byte[] byteEncodedDestination)
+ {
+ AMQShortString exchangeClass;
+ AMQShortString exchangeName;
+ AMQShortString destinationName;
+ AMQShortString queueName;
+ boolean isDurable;
+ boolean isExclusive;
+ boolean isAutoDelete;
+
+ int pos = 0;
+ exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= exchangeClass.length() + 1;
+ exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= exchangeName.length() + 1;
+ destinationName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= (destinationName == null ? 0 : destinationName.length()) + 1;
+ queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
+ pos+= (queueName == null ? 0 : queueName.length()) + 1;
+ int options = byteEncodedDestination[pos];
+ isDurable = (options & IS_DURABLE_MASK) != 0;
+ isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
+ isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
+
+ if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ return new AMQQueue(destinationName,queueName,isExclusive,isAutoDelete,isDurable);
+ }
+ else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ return new AMQTopic(destinationName,isAutoDelete,queueName,isDurable);
+ }
+ else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
+ {
+ return new AMQHeadersExchange(destinationName);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown Exchange Class:" + exchangeClass);
+ }
+
+
+
+ }
+
public static Destination createDestination(BindingURL binding)
{
- String type = binding.getExchangeClass();
+ AMQShortString type = binding.getExchangeClass();
if (type.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java Wed Feb 14 12:02:03 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.framing.AMQShortString;
/**
* A destination backed by a headers exchange
@@ -33,12 +34,17 @@
this(binding.getExchangeName());
}
- public AMQHeadersExchange(String queueName)
+ public AMQHeadersExchange(String name)
+ {
+ this(new AMQShortString(name));
+ }
+
+ public AMQHeadersExchange(AMQShortString queueName)
{
super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null);
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return getDestinationName();
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Wed Feb 14 12:02:03 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.Queue;
@@ -42,11 +43,27 @@
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
*/
- public AMQQueue(String name)
+ public AMQQueue(AMQShortString name)
{
this(name, false);
}
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+ false, queueName, false); }
+
+
+ /**
+ * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
+ * @param name the name of the queue
+ */
+ public AMQQueue(String name)
+ {
+ this(new AMQShortString(name), false);
+ }
+
+
/**
* Create a queue with a specified name.
*
@@ -56,10 +73,23 @@
*/
public AMQQueue(String name, boolean temporary)
{
+ this(new AMQShortString(name),temporary);
+ }
+
+
+ /**
+ * Create a queue with a specified name.
+ *
+ * @param name the destination name (used in the routing key)
+ * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted
+ * and exclusive
+ */
+ public AMQQueue(AMQShortString name, boolean temporary)
+ {
// queue name is set to null indicating that the broker assigns a name in the case of temporary queues
// temporary queues are typically used as response queues
- this(name, temporary?null:name, temporary, temporary);
- _isDurable = !temporary;
+ this(name, temporary?null:name, temporary, temporary, !temporary);
+
}
/**
@@ -69,16 +99,22 @@
* @param exclusive true if the queue should only permit a single consumer
* @param autoDelete true if the queue should be deleted automatically when the last consumers detaches
*/
- public AMQQueue(String destinationName, String queueName, boolean exclusive, boolean autoDelete)
+ public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete)
+ {
+ this(destinationName, queueName, exclusive, autoDelete, false);
+ }
+
+
+ public AMQQueue(AMQShortString destinationName, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive,
- autoDelete, queueName);
+ autoDelete, queueName, durable);
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
- return getQueueName();
+ return getAMQQueueName();
}
public boolean isNameRequired()
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Feb 14 12:02:03 2007
@@ -20,16 +20,6 @@
*/
package org.apache.qpid.client;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@@ -60,16 +50,15 @@
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverSupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
@@ -93,10 +82,19 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.handler.ExchangeBoundHandler;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
private static final Logger _logger = Logger.getLogger(AMQSession.class);
@@ -115,6 +113,10 @@
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+ private MessageListener _messageListener = null;
+
+ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
+
/**
* Used to reference durable subscribers so they requests for unsubscribe can be handled
* correctly. Note this only keeps a record of subscriptions which have been created
@@ -137,8 +139,11 @@
*/
private final FlowControllingBlockingQueue _queue;
+ // working
private ConcurrentLinkedQueue<Long> _unacknowledged = new ConcurrentLinkedQueue();
-
+ // merge-right.r501854
+ private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
+
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -151,7 +156,7 @@
/**
* Maps from consumer tag (String) to JMSMessageConsumer instance
*/
- private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>();
+ private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Maps from destination to count of JMSMessageConsumers
@@ -186,16 +191,27 @@
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
/**
+ * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+ */
+ private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
+
+ /**
+ * Presumably signals that the dispatcher has actually 'paused' (cf. _pausingDispatcher, which requests the pause) when setting a message listener on a consumer
+ */
+ private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
+
+ /**
* Set when recover is called. This is to handle the case where recover() is called by application code
* during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
*/
private boolean _inRecovery;
-
+ private boolean _hasMessageListeners;
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
+
private class Dispatcher extends Thread
{
public Dispatcher()
@@ -224,57 +240,18 @@
private void dispatchMessage(UnprocessedMessage message)
{
- if (message.contents != null)
- {
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.contentHeader.getDestination());
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getMessageHeaders().getDestination());
- if (consumer == null)
- {
- _logger.warn("Received a message from queue " + message.contentHeader.getDestination() + " without a handler - ignoring...");
- _logger.warn("Consumers that exist: " + _consumers);
- _logger.warn("Session hashcode: " + System.identityHashCode(this));
- }
- else
- {
-
- consumer.notifyMessage(message, _channelId);
-
- }
+ if (consumer == null)
+ {
+ _logger.warn("Received a message from queue " + message.getMessageHeaders().getDestination() + " without a handler - ignoring...");
+ _logger.warn("Consumers that exist: " + _consumers);
+ _logger.warn("Session hashcode: " + System.identityHashCode(this));
}
- /*else
+ else
{
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
- false,
- message.contentHeader,
- message.content);
-
- int errorCode = message.bounceBody.replyCode;
- String reason = message.bounceBody.replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
- }
- else if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
-
- }
- catch (Exception e)
- {
- _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
- }
- }*/
+ consumer.notifyMessage(message, _channelId);
+ }
}
public void stopDispatcher()
@@ -284,6 +261,8 @@
}
}
+
+
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
{
@@ -314,6 +293,8 @@
_defaultPrefetchHighMark = defaultPrefetchHighMark;
_defaultPrefetchLowMark = defaultPrefetchLowMark;
+ _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
+
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -374,136 +355,69 @@
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return new JMSBytesMessage();
}
}
public MapMessage createMapMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return new JMSMapMessage();
}
}
public javax.jms.Message createMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
+ return createBytesMessage();
}
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
+ return (ObjectMessage) new JMSObjectMessage();
}
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
- msg.setObject(object);
- return msg;
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
+ ObjectMessage msg = createObjectMessage();
+ msg.setObject(object);
+ return msg;
}
public StreamMessage createStreamMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE);
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
+ return new JMSStreamMessage();
}
}
public TextMessage createTextMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
- try
- {
- return (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
+ return new JMSTextMessage();
}
}
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
- msg.setText(text);
- return msg;
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
- }
+
+ TextMessage msg = createTextMessage();
+ msg.setText(text);
+ return msg;
}
public boolean getTransacted() throws JMSException
@@ -532,10 +446,11 @@
}
// Commits outstanding messages sent and outstanding acknowledgements.
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(_channelId, TxCommitBody.createMethodBody((byte)0, (byte)9), TxCommitOkBody.class);
+ _connection.getProtocolHandler().syncWrite(_channelId, TxCommitBody.createMethodBody(
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion()),
+ TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -545,17 +460,18 @@
}
}
+
public void rollback() throws JMSException
{
checkTransacted();
try
{
_unacknowledged.clear();
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(_channelId,
- TxRollbackBody.createMethodBody((byte)0, (byte)9), TxRollbackOkBody.class);
+ _connection.getProtocolHandler().syncWrite(_channelId, TxRollbackBody.createMethodBody(
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion()),
+ TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -565,9 +481,14 @@
public void close() throws JMSException
{
+ close(-1);
+ }
+
+ public void close(long timeout) throws JMSException
+ {
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -578,15 +499,14 @@
try
{
_connection.getProtocolHandler().closeSession(this);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQMethodBody methodBody = ChannelCloseBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- "JMS client closing channel"); // replyText
+ new AMQShortString("JMS client closing channel")); // replyText
_connection.getProtocolHandler().syncWrite(getChannelId(), methodBody, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -606,13 +526,31 @@
}
}
+ private AMQProtocolHandler getProtocolHandler()
+ {
+ return _connection.getProtocolHandler();
+ }
+
+
+ private byte getProtocolMinorVersion()
+ {
+ return getProtocolHandler().getProtocolMinorVersion();
+ }
+
+ private byte getProtocolMajorVersion()
+ {
+ return getProtocolHandler().getProtocolMajorVersion();
+ }
+
+
/**
* Close all producers or consumers. This is called either in the error case or when closing the session normally.
*
* @param amqe the exception, may be null to indicate no error has occurred
*/
- private void closeProducersAndConsumers(AMQException amqe)
+ private void closeProducersAndConsumers(AMQException amqe) throws JMSException
{
+ JMSException jmse = null;
try
{
closeProducers();
@@ -620,6 +558,7 @@
catch (JMSException e)
{
_logger.error("Error closing session: " + e, e);
+ jmse = e;
}
try
{
@@ -628,7 +567,19 @@
catch (JMSException e)
{
_logger.error("Error closing session: " + e, e);
+ if (jmse == null)
+ {
+ jmse = e;
+ }
+ }
+ finally
+ {
+ if (jmse != null)
+ {
+ throw jmse;
+ }
}
+
}
/**
@@ -637,9 +588,9 @@
*
* @param e the exception that caused this session to be closed. Null causes the
*/
- public void closed(Throwable e)
+ public void closed(Throwable e) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -777,14 +728,17 @@
consumer.clearUnackedMessages();
}
_unacknowledged.clear();
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- try {
+ try
+ {
_connection.getProtocolHandler().writeRequest(_channelId,
- MessageRecoverBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
+ MessageRecoverBody.createMethodBody(
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
false)); // requeue
- } catch (AMQException e) {
+ }
+ catch (AMQException e)
+ {
_logger.error("Error recovering",e);
JMSException ex = new JMSException("Error Recovering");
ex.initCause(e);
@@ -820,13 +774,37 @@
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
- throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+ return _messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
checkNotClosed();
- throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+
+ if (!isStopped())
+ {
+ throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+ }
+
+ // We are stopped
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+ BasicMessageConsumer consumer = i.next();
+
+ if (consumer.isReceiving())
+ {
+ throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+ }
+ }
+
+ _messageListener = listener;
+
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+ i.next().setMessageListener(_messageListener);
+ }
+
+
}
public void run()
@@ -858,6 +836,12 @@
return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
+ // Test purposes only - used for testing refs, which cannot be done using JMS interfaces
+ public BasicMessageProducer createBasicProducer(Topic destination) throws JMSException
+ {
+ return (BasicMessageProducer)createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ }
+
private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory,
boolean immediate)
throws JMSException
@@ -883,7 +867,10 @@
registerProducer(producerId, producer);
return producer;
}
- catch (AMQException e) { throw new JMSException(e.toString()); }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
}
}.execute(_connection);
}
@@ -964,8 +951,8 @@
}
public MessageConsumer createBrowserConsumer(Destination destination,
- String messageSelector,
- boolean noLocal)
+ String messageSelector,
+ boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
@@ -1039,6 +1026,7 @@
{
checkTemporaryDestination(destination);
+
return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
{
public Object operation() throws JMSException
@@ -1047,7 +1035,7 @@
AMQDestination amqd = (AMQDestination) destination;
- final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
// TODO: construct the rawSelector from the selector string if rawSelector == null
final FieldTable ft = FieldTableFactory.newFieldTable();
//if (rawSelector != null)
@@ -1061,6 +1049,11 @@
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
_acknowledgeMode, noConsume, autoClose);
+ if (_messageListener != null)
+ {
+ consumer.setMessageListener(_messageListener);
+ }
+
try
{
registerConsumer(consumer, false);
@@ -1074,11 +1067,14 @@
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
+
+ //todo remove
+ e.printStackTrace();
ex.setLinkedException(e);
throw ex;
}
- synchronized(destination)
+ synchronized (destination)
{
_destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
_destinationConsumerCount.get(destination).incrementAndGet();
@@ -1118,18 +1114,17 @@
}
- public void declareExchange(String name, String type) throws AMQException
+ public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
{
- declareExchange(name, type, _connection.getProtocolHandler());
+ declareExchange(name, type, getProtocolHandler());
}
- public void declareExchangeSynch(String name, String type) throws AMQException
+ public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = ExchangeDeclareBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
null, // arguments
false, // autoDelete
false, // durable
@@ -1142,18 +1137,17 @@
_connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeDeclareOkBody.class);
}
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)throws AMQException
+ private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
}
- private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) throws AMQException
+ private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = ExchangeDeclareBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
null, // arguments
false, // autoDelete
false, // durable
@@ -1175,7 +1169,7 @@
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
* @throws AMQException
*/
- private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+ private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
{
// For queues (but not topics) we generate the name in the client rather than the
// server. This allows the name to be reused on failover if required. In general,
@@ -1185,38 +1179,35 @@
amqd.setQueueName(protocolHandler.generateQueueName());
}
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = QueueDeclareBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
null, // arguments
amqd.isAutoDelete(), // autoDelete
amqd.isDurable(), // durable
amqd.isExclusive(), // exclusive
true, // nowait
false, // passive
- amqd.getQueueName(), // queue
+ amqd.getAMQQueueName(), // queue
0); // ticket
protocolHandler.writeRequest(_channelId, methodBody);
- return amqd.getQueueName();
+ return amqd.getAMQQueueName();
}
- private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+ private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = QueueBindBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
ft, // arguments
amqd.getExchangeName(), // exchange
true, // nowait
queueName, // queue
amqd.getRoutingKey(), // routingKey
0); // ticket
-
protocolHandler.writeRequest(_channelId, methodBody);
}
@@ -1226,23 +1217,23 @@
* @param queueName
* @return the consumer tag generated by the broker
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
+ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
- String tag = Integer.toString(_nextTag++);
+ AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
FieldTable arguments = FieldTableFactory.newFieldTable();
if (messageSelector != null && !messageSelector.equals(""))
{
arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
}
- if(consumer.isAutoClose())
+ if (consumer.isAutoClose())
{
arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
}
- if(consumer.isNoConsume())
+ if (consumer.isNoConsume())
{
arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
@@ -1253,29 +1244,17 @@
try
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- /*AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
- (byte)0, (byte)9, // AMQP version (major, minor)
- arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- 0); // ticket */
-
AMQMethodBody methodBody = MessageConsumeBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- arguments, // arguments in the form of a field table
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- queueName, // queue
- 0); // ticket */
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ arguments, // arguments in the form of a field table
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ queueName, // queue
+ 0); // ticket */
/*
if (nowait)
{
@@ -1364,7 +1343,7 @@
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(topicName);
+ return new AMQTopic(new AMQShortString(topicName));
}
else
{
@@ -1434,12 +1413,21 @@
}
else
{
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getDestinationName();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getQueueName()) &&
- !isQueueBound(dest.getQueueName(), topic.getTopicName()))
+ if (isQueueBound(dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getAMQQueueName(), topicName))
{
- deleteQueue(dest.getQueueName());
+ deleteQueue(dest.getAMQQueueName());
}
}
@@ -1451,15 +1439,14 @@
return subscriber;
}
- void deleteQueue(String queueName) throws JMSException
+ void deleteQueue(AMQShortString queueName) throws JMSException
{
try
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = QueueDeleteBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
false, // ifEmpty
false, // ifUnused
true, // nowait
@@ -1504,7 +1491,7 @@
{
checkNotClosed();
checkValidQueue(queue);
- return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
+ return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1543,18 +1530,17 @@
}
}
- boolean isQueueBound(String queueName) throws JMSException
+ boolean isQueueBound(AMQShortString queueName) throws JMSException
{
return isQueueBound(queueName, null);
}
- boolean isQueueBound(String queueName, String routingKey) throws JMSException
+ boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = ExchangeBoundBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
queueName, // queue
routingKey); // routingKey
@@ -1568,7 +1554,7 @@
throw new JMSAMQException(e);
}
ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
- return (responseBody.replyCode == ExchangeBoundHandler.OK);
+ return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency
}
private void checkTransacted() throws JMSException
@@ -1600,7 +1586,7 @@
_logger.debug("Message received in session with channel id " + _channelId);
}
- _unacknowledged.offer(message.deliveryTag);
+ _unacknowledged.offer(message.getDeliveryTag());
_queue.add(message);
}
@@ -1616,10 +1602,8 @@
*/
public synchronized void acknowledgeMessage(long requestId, boolean multiple) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9); // AMQP version (major, minor)
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody(getProtocolMajorVersion(), getProtocolMinorVersion()); // AMQP version (major, minor)
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for request ID " + requestId + " on channel " + _channelId);
@@ -1659,7 +1643,7 @@
void start()
{
- if (_dispatcher != null)
+ if (_startedAtLeastOnce.getAndSet(true))
{
try{
//then we stopped this and are restarting, so signal server to resume delivery
@@ -1668,9 +1652,31 @@
_logger.error("Error Un Suspending Channel", e);
}
}
- _dispatcher = new Dispatcher();
- _dispatcher.setDaemon(true);
- _dispatcher.start();
+
+ if(hasMessageListeners() && _dispatcher == null)
+ {
+ startDistpatcherIfNecessary();
+ }
+ }
+
+ private boolean hasMessageListeners()
+ {
+ return _hasMessageListeners;
+ }
+
+ void setHasMessageListeners()
+ {
+ _hasMessageListeners = true;
+ }
+
+ synchronized void startDistpatcherIfNecessary()
+ {
+ if(_dispatcher == null)
+ {
+ _dispatcher = new Dispatcher();
+ _dispatcher.setDaemon(true);
+ _dispatcher.start();
+ }
}
void stop()
@@ -1682,7 +1688,7 @@
_logger.error("Error Suspending Channel", e);
}
-//stop the dispatcher thread
+ //stop the dispatcher thread
_stopped.set(true);
}
@@ -1701,11 +1707,11 @@
{
AMQDestination amqd = consumer.getDestination();
- AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+ AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler);
- String queueName = declareQueue(amqd, protocolHandler);
+ AMQShortString queueName = declareQueue(amqd, protocolHandler);
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
@@ -1727,19 +1733,21 @@
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
- _consumers.remove(consumer.getConsumerTag());
- String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if (subscriptionName != null)
+ if (_consumers.remove(consumer.getConsumerTag()) != null)
{
- _subscriptions.remove(subscriptionName);
- }
+ String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+ if (subscriptionName != null)
+ {
+ _subscriptions.remove(subscriptionName);
+ }
- Destination dest = consumer.getDestination();
- synchronized(dest)
- {
- if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ Destination dest = consumer.getDestination();
+ synchronized (dest)
{
- _destinationConsumerCount.remove(dest);
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ {
+ _destinationConsumerCount.remove(dest);
+ }
}
}
}
@@ -1773,7 +1781,7 @@
private void resubscribeProducers() throws AMQException
{
ArrayList producers = new ArrayList(_producers.values());
- _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: remove
+ _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
BasicMessageProducer producer = (BasicMessageProducer) it.next();
@@ -1796,11 +1804,10 @@
private void suspendChannel() throws AMQException
{
_logger.warn("Suspending channel");
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = ChannelFlowBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
false); // active
_connection.getProtocolHandler().writeRequest(_channelId, methodBody);
}
@@ -1808,19 +1815,18 @@
private void unsuspendChannel() throws AMQException
{
_logger.warn("Unsuspending channel");
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = ChannelFlowBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ getProtocolMajorVersion(), // AMQP major version
+ getProtocolMinorVersion(), // AMQP minor version
true); // active
_connection.getProtocolHandler().writeRequest(_channelId, methodBody);
}
- public void confirmConsumerCancelled(String consumerTag)
+ public void confirmConsumerCancelled(AMQShortString consumerTag)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
- if((consumer != null) && (consumer.isAutoClose()))
+ if ((consumer != null) && (consumer.isAutoClose()))
{
consumer.closeWhenNoMessages(true);
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
@@ -38,7 +40,7 @@
*/
public AMQTemporaryQueue(AMQSession session)
{
- super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+ super(new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())), true);
_session = session;
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Wed Feb 14 12:02:03 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.Topic;
@@ -40,10 +41,21 @@
public AMQTopic(String name)
{
+ this(new AMQShortString(name));
+ }
+
+ public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
+ }
+
+
+ public AMQTopic(AMQShortString name)
+ {
this(name, true, null, false);
}
- public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+ public AMQTopic(AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
queueName, isDurable);
@@ -56,17 +68,17 @@
true);
}
- public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
+ public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
- return connection.getClientID() + ":" + subscriptionName;
+ return new AMQShortString(connection.getClientID() + ":" + subscriptionName);
}
public String getTopicName() throws JMSException
{
- return super.getDestinationName();
+ return super.getDestinationName().toString();
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return getDestinationName();
}