You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/23 12:01:08 UTC
svn commit: r1560634 [5/7] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf2/examples/cpp/
qpid/cpp/bindings/qpid/dotnet/src/ qpid/cpp/bindings/qpid/dotnet/src/msvc10/
qpid/cpp/bindings/qpid/dotnet/src/msvc9/ q...
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Thu Jan 23 11:01:02 2014
@@ -24,96 +24,44 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.Filterable;
import org.apache.qpid.server.store.StoredMessage;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
+public class IncomingMessage
{
- /** Used for debugging purposes. */
- private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
-
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
-
+ private Exchange _exchange;
/**
* Keeps a track of how many bytes we have received in body frames
*/
private long _bodyLengthReceived = 0;
+ private List<ContentBody> _contentChunks = new ArrayList<ContentBody>();
- /**
- * This is stored during routing, to know the queues to which this message should immediately be
- * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
- * by the message handle.
- */
- private List<? extends BaseQueue> _destinationQueues;
-
- private long _expiration;
-
- private Exchange _exchange;
-
- private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
-
- // we keep both the original meta data object and the store reference to it just in case the
- // store would otherwise flow it to disk
-
- private MessageMetaData _messageMetaData;
-
- private StoredMessage<MessageMetaData> _storedMessageHandle;
- private Object _connectionReference;
-
-
- public IncomingMessage(
- final MessagePublishInfo info
- )
- {
- this(info, null);
- }
-
- public IncomingMessage(MessagePublishInfo info, Object reference)
+ public IncomingMessage(MessagePublishInfo info)
{
_messagePublishInfo = info;
- _connectionReference = reference;
}
- public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
+ public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody)
{
_contentHeaderBody = contentHeaderBody;
}
- public void setExpiration()
- {
- _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
- }
-
- public MessageMetaData headersReceived(long currentTime)
- {
- _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
- return _messageMetaData;
- }
-
-
- public List<? extends BaseQueue> getDestinationQueues()
+ public MessagePublishInfo getMessagePublishInfo()
{
- return _destinationQueues;
+ return _messagePublishInfo;
}
- public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
+ public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException
{
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -124,31 +72,14 @@ public class IncomingMessage implements
return (_bodyLengthReceived == getContentHeader().getBodySize());
}
- public AMQShortString getExchange()
+ public AMQShortString getExchangeName()
{
return _messagePublishInfo.getExchange();
}
- public String getRoutingKey()
- {
- return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
- }
-
- public String getBinding()
- {
- return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
- }
-
-
- public boolean isMandatory()
- {
- return _messagePublishInfo.isMandatory();
- }
-
-
- public boolean isImmediate()
+ public Exchange getExchange()
{
- return _messagePublishInfo.isImmediate();
+ return _exchange;
}
public ContentHeaderBody getContentHeader()
@@ -156,129 +87,24 @@ public class IncomingMessage implements
return _contentHeaderBody;
}
-
- public AMQMessageHeader getMessageHeader()
- {
- return _messageMetaData.getMessageHeader();
- }
-
- public boolean isPersistent()
- {
- return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
- BasicContentHeaderProperties.PERSISTENT;
- }
-
- public boolean isRedelivered()
- {
- return false;
- }
-
-
public long getSize()
{
return getContentHeader().getBodySize();
}
- public long getMessageNumber()
- {
- return _storedMessageHandle.getMessageNumber();
- }
-
public void setExchange(final Exchange e)
{
_exchange = e;
}
- public void route()
- {
- enqueue(_exchange.route(this));
-
- }
-
- public void enqueue(final List<? extends BaseQueue> queues)
- {
- _destinationQueues = queues;
- }
-
- public MessagePublishInfo getMessagePublishInfo()
- {
- return _messagePublishInfo;
- }
-
- public long getExpiration()
- {
- return _expiration;
- }
-
public int getBodyCount() throws AMQException
{
return _contentChunks.size();
}
- public ContentChunk getContentChunk(int index)
+ public ContentBody getContentChunk(int index)
{
return _contentChunks.get(index);
}
-
- public int getContent(ByteBuffer buf, int offset)
- {
- int pos = 0;
- int written = 0;
- for(ContentChunk cb : _contentChunks)
- {
- ByteBuffer data = ByteBuffer.wrap(cb.getData());
- if(offset+written >= pos && offset < pos + data.limit())
- {
- ByteBuffer src = data.duplicate();
- src.position(offset+written - pos);
- src = src.slice();
-
- if(buf.remaining() < src.limit())
- {
- src.limit(buf.remaining());
- }
- int count = src.limit();
- buf.put(src);
- written += count;
- if(buf.remaining() == 0)
- {
- break;
- }
- }
- pos+=data.limit();
- }
- return written;
-
- }
-
-
- public ByteBuffer getContent(int offset, int size)
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(buf,offset);
- buf.flip();
- return buf;
- }
-
- public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
- {
- _storedMessageHandle = storedMessageHandle;
- }
-
- public StoredMessage<MessageMetaData> getStoredMessage()
- {
- return _storedMessageHandle;
- }
-
- public Object getConnectionReference()
- {
- return _connectionReference;
- }
-
- public MessageMetaData getMessageMetaData()
- {
- return _messageMetaData;
- }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Thu Jan 23 11:01:02 2014
@@ -46,11 +46,10 @@ import java.util.Set;
*/
public class MessageMetaData implements StorableMessageMetaData
{
- private MessagePublishInfo _messagePublishInfo;
+ private final MessagePublishInfo _messagePublishInfo;
- private ContentHeaderBody _contentHeaderBody;
+ private final ContentHeaderBody _contentHeaderBody;
- private int _contentChunkCount;
private long _arrivalTime;
private static final byte MANDATORY_FLAG = 1;
@@ -58,59 +57,36 @@ public class MessageMetaData implements
public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
private static final MessageMetaDataType_0_8 TYPE = new MessageMetaDataType_0_8();
- public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+ public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody)
{
- this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
+ this(publishBody,contentHeaderBody, System.currentTimeMillis());
}
- public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+ public MessageMetaData(MessagePublishInfo publishBody,
+ ContentHeaderBody contentHeaderBody,
+ long arrivalTime)
{
_contentHeaderBody = contentHeaderBody;
_messagePublishInfo = publishBody;
- _contentChunkCount = contentChunkCount;
_arrivalTime = arrivalTime;
}
- public int getContentChunkCount()
- {
- return _contentChunkCount;
- }
-
- public void setContentChunkCount(int contentChunkCount)
- {
- _contentChunkCount = contentChunkCount;
- }
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- {
- _contentHeaderBody = contentHeaderBody;
- }
-
public MessagePublishInfo getMessagePublishInfo()
{
return _messagePublishInfo;
}
- public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
- {
- _messagePublishInfo = messagePublishInfo;
- }
-
public long getArrivalTime()
{
return _arrivalTime;
}
- public void setArrivalTime(long arrivalTime)
- {
- _arrivalTime = arrivalTime;
- }
-
public MessageMetaDataType getType()
{
return TYPE;
@@ -169,8 +145,7 @@ public class MessageMetaData implements
public boolean isPersistent()
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties());
- return properties.getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT;
+ return _contentHeaderBody.getProperties().getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT;
}
private static class MetaDataFactory implements MessageMetaDataType.Factory
@@ -219,7 +194,7 @@ public class MessageMetaData implements
return routingKey;
}
};
- return new MessageMetaData(publishBody, chb, 0, arrivalTime);
+ return new MessageMetaData(publishBody, chb, arrivalTime);
}
catch (AMQException e)
{
@@ -242,7 +217,7 @@ public class MessageMetaData implements
{
private BasicContentHeaderProperties getProperties()
{
- return (BasicContentHeaderProperties) getContentHeaderBody().getProperties();
+ return getContentHeaderBody().getProperties();
}
public String getUserId()
@@ -300,18 +275,6 @@ public class MessageMetaData implements
return getProperties().getReplyToAsString();
}
- public String getReplyToExchange()
- {
- // TODO
- return getReplyTo();
- }
-
- public String getReplyToRoutingKey()
- {
- // TODO
- return getReplyTo();
- }
-
public Object getHeader(String name)
{
FieldTable ft = getProperties().getHeaders();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java Thu Jan 23 11:01:02 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -488,7 +489,7 @@ public abstract class SubscriptionImpl i
{
AMQMessage message = (AMQMessage) entry.getMessage();
- final Object publisherReference = message.getConnectionIdentifier();
+ final Object publisherReference = message.getConnectionReference();
// We don't want local messages so check to see if message is one we sent
Object localReference = getProtocolSession().getReference();
@@ -519,7 +520,7 @@ public abstract class SubscriptionImpl i
private boolean checkFilters(QueueEntry msg)
{
- return (_filters == null) || _filters.allAllow(msg);
+ return (_filters == null) || _filters.allAllow(msg.asFilterable());
}
public boolean isAutoClose()
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Thu Jan 23 11:01:02 2014
@@ -119,8 +119,6 @@ public class AckTest extends QpidTestCas
return new AMQShortString("rk");
}
};
- final IncomingMessage msg = new IncomingMessage(publishBody);
- //IncomingMessage msg2 = null;
BasicContentHeaderProperties b = new BasicContentHeaderProperties();
ContentHeaderBody cb = new ContentHeaderBody();
cb.setProperties(b);
@@ -131,42 +129,35 @@ public class AckTest extends QpidTestCas
b.setDeliveryMode((byte) 2);
}
- msg.setContentHeaderBody(cb);
-
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
- msg.enqueue(qs);
- MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis());
+ MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
final StoredMessage storedMessage = _messageStore.addMessage(mmd);
- msg.setStoredMessage(storedMessage);
final AMQMessage message = new AMQMessage(storedMessage);
- if(msg.allContentReceived())
- {
- ServerTransaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, message, new ServerTransaction.Action() {
- public void postCommit()
+ ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+ txn.enqueue(_queue, message, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
{
- try
- {
- _queue.enqueue(message);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
+ _queue.enqueue(message);
}
-
- public void onRollback()
+ catch (AMQException e)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ throw new RuntimeException(e);
}
- });
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
- }
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Thu Jan 23 11:01:02 2014
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -55,6 +56,7 @@ import org.apache.qpid.transport.network
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
+ private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class);
// ChannelID(LIST) -> LinkedList<Pair>
private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
private AtomicInteger _deliveryCount = new AtomicInteger(0);
@@ -185,7 +187,7 @@ public class InternalTestProtocolSession
}
catch (InterruptedException e)
{
- e.printStackTrace();
+ _logger.error("Thread interupted", e);
}
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Thu Jan 23 11:01:02 2014
@@ -57,9 +57,9 @@ public class MockStoredMessage implement
{
FieldTable headers = new FieldTable();
headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue));
- ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers);
+ ( chb.getProperties()).setHeaders(headers);
}
- _metaData = new MessageMetaData(info, chb, 0);
+ _metaData = new MessageMetaData(info, chb);
_content = ByteBuffer.allocate(_metaData.getContentSize());
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java Thu Jan 23 11:01:02 2014
@@ -81,7 +81,7 @@ public class ReferenceCountingTest exten
- MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
@@ -139,7 +139,7 @@ public class ReferenceCountingTest exten
final ContentHeaderBody chb = createPersistentContentHeader();
- MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
AMQMessage message = new AMQMessage(storedMessage);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml Thu Jan 23 11:01:02 2014
@@ -40,6 +40,14 @@
<version>0.26-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j-version}</version>
+ <scope>compile</scope>
+ </dependency>
+
</dependencies>
<build>
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0:r1558037-1560619
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Jan 23 11:01:02 2014
@@ -51,6 +51,8 @@ public class Connection_1_0 implements C
private final ConnectionEndpoint _conn;
private final long _connectionId;
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
+ private final Object _reference = new Object();
+
public static interface Task
@@ -79,6 +81,11 @@ public class Connection_1_0 implements C
}
+ public Object getReference()
+ {
+ return _reference;
+ }
+
public void remoteSessionCreation(SessionEndpoint endpoint)
{
Session_1_0 session = new Session_1_0(_vhost, this);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Thu Jan 23 11:01:02 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -53,34 +55,71 @@ public class ExchangeDestination impleme
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- final List<? extends BaseQueue> queues = _exchange.route(message);
+ final InstanceProperties instanceProperties =
+ new InstanceProperties()
+ {
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case MANDATORY:
+ return false;
+ case REDELIVERED:
+ return false;
+ case PERSISTENT:
+ return message.isPersistent();
+ case IMMEDIATE:
+ return false;
+ case EXPIRATION:
+ return message.getExpiration();
+ }
+ return null;
+ }};
- txn.enqueue(queues,message, new ServerTransaction.Action()
+ List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
+
+ if(queues == null || queues.isEmpty())
{
+ Exchange altExchange = _exchange.getAlternateExchange();
+ if(altExchange != null)
+ {
+ queues = altExchange.route(message, instanceProperties);
+ }
+ }
- BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
+ if(queues != null && !queues.isEmpty())
+ {
+ final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
- public void postCommit()
+ txn.enqueue(queues,message, new ServerTransaction.Action()
{
- for(int i = 0; i < _queues.length; i++)
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
{
- try
- {
- _queues[i].enqueue(message);
- }
- catch (AMQException e)
+ for(int i = 0; i < baseQueues.length; i++)
{
- // TODO
- throw new RuntimeException(e);
+ try
+ {
+ baseQueues[i].enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
}
+ _reference.release();
}
- }
- public void onRollback()
- {
- // NO-OP
- }
- });
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ }
return ACCEPTED;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Thu Jan 23 11:01:02 2014
@@ -272,7 +272,7 @@ public abstract class MessageConverter_t
@Override
public void remove()
{
- serverMessage.getStoredMessage().remove();
+ throw new UnsupportedOperationException();
}
};
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Thu Jan 23 11:01:02 2014
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
import java.util.*;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -44,6 +46,7 @@ import org.apache.qpid.server.store.Stor
public class MessageMetaData_1_0 implements StorableMessageMetaData
{
+ private static final Logger _logger = Logger.getLogger(MessageMetaData_1_0.class);
// TODO move to somewhere more useful
public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
@@ -275,7 +278,7 @@ public class MessageMetaData_1_0 impleme
}
catch (AmqpErrorException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ _logger.error("Decoding read section error", e);
throw new IllegalArgumentException(e);
}
}
@@ -501,16 +504,6 @@ public class MessageMetaData_1_0 impleme
}
}
- public String getReplyToExchange()
- {
- return null; //TODO
- }
-
- public String getReplyToRoutingKey()
- {
- return null; //TODO
- }
-
public String getAppId()
{
//TODO
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java:r1558037-1560619
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Thu Jan 23 11:01:02 2014
@@ -21,35 +21,22 @@
package org.apache.qpid.server.protocol.v1_0;
-import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
-public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
-
- private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
- AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
-
- private volatile int _referenceCount = 0;
-
- private final StoredMessage<MessageMetaData_1_0> _storedMessage;
private List<ByteBuffer> _fragments;
- private WeakReference<Session_1_0> _session;
private long _arrivalTime;
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
- _storedMessage = storedMessage;
- _session = null;
+ super(storedMessage, null);
_fragments = restoreFragments(storedMessage);
}
@@ -75,11 +62,10 @@ public class Message_1_0 implements Serv
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
final List<ByteBuffer> fragments,
- final Session_1_0 session)
+ final Object connectionReference)
{
- _storedMessage = storedMessage;
+ super(storedMessage, connectionReference);
_fragments = fragments;
- _session = new WeakReference<Session_1_0>(session);
_arrivalTime = System.currentTimeMillis();
}
@@ -98,7 +84,7 @@ public class Message_1_0 implements Serv
private MessageMetaData_1_0 getMessageMetaData()
{
- return _storedMessage.getMetaData();
+ return getStoredMessage().getMetaData();
}
public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()
@@ -106,16 +92,6 @@ public class Message_1_0 implements Serv
return getMessageMetaData().getMessageHeader();
}
- public StoredMessage getStoredMessage()
- {
- return _storedMessage;
- }
-
- public boolean isPersistent()
- {
- return getMessageMetaData().isPersistent();
- }
-
public boolean isRedelivered()
{
// TODO
@@ -136,121 +112,19 @@ public class Message_1_0 implements Serv
return size;
}
- public boolean isImmediate()
- {
- return false;
- }
-
public long getExpiration()
{
return getMessageHeader().getExpiration();
}
- public MessageReference<Message_1_0> newReference()
- {
- return new Reference(this);
- }
-
- public long getMessageNumber()
- {
- return _storedMessage.getMessageNumber();
- }
-
public long getArrivalTime()
{
return _arrivalTime;
}
- public int getContent(final ByteBuffer buf, final int offset)
- {
- return _storedMessage.getContent(offset, buf);
- }
-
- public ByteBuffer getContent(int offset, int size)
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- buf.limit(getContent(buf, offset));
-
- return buf;
- }
-
public List<ByteBuffer> getFragments()
{
return _fragments;
}
- public Session_1_0 getSession()
- {
- return _session == null ? null : _session.get();
- }
-
-
- public boolean incrementReference()
- {
- if(_refCountUpdater.incrementAndGet(this) <= 0)
- {
- _refCountUpdater.decrementAndGet(this);
- return false;
- }
- else
- {
- return true;
- }
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- */
-
- public void decrementReference()
- {
- int count = _refCountUpdater.decrementAndGet(this);
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _refCountUpdater.set(this,Integer.MIN_VALUE/2);
-
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_storedMessage != null)
- {
- _storedMessage.remove();
- }
- }
- else
- {
- if (count < 0)
- {
- throw new RuntimeException("Reference count for message id " + getMessageNumber()
- + " has gone below 0.");
- }
- }
- }
-
- public static class Reference extends MessageReference<Message_1_0>
- {
- public Reference(Message_1_0 message)
- {
- super(message);
- }
-
- protected void onReference(Message_1_0 message)
- {
- message.incrementReference();
- }
-
- protected void onRelease(Message_1_0 message)
- {
- message.decrementReference();
- }
-
- }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Thu Jan 23 11:01:02 2014
@@ -20,18 +20,17 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.ServerTransaction;
-import java.util.Arrays;
-
public class QueueDestination implements SendingDestination, ReceivingDestination
{
+ private static final Logger _logger = Logger.getLogger(QueueDestination.class);
private static final Accepted ACCEPTED = new Accepted();
private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
@@ -80,7 +79,7 @@ public class QueueDestination implements
}
catch(Exception e)
{
- e.printStackTrace();
+ _logger.error("Send error", e);
throw new RuntimeException(e);
}
return ACCEPTED;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Thu Jan 23 11:01:02 2014
@@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -170,8 +171,8 @@ public class ReceivingLink_1_0 implement
storedMessage.flushToStore();
- Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession());
-
+ Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
+ MessageReference<Message_1_0> reference = message.newReference();
Binary transactionId = null;
org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
@@ -241,6 +242,8 @@ public class ReceivingLink_1_0 implement
}
});
}
+
+ reference.release();
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Jan 23 11:01:02 2014
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
@@ -64,7 +66,6 @@ import org.apache.qpid.server.filter.JMS
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -72,6 +73,8 @@ import org.apache.qpid.server.virtualhos
public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
{
+ private static final Logger _logger = Logger.getLogger(SendingLink_1_0.class);
+
private VirtualHost _vhost;
private SendingDestination _destination;
@@ -319,7 +322,8 @@ public class SendingLink_1_0 implements
}
catch (AMQException e)
{
- e.printStackTrace(); //TODO.
+ //TODO
+ _logger.error("Error removing queue", e);
}
}
}
@@ -342,14 +346,15 @@ public class SendingLink_1_0 implements
}
catch (AMQSecurityException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ _logger.error("Security error", e);
}
catch (AMQInternalException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (AMQException e)
+ _logger.error("Internal error", e);
+ }
+ catch (AMQException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ _logger.error("Error", e);
}
_subscription = new Subscription_1_0(this, qd, true);
@@ -370,7 +375,8 @@ public class SendingLink_1_0 implements
}
catch (AMQException e)
{
- e.printStackTrace(); //TODO
+ //TODO
+ _logger.error("Error registering subscription", e);
}
}
@@ -398,7 +404,8 @@ public class SendingLink_1_0 implements
}
catch (AMQException e)
{
- e.printStackTrace(); //TODO
+ //TODO
+ _logger.error("Error unregistering subscription", e);
}
Modified state = new Modified();
@@ -423,7 +430,8 @@ public class SendingLink_1_0 implements
}
catch(AMQException e)
{
- e.printStackTrace(); // TODO - Implement
+ //TODO
+ _logger.error("Error removing queue", e);
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Jan 23 11:01:02 2014
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.v1_0;
import java.text.MessageFormat;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
@@ -39,13 +41,11 @@ import org.apache.qpid.AMQSecurityExcept
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -56,6 +56,7 @@ import static org.apache.qpid.server.log
public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
{
+ private static final Logger _logger = Logger.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private VirtualHost _vhost;
private AutoCommitTransaction _transaction;
@@ -155,7 +156,7 @@ public class Session_1_0 implements Sess
}
catch(AmqpErrorException e)
{
- e.printStackTrace();
+ _logger.error("Error creating sending link", e);
destination = null;
sendingLinkEndpoint.setSource(null);
error = e.getError();
@@ -355,7 +356,8 @@ public class Session_1_0 implements Sess
}
catch (AMQException e)
{
- e.printStackTrace(); //TODO.
+ //TODO
+ _logger.error("Error removing queue from vhost", e);
}
}
}
@@ -388,10 +390,13 @@ public class Session_1_0 implements Sess
}
catch (AMQSecurityException e)
{
- e.printStackTrace(); //TODO.
- } catch (AMQException e)
+ //TODO
+ _logger.error("Security error", e);
+ }
+ catch (AMQException e)
{
- e.printStackTrace(); //TODO
+ //TODO
+ _logger.error("Error", e);
}
return queue;
@@ -533,10 +538,9 @@ public class Session_1_0 implements Sess
}
@Override
- public boolean onSameConnection(InboundMessage inbound)
+ public Object getConnectionReference()
{
- // TODO
- return false;
+ return getConnection().getReference();
}
@Override
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Thu Jan 23 11:01:02 2014
@@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.Queu
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-class Subscription_1_0 implements Subscription
+class
+ Subscription_1_0 implements Subscription
{
private SendingLink_1_0 _link;
@@ -149,7 +150,7 @@ class Subscription_1_0 implements Subscr
{
if(entry.getMessage() instanceof Message_1_0)
{
- if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession())
+ if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference())
{
return false;
}
@@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscr
private boolean checkFilters(final QueueEntry entry)
{
- return (_filters == null) || _filters.allAllow(entry);
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
}
public boolean isClosed()
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Thu Jan 23 11:01:02 2014
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.transaction.Declare;
import org.apache.qpid.amqp_1_0.type.transaction.Declared;
@@ -42,6 +42,7 @@ import java.util.*;
public class TxnCoordinatorLink_1_0 implements ReceivingLinkListener, Link_1_0
{
+ private static final Logger _logger = Logger.getLogger(TxnCoordinatorLink_1_0.class);
private VirtualHost _vhost;
private ReceivingLinkEndpoint _endpoint;
@@ -149,7 +150,8 @@ public class TxnCoordinatorLink_1_0 impl
}
catch (AmqpErrorException e)
{
- e.printStackTrace(); //TODO.
+ //TODO
+ _logger.error("AMQP error", e);
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Thu Jan 23 11:01:02 2014
@@ -225,7 +225,6 @@ public class MessageConverter_0_10_to_0_
{
return new MessageMetaData(convertPublishBody(message),
convertContentHeaderBody(message, vhost),
- 1,
message.getArrivalTime());
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Thu Jan 23 11:01:02 2014
@@ -121,7 +121,7 @@ public class MessageConverter_0_8_to_0_1
body.flip();
BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+ message_0_8.getContentHeaderBody().getProperties();
final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
if(exchange != null)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Thu Jan 23 11:01:02 2014
@@ -56,7 +56,7 @@ public class MessageConverter_0_8_to_1_0
header.setDurable(serverMessage.isPersistent());
BasicContentHeaderProperties contentHeader =
- (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+ serverMessage.getContentHeaderBody().getProperties();
header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
final long expiration = serverMessage.getExpiration();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml Thu Jan 23 11:01:02 2014
@@ -41,6 +41,13 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j-version}</version>
+ <scope>compile</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml Thu Jan 23 11:01:02 2014
@@ -34,6 +34,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j-version}</version>
+ <scope>compile</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/pom.xml Thu Jan 23 11:01:02 2014
@@ -35,6 +35,20 @@
</dependency>
<dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j-version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j-version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-servlet_2.5_spec</artifactId>
<version>1.2</version>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-jmx/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-jmx/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-jmx/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-jmx/pom.xml Thu Jan 23 11:01:02 2014
@@ -41,6 +41,13 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j-version}</version>
+ <scope>compile</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker/pom.xml Thu Jan 23 11:01:02 2014
@@ -40,12 +40,12 @@
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.16</version>
- <scope>compile</scope>
- </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j-version}</version>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/pom.xml Thu Jan 23 11:01:02 2014
@@ -43,7 +43,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.6.4</version>
+ <version>${slf4j-version}</version>
<scope>compile</scope>
</dependency>
@@ -54,13 +54,6 @@
<version>0.26-SNAPSHOT</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.16</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Thu Jan 23 11:01:02 2014
@@ -30,7 +30,6 @@ import org.apache.qpid.client.CustomJMSX
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -73,7 +72,7 @@ public class AMQMessageDelegate_0_8 exte
private static final boolean STRICT_AMQP_COMPLIANCE =
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
- private ContentHeaderProperties _contentHeaderProperties;
+ private BasicContentHeaderProperties _contentHeaderProperties;
// The base set of items that needs to be set.
private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
@@ -81,7 +80,7 @@ public class AMQMessageDelegate_0_8 exte
super(deliveryTag);
_contentHeaderProperties = properties;
_readableProperties = (_contentHeaderProperties != null);
- _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
+ _headerAdapter = new JMSHeaderAdapter(_readableProperties ? _contentHeaderProperties.getHeaders()
: (new BasicContentHeaderProperties()).getHeaders() );
}
@@ -90,7 +89,7 @@ public class AMQMessageDelegate_0_8 exte
{
this(new BasicContentHeaderProperties(), -1);
_readableProperties = false;
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+ _headerAdapter = new JMSHeaderAdapter(_contentHeaderProperties.getHeaders());
}
@@ -337,7 +336,7 @@ public class AMQMessageDelegate_0_8 exte
public BasicContentHeaderProperties getContentHeaderProperties()
{
- return (BasicContentHeaderProperties) _contentHeaderProperties;
+ return _contentHeaderProperties;
}
@@ -443,7 +442,7 @@ public class AMQMessageDelegate_0_8 exte
//NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
{
- return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+ return _contentHeaderProperties.getUserIdAsString();
}
else
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Jan 23 11:01:02 2014
@@ -101,7 +101,7 @@ public abstract class AbstractJMSMessage
}
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
- (BasicContentHeaderProperties) contentHeader.getProperties(),
+ contentHeader.getProperties(),
exchange, routingKey, queueDestinationCache, topicDestinationCache);
return createMessage(delegate, data);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Jan 23 11:01:02 2014
@@ -110,7 +110,7 @@ public class MessageFactoryRegistry
AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
throws AMQException, JMSException
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
+ BasicContentHeaderProperties properties = contentHeader.getProperties();
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Thu Jan 23 11:01:02 2014
@@ -179,12 +179,9 @@ public class ClientConnectionDelegate ex
}
@Override
- public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat heartbeat)
{
- // ClientDelegate simply responds to heartbeats with heartbeats
_heartbeatListener.heartbeatReceived();
- super.connectionHeartbeat(conn, hearbeat);
- _heartbeatListener.heartbeatSent();
}
@@ -192,4 +189,11 @@ public class ClientConnectionDelegate ex
{
_heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
}
+
+ @Override
+ public void writerIdle(final Connection connection)
+ {
+ super.writerIdle(connection);
+ _heartbeatListener.heartbeatSent();
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Thu Jan 23 11:01:02 2014
@@ -179,7 +179,7 @@ public class AMQProtocolHandlerTest exte
}
catch (Exception e)
{
- e.printStackTrace();
+ _logger.error(e.getMessage(), e);
fail(e.getMessage());
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Thu Jan 23 11:01:02 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.test.unit.messag
import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -42,7 +43,7 @@ import javax.jms.TextMessage;
public class MessageConverterTest extends TestCase
{
-
+ private static final Logger _logger = Logger.getLogger(MessageConverterTest.class);
public static final String JMS_CORR_ID = "QPIDID_01";
public static final int JMS_DELIV_MODE = 1;
public static final String JMS_TYPE = "test.jms.type";
@@ -134,8 +135,8 @@ public class MessageConverterTest extend
}
catch (JMSException e)
{
+ _logger.error("An error occured testing the property values", e);
fail("An error occured testing the property values" + e.getCause());
- e.printStackTrace();
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/pom.xml?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/pom.xml Thu Jan 23 11:01:02 2014
@@ -34,12 +34,12 @@
</properties>
<dependencies>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.6.4</version>
- <scope>compile</scope>
- </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j-version}</version>
+ <scope>compile</scope>
+ </dependency>
<!-- test dependencies -->
<dependency>
@@ -49,12 +49,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.16</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Thu Jan 23 11:01:02 2014
@@ -27,7 +27,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-public class BasicContentHeaderProperties implements CommonContentHeaderProperties
+public class BasicContentHeaderProperties
{
//persistent & non-persistent constants, values as per JMS DeliveryMode
public static final int NON_PERSISTENT = 1;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Thu Jan 23 11:01:02 2014
@@ -39,7 +39,7 @@ public class ContentHeaderBody implement
private long bodySize;
/** must never be null */
- private ContentHeaderProperties properties;
+ private BasicContentHeaderProperties properties;
public ContentHeaderBody()
{
@@ -57,13 +57,13 @@ public class ContentHeaderBody implement
}
- public ContentHeaderBody(ContentHeaderProperties props, int classId)
+ public ContentHeaderBody(BasicContentHeaderProperties props, int classId)
{
properties = props;
this.classId = classId;
}
- public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize)
+ public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize)
{
this(props, classId);
this.weight = weight;
@@ -121,12 +121,12 @@ public class ContentHeaderBody implement
return new AMQFrame(channelId, body);
}
- public ContentHeaderProperties getProperties()
+ public BasicContentHeaderProperties getProperties()
{
return properties;
}
- public void setProperties(ContentHeaderProperties props)
+ public void setProperties(BasicContentHeaderProperties props)
{
properties = props;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Thu Jan 23 11:01:02 2014
@@ -38,11 +38,11 @@ public class ContentHeaderPropertiesFact
{
}
- public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
+ public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
DataInput buffer, int size)
throws AMQFrameDecodingException, IOException
{
- ContentHeaderProperties properties;
+ BasicContentHeaderProperties properties;
// AMQP version change: "Hardwired" version to major=8, minor=0
// TODO: Change so that the actual version is obtained from
// the ProtocolInitiation object for this session.
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Thu Jan 23 11:01:02 2014
@@ -21,14 +21,10 @@
package org.apache.qpid.framing.abstraction;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQMethodBody;
-public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+public interface ProtocolVersionMethodConverter
{
- AMQBody convertToBody(ContentChunk contentBody);
- ContentChunk convertToContentChunk(AMQBody body);
-
- void configure();
-
- AMQBody convertToBody(byte[] input);
+ public MessagePublishInfo convertToInfo(AMQMethodBody body);
+ public AMQMethodBody convertToBody(MessagePublishInfo info);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=1560634&r1=1560633&r2=1560634&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Thu Jan 23 11:01:02 2014
@@ -21,13 +21,10 @@
package org.apache.qpid.framing.amqp_0_9;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -35,48 +32,12 @@ import org.apache.qpid.framing.abstracti
public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
- private int _basicPublishClassId;
- private int _basicPublishMethodId;
public MethodConverter_0_9()
{
super((byte)0,(byte)9);
-
-
- }
-
- public AMQBody convertToBody(ContentChunk contentChunk)
- {
- if(contentChunk instanceof ContentChunk_0_9)
- {
- return ((ContentChunk_0_9)contentChunk).toBody();
- }
- else
- {
- return new ContentBody(contentChunk.getData());
- }
- }
-
- public ContentChunk convertToContentChunk(AMQBody body)
- {
- final ContentBody contentBodyChunk = (ContentBody) body;
-
- return new ContentChunk_0_9(contentBodyChunk);
-
}
- public void configure()
- {
-
- _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID;
- _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
- }
-
- public AMQBody convertToBody(byte[] data)
- {
- return new ContentBody(data);
- }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
@@ -103,33 +64,4 @@ public class MethodConverter_0_9 extends
}
- private static class ContentChunk_0_9 implements ContentChunk
- {
- private final ContentBody _contentBodyChunk;
-
- public ContentChunk_0_9(final ContentBody contentBodyChunk)
- {
- _contentBodyChunk = contentBodyChunk;
- }
-
- public int getSize()
- {
- return _contentBodyChunk.getSize();
- }
-
- public byte[] getData()
- {
- return _contentBodyChunk.getPayload();
- }
-
- public void reduceToFit()
- {
- _contentBodyChunk.reduceBufferToFit();
- }
-
- public AMQBody toBody()
- {
- return _contentBodyChunk;
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org