You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/15 18:28:54 UTC
svn commit: r547730 [4/9] - in /incubator/qpid/trunk/qpid: ./ java/
java/broker/ java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/
java/broker/src/main/java/org/apache/qpid/server/queue/ ...
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.protocol.AMQProtocolSession;
@@ -34,9 +33,12 @@
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class ConnectionTuneMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class);
+ private static final Logger _logger = LoggerFactory.getLogger(ConnectionTuneMethodHandler.class);
private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler();
@@ -46,10 +48,10 @@
}
protected ConnectionTuneMethodHandler()
- {
- }
+ { }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ throws AMQException
{
_logger.debug("ConnectionTune frame received");
ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
@@ -60,38 +62,36 @@
params = new ConnectionTuneParameters();
}
- params.setFrameMax(frame.frameMax);
+ params.setFrameMax(frame.frameMax);
params.setChannelMax(frame.channelMax);
params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
protocolSession.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params,frame.getMajor(), frame.getMinor()));
+ protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params, frame.getMajor(), frame.getMinor()));
String host = protocolSession.getAMQConnection().getVirtualHost();
AMQShortString virtualHost = new AMQShortString("/" + host);
-
- protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true,frame.getMajor(), frame.getMinor()));
+ protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true, frame.getMajor(),
+ frame.getMinor()));
}
- protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
+ protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities,
+ boolean insist, byte major, byte minor)
{
// Be aware of possible changes to parameter order as versions change.
- return ConnectionOpenBody.createAMQFrame(channel,
- major, minor, // AMQP version (major, minor)
- capabilities, // capabilities
- insist, // insist
- path); // virtualHost
+ return ConnectionOpenBody.createAMQFrame(channel, major, minor, // AMQP version (major, minor)
+ capabilities, // capabilities
+ insist, // insist
+ path); // virtualHost
}
protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor)
{
// Be aware of possible changes to parameter order as versions change.
- return ConnectionTuneOkBody.createAMQFrame(channel,
- major, minor,
- params.getChannelMax(), // channelMax
- params.getFrameMax(), // frameMax
- params.getHeartbeat()); // heartbeat
+ return ConnectionTuneOkBody.createAMQFrame(channel, major, minor, params.getChannelMax(), // channelMax
+ params.getFrameMax(), // frameMax
+ params.getHeartbeat()); // heartbeat
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java Fri Jun 15 09:28:46 2007
@@ -17,7 +17,6 @@
*/
package org.apache.qpid.client.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
@@ -25,32 +24,33 @@
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* @author Apache Software Foundation
*/
public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class);
- private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
-
- public static ExchangeBoundOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- private ExchangeBoundOkMethodHandler()
- {
- }
+ private static final Logger _logger = LoggerFactory.getLogger(ExchangeBoundOkMethodHandler.class);
+ private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
+ public static ExchangeBoundOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundOkMethodHandler()
+ { }
+
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
- _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " +
- body.replyText);
- }
- }
+ _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: "
+ + body.replyText);
+ }
+ }
}
-
-
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Fri Jun 15 09:28:46 2007
@@ -17,7 +17,6 @@
*/
package org.apache.qpid.client.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
@@ -25,31 +24,32 @@
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* @author Apache Software Foundation
*/
public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
- private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
-
- public static QueueDeleteOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- private QueueDeleteOkMethodHandler()
- {
- }
+ private static final Logger _logger = LoggerFactory.getLogger(QueueDeleteOkMethodHandler.class);
+ private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
+ public static QueueDeleteOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueDeleteOkMethodHandler()
+ { }
+
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
_logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
- }
- }
+ }
+ }
}
-
-
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Fri Jun 15 09:28:46 2007
@@ -20,18 +20,10 @@
*/
package org.apache.qpid.client.message;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.UUID;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
import org.apache.commons.collections.map.ReferenceMap;
+
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.*;
import org.apache.qpid.framing.AMQShortString;
@@ -41,6 +33,16 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
+
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
@@ -70,35 +72,32 @@
_changedData = (data == null);
_headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
- _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
+ _strictAMQP =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
this(contentHeader, deliveryTag);
Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
- int contentType = (type == null) ? AMQDestination.UNKNOWN_TYPE : type.intValue();
AMQDestination dest;
- switch (contentType)
+ if (AMQDestination.QUEUE_TYPE.equals(type))
{
-
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
-
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
-
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
- break;
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ }
+ else if (AMQDestination.TOPIC_TYPE.equals(type))
+ {
+ dest = new AMQTopic(exchange, routingKey, null);
}
- //Destination dest = AMQDestination.createDestination(url);
+ else
+ {
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ }
+ // Destination dest = AMQDestination.createDestination(url);
setJMSDestination(dest);
_data = data;
@@ -202,7 +201,7 @@
if (!(destination instanceof AMQDestination))
{
throw new IllegalArgumentException(
- "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
}
final AMQDestination amqd = (AMQDestination) destination;
@@ -613,7 +612,6 @@
{
getContentHeaderProperties().setHeaders(messageProperties);
}
-
public JMSHeaderAdapter getJmsHeaders()
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,51 +20,53 @@
*/
package org.apache.qpid.client.message;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.jms.JMSException;
-
-import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+
+import java.util.Iterator;
+import java.util.List;
+
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
- private static final Logger _logger = Logger.getLogger(AbstractJMSMessageFactory.class);
-
+ private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
- protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException;
+ protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
+ AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException;
- protected AbstractJMSMessage createMessageWithBody(long messageNbr,
- ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies) throws AMQException
+ protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
// we optimise the non-fragmented case to avoid copying
- if (bodies != null && bodies.size() == 1)
+ if ((bodies != null) && (bodies.size() == 1))
{
- if(debug)
+ if (debug)
{
- _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize +")");
+ _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")");
}
- data = ((ContentBody)bodies.get(0)).payload;
+
+ data = ((ContentBody) bodies.get(0)).payload;
}
else if (bodies != null)
{
- if(debug)
+ if (debug)
{
- _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + ")");
+ _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize
+ + ")");
}
- data = ByteBuffer.allocate((int)contentHeader.bodySize); // XXX: Is cast a problem?
+
+ data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem?
final Iterator it = bodies.iterator();
while (it.hasNext())
{
@@ -72,27 +74,29 @@
data.put(cb.payload);
cb.payload.release();
}
+
data.flip();
}
else // bodies == null
{
data = ByteBuffer.allocate(0);
}
- if(debug)
+
+ if (debug)
{
- _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
+ _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining="
+ + data.remaining());
}
return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
}
- public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered,
- ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies) throws JMSException, AMQException
+ public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException
{
final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
+
return msg;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Fri Jun 15 09:28:46 2007
@@ -14,36 +14,38 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.client.message;
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
import java.nio.charset.CharacterCodingException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-
public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
{
- private static final Logger _logger = Logger.getLogger(JMSMapMessage.class);
-
+ private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
public static final String MIME_TYPE = "jms/map-message";
private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
- private Map<String,Object> _map = new HashMap<String, Object>();
+ private Map<String, Object> _map = new HashMap<String, Object>();
public JMSMapMessage() throws JMSException
{
@@ -56,24 +58,22 @@
populateMapFromData();
}
-
- JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
try
{
populateMapFromData();
- }
+ }
catch (JMSException je)
{
throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je);
-
+
}
}
-
public String toBodyString() throws JMSException
{
return _map.toString();
@@ -84,16 +84,14 @@
return MIME_TYPE_SHORT_STRING;
}
-
public ByteBuffer getData()
{
- //What if _data is null?
+ // What if _data is null?
writeMapToData();
+
return super.getData();
}
-
-
@Override
public void clearBodyImpl() throws JMSException
{
@@ -105,18 +103,18 @@
{
Object value = _map.get(propName);
- if(value instanceof Boolean)
+ if (value instanceof Boolean)
{
- return ((Boolean)value).booleanValue();
+ return ((Boolean) value).booleanValue();
}
- else if((value instanceof String) || (value == null))
+ else if ((value instanceof String) || (value == null))
{
- return Boolean.valueOf((String)value);
+ return Boolean.valueOf((String) value);
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to boolean.");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to boolean.");
}
}
@@ -125,18 +123,18 @@
{
Object value = _map.get(propName);
- if(value instanceof Byte)
+ if (value instanceof Byte)
{
- return ((Byte)value).byteValue();
+ return ((Byte) value).byteValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Byte.valueOf((String)value).byteValue();
+ return Byte.valueOf((String) value).byteValue();
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to byte.");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to byte.");
}
}
@@ -144,51 +142,50 @@
{
Object value = _map.get(propName);
- if(value instanceof Short)
+ if (value instanceof Short)
{
- return ((Short)value).shortValue();
+ return ((Short) value).shortValue();
}
- else if(value instanceof Byte)
+ else if (value instanceof Byte)
{
- return ((Byte)value).shortValue();
+ return ((Byte) value).shortValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Short.valueOf((String)value).shortValue();
+ return Short.valueOf((String) value).shortValue();
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to short.");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to short.");
}
}
-
public int getInt(String propName) throws JMSException
{
Object value = _map.get(propName);
- if(value instanceof Integer)
+ if (value instanceof Integer)
{
- return ((Integer)value).intValue();
+ return ((Integer) value).intValue();
}
- else if(value instanceof Short)
+ else if (value instanceof Short)
{
- return ((Short)value).intValue();
+ return ((Short) value).intValue();
}
- else if(value instanceof Byte)
+ else if (value instanceof Byte)
{
- return ((Byte)value).intValue();
+ return ((Byte) value).intValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Integer.valueOf((String)value).intValue();
+ return Integer.valueOf((String) value).intValue();
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to int.");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to int.");
}
}
@@ -197,30 +194,32 @@
{
Object value = _map.get(propName);
- if(value instanceof Long)
+ if (value instanceof Long)
{
- return ((Long)value).longValue();
+ return ((Long) value).longValue();
}
- else if(value instanceof Integer)
+ else if (value instanceof Integer)
{
- return ((Integer)value).longValue();
+ return ((Integer) value).longValue();
}
- if(value instanceof Short)
+
+ if (value instanceof Short)
{
- return ((Short)value).longValue();
+ return ((Short) value).longValue();
}
- if(value instanceof Byte)
+
+ if (value instanceof Byte)
{
- return ((Byte)value).longValue();
+ return ((Byte) value).longValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Long.valueOf((String)value).longValue();
+ return Long.valueOf((String) value).longValue();
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to long.");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to long.");
}
}
@@ -229,45 +228,43 @@
{
Object value = _map.get(propName);
- if(!_map.containsKey(propName))
+ if (!_map.containsKey(propName))
{
throw new MessageFormatException("Property " + propName + " not present");
}
- else if(value instanceof Character)
+ else if (value instanceof Character)
{
- return ((Character)value).charValue();
+ return ((Character) value).charValue();
}
else if (value == null)
{
- throw new NullPointerException("Property " + propName + " has null value and therefore cannot " +
- "be converted to char.");
+ throw new NullPointerException("Property " + propName + " has null value and therefore cannot "
+ + "be converted to char.");
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to boolan.");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to boolan.");
}
}
-
-
public float getFloat(String propName) throws JMSException
{
Object value = _map.get(propName);
- if(value instanceof Float)
+ if (value instanceof Float)
{
- return ((Float)value).floatValue();
+ return ((Float) value).floatValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Float.valueOf((String)value).floatValue();
+ return Float.valueOf((String) value).floatValue();
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to float.");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to float.");
}
}
@@ -275,22 +272,22 @@
{
Object value = _map.get(propName);
- if(value instanceof Double)
+ if (value instanceof Double)
{
- return ((Double)value).doubleValue();
+ return ((Double) value).doubleValue();
}
- else if(value instanceof Float)
+ else if (value instanceof Float)
{
- return ((Float)value).doubleValue();
+ return ((Float) value).doubleValue();
}
- else if((value instanceof String) || (value==null))
+ else if ((value instanceof String) || (value == null))
{
- return Double.valueOf((String)value).doubleValue();
+ return Double.valueOf((String) value).doubleValue();
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to double.");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to double.");
}
}
@@ -298,14 +295,13 @@
{
Object value = _map.get(propName);
- if((value instanceof String) || (value == null))
+ if ((value instanceof String) || (value == null))
{
return (String) value;
}
- else if(value instanceof byte[])
+ else if (value instanceof byte[])
{
- throw new MessageFormatException("Property " + propName + " of type byte[] " +
- "cannot be converted to String.");
+ throw new MessageFormatException("Property " + propName + " of type byte[] " + "cannot be converted to String.");
}
else
{
@@ -318,18 +314,18 @@
{
Object value = _map.get(propName);
- if(!_map.containsKey(propName))
+ if (!_map.containsKey(propName))
{
- throw new MessageFormatException("Property " + propName + " not present");
+ throw new MessageFormatException("Property " + propName + " not present");
}
- else if((value instanceof byte[]) || (value == null))
+ else if ((value instanceof byte[]) || (value == null))
{
- return (byte[])value;
+ return (byte[]) value;
}
else
{
- throw new MessageFormatException("Property " + propName + " of type " +
- value.getClass().getName() + " cannot be converted to byte[].");
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to byte[].");
}
}
@@ -343,7 +339,6 @@
return Collections.enumeration(_map.keySet());
}
-
public void setBoolean(String propName, boolean b) throws JMSException
{
checkWritable();
@@ -416,46 +411,38 @@
public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException
{
- if((offset == 0) && (length == bytes.length))
+ if ((offset == 0) && (length == bytes.length))
{
- setBytes(propName,bytes);
+ setBytes(propName, bytes);
}
else
{
byte[] newBytes = new byte[length];
- System.arraycopy(bytes,offset,newBytes,0,length);
- setBytes(propName,newBytes);
+ System.arraycopy(bytes, offset, newBytes, 0, length);
+ setBytes(propName, newBytes);
}
}
public void setObject(String propName, Object value) throws JMSException
- {
+ {
checkWritable();
checkPropertyName(propName);
- if(value instanceof Boolean
- || value instanceof Byte
- || value instanceof Short
- || value instanceof Integer
- || value instanceof Long
- || value instanceof Character
- || value instanceof Float
- || value instanceof Double
- || value instanceof String
- || value instanceof byte[]
- || value == null)
+ if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
+ || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
+ || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
{
_map.put(propName, value);
}
else
{
- throw new MessageFormatException("Cannot set property " + propName + " to value " + value +
- "of type " + value.getClass().getName() + ".");
+ throw new MessageFormatException("Cannot set property " + propName + " to value " + value + "of type "
+ + value.getClass().getName() + ".");
}
}
private void checkPropertyName(String propName)
{
- if(propName == null || propName.equals(""))
+ if ((propName == null) || propName.equals(""))
{
throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
}
@@ -466,19 +453,18 @@
return _map.containsKey(propName);
}
-
private void populateMapFromData() throws JMSException
{
- if(_data != null)
+ if (_data != null)
{
_data.rewind();
final int entries = readIntImpl();
- for(int i = 0; i < entries; i++)
+ for (int i = 0; i < entries; i++)
{
String propName = readStringImpl();
Object value = readObject();
- _map.put(propName,value);
+ _map.put(propName, value);
}
}
else
@@ -492,7 +478,7 @@
allocateInitialBuffer();
final int size = _map.size();
writeIntImpl(size);
- for(Map.Entry<String, Object> entry : _map.entrySet())
+ for (Map.Entry<String, Object> entry : _map.entrySet())
{
try
{
@@ -500,10 +486,10 @@
}
catch (CharacterCodingException e)
{
- throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(),e);
-
+ throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e);
}
+
try
{
writeObject(entry.getValue());
@@ -511,14 +497,11 @@
catch (JMSException e)
{
Object value = entry.getValue();
- throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() +
- " value : " + value + " (type: " + value.getClass().getName() + ").",e);
+ throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value
+ + " (type: " + value.getClass().getName() + ").", e);
}
}
}
-
-
-
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.client.message;
-import java.util.Enumeration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@@ -31,7 +32,7 @@
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-import org.apache.log4j.Logger;
+import java.util.Enumeration;
public class MessageConverter
{
@@ -39,7 +40,7 @@
/**
* Log4J logger
*/
- protected final Logger _logger = Logger.getLogger(getClass());
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
/**
* AbstractJMSMessage which will hold the converted message
@@ -81,6 +82,7 @@
String name = (String) mapNames.nextElement();
nativeMessage.setObject(name, message.getObject(name));
}
+
_newMessage = (AbstractJMSMessage) nativeMessage;
setMessageProperties(message);
}
@@ -121,15 +123,16 @@
}
catch (MessageEOFException e)
{
- //we're at the end so don't mind the exception
+ // we're at the end so don't mind the exception
}
+
_newMessage = (AbstractJMSMessage) nativeMessage;
setMessageProperties(message);
}
public MessageConverter(Message message) throws JMSException
{
- //Send a message with just properties.
+ // Send a message with just properties.
// Throwing away content
BytesMessage nativeMessage = new JMSBytesMessage();
@@ -160,7 +163,7 @@
while (propertyNames.hasMoreElements())
{
String propertyName = String.valueOf(propertyNames.nextElement());
- //TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+ // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
if (!propertyName.startsWith("JMSX_"))
{
Object value = message.getObjectProperty(propertyName);
@@ -190,6 +193,7 @@
{
_newMessage.setJMSReplyTo(message.getJMSReplyTo());
}
+
_newMessage.setJMSType(message.getJMSType());
_newMessage.setJMSCorrelationID(message.getJMSCorrelationID());
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
@@ -34,10 +30,10 @@
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
-import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.SSLConfiguration;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.state.AMQState;
@@ -60,9 +56,67 @@
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+/**
+ * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
+ * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
+ * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the event
+ * on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, expressed in
+ * terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in terms of "message
+ * received" and so on.
+ *
+ * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
+ * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
+ * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
+ * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
+ *
+ * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
+ * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
+ * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
+ * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA
+ * sessions in the event of failover. See below for more information about this.
+ *
+ * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
+ * attributes. A more convenient, type-safe, container for session data is provided in the form of {@link
+ * AMQProtocolSession}.
+ *
+ * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
+ * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe
+ * wrapper as described above). This event handler is different, because dealing with failover complicates things. To
+ * the end client of an AMQConnection, a failed over connection is still handled through the same connection instance,
+ * but behind the scenes a new transport connection, and MINA session will have been created. The MINA session object
+ * cannot be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the
+ * old connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
+ * and the protocol session data is held outside of the MINA IOSession.
+ *
+ * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through. The
+ * filter chain is set up as a stack of event handlers that perform the following functions (working upwards from the
+ * network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, optionally
+ * handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Create the
+ * filter chain to filter this handler's events. <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link
+ * SSLFilter}, {@link ReadWriteThreadModel}.
+ *
+ * <tr><td> Maintain fail-over state. <tr><td> </table>
+ *
+ * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec filter
+ * before it mean not doing the read/write asynchronously but in the main filter thread?
+ * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of AMQProtocolSession
+ * and AMQConnection will be the same, so if there is high cohesion between them, they could be merged, although there
+ * is sense in keeping the session model separate. Will clarify things by having data held per protocol handler, per
+ * protocol session, per network connection, per channel, in separate classes, so that lifecycles of the fields match
+ * lifecycles of their containing objects.
+ */
public class AMQProtocolHandler extends IoHandlerAdapter
{
+ /** Used for debugging. */
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
/**
@@ -74,8 +128,10 @@
/** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
private volatile AMQProtocolSession _protocolSession;
+ /** Holds the state of the protocol session. */
private AMQStateManager _stateManager = new AMQStateManager();
+ /** Holds the method listeners. */
private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
/**
@@ -91,15 +147,31 @@
*/
private FailoverState _failoverState = FailoverState.NOT_STARTED;
+ /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
+ /** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+ /**
+ * Creates a new protocol handler, associated with the specified client connection instance.
+ *
+ * @param con The client connection that this is the event handler for.
+ */
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
}
+ /**
+ * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
+ * session, which filters the events handled by this handler. The filter chain consists of, handing off events to an
+ * asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
+ *
+ * @param session The MINA session.
+ *
+ * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
+ */
public void sessionCreated(IoSession session) throws Exception
{
_logger.debug("Protocol session created for session " + System.identityHashCode(session));
@@ -119,16 +191,15 @@
if (_connection.getSSLConfiguration() != null)
{
SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ SSLContextFactory sslFactory =
+ new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
sslFilter.setUseClientMode(true);
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
-
try
{
-
ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
@@ -142,35 +213,38 @@
_protocolSession.init();
}
- public void sessionOpened(IoSession session) throws Exception
- {
- //System.setProperty("foo", "bar");
- }
-
/**
- * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
- * sessionClosed() depending on whether we were trying to send data at the time of failure.
+ * Called when the network connection is closed. This can happen, either because the client explicitly requested
+ * that the connection be closed, in which case nothing is done, or because the connection died. In the case where
+ * the connection died, an attempt to failover automatically to a new connection may be started. The failover
+ * process will be started, provided that it is the client's policy to allow failover, and provided that a failover
+ * has not already been started or failed.
+ *
+ * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught}
+ * may be called first followed by this method. This depends on whether the client was trying to send data at the
+ * time of the failure.
*
- * @param session
+ * @param session The MINA session.
*
- * @throws Exception
+ * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
+ * not otherwise? The above comment doesn't make that clear.
*/
- public void sessionClosed(IoSession session) throws Exception
+ public void sessionClosed(IoSession session)
{
if (_connection.isClosed())
{
- _logger.info("Session closed called by client");
+ _logger.debug("Session closed called by client");
}
else
{
- _logger.info("Session closed called with failover state currently " + _failoverState);
+ _logger.debug("Session closed called with failover state currently " + _failoverState);
- //reconnetablility was introduced here so as not to disturb the client as they have made their intentions
+ // reconnectability was introduced here so as not to disturb the client as they have made their intentions
// known through the policy settings.
if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
{
- _logger.info("FAILOVER STARTING");
+ _logger.debug("FAILOVER STARTING");
if (_failoverState == FailoverState.NOT_STARTED)
{
_failoverState = FailoverState.IN_PROGRESS;
@@ -178,12 +252,12 @@
}
else
{
- _logger.info("Not starting failover as state currently " + _failoverState);
+ _logger.debug("Not starting failover as state currently " + _failoverState);
}
}
else
{
- _logger.info("Failover not allowed by policy.");
+ _logger.debug("Failover not allowed by policy."); // or already in progress?
if (_logger.isDebugEnabled())
{
@@ -199,12 +273,12 @@
}
else
{
- _logger.info("sessionClose() failover in progress");
+ _logger.debug("sessionClose() failover in progress");
}
}
}
- _logger.info("Protocol Session [" + this + "] closed");
+ _logger.debug("Protocol Session [" + this + "] closed");
}
/** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -223,25 +297,32 @@
_logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
if (IdleStatus.WRITER_IDLE.equals(status))
{
- //write heartbeat frame:
+ // write heartbeat frame:
_logger.debug("Sent heartbeat");
session.write(HeartbeatBody.FRAME);
HeartbeatDiagnostics.sent();
}
else if (IdleStatus.READER_IDLE.equals(status))
{
- //failover:
+ // failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
session.close();
}
}
- public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ /**
+ * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
+ * IOException, MINA will close the connection automatically.
+ *
+ * @param session The MINA session.
+ * @param cause The exception that triggered this event.
+ */
+ public void exceptionCaught(IoSession session, Throwable cause)
{
if (_failoverState == FailoverState.NOT_STARTED)
{
- //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
+ // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
if (cause instanceof AMQConnectionClosedException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
@@ -250,8 +331,8 @@
sessionClosed(session);
}
- //FIXME Need to correctly handle other exceptions. Things like ...
-// if (cause instanceof AMQChannelClosedException)
+ // FIXME Need to correctly handle other exceptions. Things like ...
+ // if (cause instanceof AMQChannelClosedException)
// which will cause the JMSSession to end due to a channel close and so that Session needs
// to be removed from the map so we can correctly still call close without an exception when trying to close
// the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception
@@ -261,6 +342,7 @@
else if (_failoverState == FailoverState.FAILED)
{
_logger.error("Exception caught by protocol handler: " + cause, cause);
+
// we notify the state manager of the error in case we have any clients waiting on a state
// change. Those "waiters" will be interrupted and can handle the exception
AMQException amqe = new AMQException(null, "Protocol handler error: " + cause, cause);
@@ -297,7 +379,7 @@
final boolean debug = _logger.isDebugEnabled();
final long msgNumber = ++_messageReceivedCount;
- if (debug && (msgNumber % 1000 == 0))
+ if (debug && ((msgNumber % 1000) == 0))
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
@@ -317,7 +399,8 @@
_logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
}
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
try
{
@@ -331,10 +414,16 @@
final AMQMethodListener listener = (AMQMethodListener) it.next();
wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners, null);
+ }
}
+
if (!wasAnyoneInterested)
{
- throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners, null);
+ throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners, null);
}
}
catch (AMQException e)
@@ -349,20 +438,20 @@
listener.error(e);
}
}
+
exceptionCaught(session, e);
}
+
break;
case ContentHeaderBody.TYPE:
- _protocolSession.messageContentHeaderReceived(frame.getChannel(),
- (ContentHeaderBody) bodyFrame);
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
break;
case ContentBody.TYPE:
- _protocolSession.messageContentBodyReceived(frame.getChannel(),
- (ContentBody) bodyFrame);
+ _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
break;
case HeartbeatBody.TYPE:
@@ -371,11 +460,13 @@
{
_logger.debug("Received heartbeat");
}
+
break;
default:
}
+
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -387,10 +478,11 @@
final boolean debug = _logger.isDebugEnabled();
- if (debug && (sentMessages % 1000 == 0))
+ if (debug && ((sentMessages % 1000) == 0))
{
_logger.debug("Sent " + _messagesOut + " protocol messages");
}
+
_connection.bytesSent(session.getWrittenBytes());
if (debug)
{
@@ -408,7 +500,7 @@
{
_frameListeners.remove(listener);
}
- */
+ */
public void attainState(AMQState s) throws AMQException
{
getStateManager().attainState(s);
@@ -437,9 +529,8 @@
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener)
- throws AMQException
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
+ throws AMQException, FailoverException
{
return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
}
@@ -451,9 +542,8 @@
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener, long timeout)
- throws AMQException
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
+ long timeout) throws AMQException, FailoverException
{
try
{
@@ -461,6 +551,7 @@
_protocolSession.writeFrame(frame);
AMQMethodEvent e = listener.blockForFrame(timeout);
+
return e;
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
@@ -478,25 +569,33 @@
}
/** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
+ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException
{
return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
}
/** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
+ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException
{
- return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
+ return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
+ timeout);
}
-
-
public void closeSession(AMQSession session) throws AMQException
{
_protocolSession.closeSession(session);
}
+ /**
+ * Closes the connection.
+ *
+ * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
+ * anyway.
+ *
+ * @param timeout The timeout to wait for an acknowledgement to the close request.
+ *
+ * @throws AMQException If the close fails for any reason.
+ */
public void closeConnection(long timeout) throws AMQException
{
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
@@ -504,13 +603,13 @@
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
- _protocolSession.getProtocolMajorVersion(),
- _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection.")); // replyText
+ final AMQFrame frame =
+ ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
+ _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection.")); // replyText
try
{
@@ -521,8 +620,10 @@
{
_protocolSession.closeProtocolSession(false);
}
-
-
+ catch (FailoverException e)
+ {
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ }
}
/** @return the number of bytes read from this protocol session */
@@ -603,7 +704,6 @@
{
return _protocolSession.getProtocolMajorVersion();
}
-
public byte getProtocolMinorVersion()
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Jun 15 09:28:46 2007
@@ -20,16 +20,8 @@
*/
package org.apache.qpid.client.protocol;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-
import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
@@ -53,16 +45,24 @@
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
* session is still available but clients should not use it to obtain session attributes.
*/
public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
-
protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
- protected static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,71 +27,137 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+/**
+ * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of
+ * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or
+ * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
+ * differs from a 'rendezvous' in that sense.
+ *
+ * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response.
+ * They are always used in a 'one-shot' manner, that is, to receive just one response. Usually the caller has to register
+ * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
+ * have been completed.
+ *
+ * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to
+ * this listener that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to
+ * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The
+ * channel id must be passed to the constructor.
+ *
+ * <p/>Errors from the producer are rethrown to the consumer.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent}
+ * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody}
+ * <tr><td> Block until a method is handled by the delegated to handler.
+ * <tr><td> Propagate the most recent exception to the consumer.
+ * </table>
+ *
+ * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
+ * methodReceived method. The processMethod takes an additional channelId, however none of the implementations
+ * seem to use it. So wrapping the listeners is possible.
+ *
+ * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener,
+ * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot
+ * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for
+ * method has been received.
+ *
+ * @todo Interruption is caught but not handled. This could be allowed to fall through. This might actually be useful
+ * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
+ * when this happens. At the very least, restore the interrupted status flag.
+ *
+ * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
+ * check that SynchronousQueue has a non-blocking put method available.
+ */
public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
+ /** This flag is used to indicate that the blocked for method has been received. */
private volatile boolean _ready = false;
- public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException;
-
+ /** Used to protect the shared event and ready flag between the producer and consumer. */
private final Object _lock = new Object();
- /**
- * This is set if there is an exception thrown from processCommandFrame and the
- * exception is rethrown to the caller of blockForFrame()
- */
+ /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
private volatile Exception _error;
+ /** Holds the channel id for the channel upon which this listener is waiting for a response. */
protected int _channelId;
+ /** Holds the incoming method. */
protected AMQMethodEvent _doneEvt = null;
+ /**
+ * Creates a new method listener that filters incoming methods to just those that match the specified channel id.
+ *
+ * @param channelId The channel id to filter incoming methods with.
+ */
public BlockingMethodFrameListener(int channelId)
{
_channelId = channelId;
}
/**
- * This method is called by the MINA dispatching thread. Note that it could
- * be called before blockForFrame() has been called.
+ * Delegates any additional handling of the incoming methods to another handler.
*
- * @param evt the frame event
- * @return true if the listener has dealt with this frame
- * @throws AMQException
+ * @param channelId The channel id of the incoming method.
+ * @param frame The method body.
+ *
+ * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
*/
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException;
+
+ /**
+ * Informs this listener that an AMQP method has been received.
+ *
+ * @param evt The AMQP method.
+ *
+ * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
+ */
+ public boolean methodReceived(AMQMethodEvent evt) // throws AMQException
{
AMQMethodBody method = evt.getMethod();
- try
+ /*try
+ {*/
+ boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+
+ if (ready)
{
- boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
- if (ready)
+ // we only update the flag from inside the synchronized block
+ // so that the blockForFrame method cannot "miss" an update - it
+ // will only ever read the flag from within the synchronized block
+ synchronized (_lock)
{
- // we only update the flag from inside the synchronized block
- // so that the blockForFrame method cannot "miss" an update - it
- // will only ever read the flag from within the synchronized block
- synchronized (_lock)
- {
- _doneEvt = evt;
- _ready = ready;
- _lock.notify();
- }
+ _doneEvt = evt;
+ _ready = ready;
+ _lock.notify();
}
- return ready;
}
+
+ return ready;
+
+ /*}
catch (AMQException e)
{
error(e);
// we rethrow the error here, and the code in the frame dispatcher will go round
// each listener informing them that an exception has been thrown
throw e;
- }
+ }*/
}
/**
- * This method is called by the thread that wants to wait for a frame.
+ * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout
+ * has passed.
+ *
+ * @param timeout The timeout in milliseconds.
+ *
+ * @return The AMQP method that was received.
+ *
+ * @throws AMQException
+ * @throws FailoverException
*/
- public AMQMethodEvent blockForFrame(long timeout) throws AMQException
+ public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
{
synchronized (_lock)
{
@@ -117,24 +183,25 @@
catch (InterruptedException e)
{
// IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
-// if (!_ready && timeout != -1)
-// {
-// _error = new AMQException("Server did not respond timely");
-// _ready = true;
-// }
+ // if (!_ready && timeout != -1)
+ // {
+ // _error = new AMQException("Server did not respond timely");
+ // _ready = true;
+ // }
}
}
}
+
if (_error != null)
{
if (_error instanceof AMQException)
{
- throw(AMQException) _error;
+ throw (AMQException) _error;
}
else if (_error instanceof FailoverException)
{
- // This should ensure that FailoverException is not wrapped and can be caught.
- throw(FailoverException) _error; // needed to expose FailoverException.
+ // This should ensure that FailoverException is not wrapped and can be caught.
+ throw (FailoverException) _error; // needed to expose FailoverException.
}
else
{
@@ -156,6 +223,7 @@
// set the error so that the thread that is blocking (against blockForFrame())
// can pick up the exception and rethrow to the caller
_error = e;
+
synchronized (_lock)
{
_ready = true;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class HeartbeatConfig
{
- private static final Logger _logger = Logger.getLogger(HeartbeatConfig.class);
+ private static final Logger _logger = LoggerFactory.getLogger(HeartbeatConfig.class);
static final HeartbeatConfig CONFIG = new HeartbeatConfig();
/**
@@ -35,13 +36,13 @@
HeartbeatConfig()
{
String property = System.getProperty("amqj.heartbeat.timeoutFactor");
- if(property != null)
+ if (property != null)
{
try
{
timeoutFactor = Float.parseFloat(property);
}
- catch(NumberFormatException e)
+ catch (NumberFormatException e)
{
_logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.log4j.Logger;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message
* when a threshold has been exceeded, and has a frequency configuration so that messages are not output
@@ -32,13 +34,13 @@
*/
public class ProtocolBufferMonitorFilter extends IoFilterAdapter
{
- private static final Logger _logger = Logger.getLogger(ProtocolBufferMonitorFilter.class);
+ private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class);
public static long DEFAULT_FREQUENCY = 5000;
public static int DEFAULT_THRESHOLD = 3000;
- private int _bufferedMessages = 0;
+ private int _bufferedMessages = 0;
private int _threshold;
@@ -58,7 +60,7 @@
_outputFrequencyInMillis = frequency;
}
- public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception
{
_bufferedMessages++;
if (_bufferedMessages > _threshold)
@@ -66,8 +68,8 @@
long now = System.currentTimeMillis();
if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis)
{
- _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: " +
- _bufferedMessages);
+ _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: "
+ + _bufferedMessages);
_lastMessageOutputTime = now;
}
}
@@ -75,7 +77,7 @@
nextFilter.messageReceived(session, message);
}
- public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
{
_bufferedMessages--;
nextFilter.messageSent(session, message);