You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/01/22 18:16:45 UTC
svn commit: r1560424 [2/2] - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/mes...
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Wed Jan 22 17:16:44 2014
@@ -21,35 +21,23 @@
package org.apache.qpid.server.protocol.v1_0;
-import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.StoredMessage;
-public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage
{
-
- private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
- AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
-
- private volatile int _referenceCount = 0;
-
- private final StoredMessage<MessageMetaData_1_0> _storedMessage;
private List<ByteBuffer> _fragments;
- private WeakReference<Session_1_0> _session;
private long _arrivalTime;
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
- _storedMessage = storedMessage;
- _session = null;
+ super(storedMessage, null);
_fragments = restoreFragments(storedMessage);
}
@@ -75,11 +63,10 @@ public class Message_1_0 implements Serv
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
final List<ByteBuffer> fragments,
- final Session_1_0 session)
+ final Object connectionReference)
{
- _storedMessage = storedMessage;
+ super(storedMessage, connectionReference);
_fragments = fragments;
- _session = new WeakReference<Session_1_0>(session);
_arrivalTime = System.currentTimeMillis();
}
@@ -98,7 +85,7 @@ public class Message_1_0 implements Serv
private MessageMetaData_1_0 getMessageMetaData()
{
- return _storedMessage.getMetaData();
+ return getStoredMessage().getMetaData();
}
public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()
@@ -106,16 +93,6 @@ public class Message_1_0 implements Serv
return getMessageMetaData().getMessageHeader();
}
- public StoredMessage getStoredMessage()
- {
- return _storedMessage;
- }
-
- public boolean isPersistent()
- {
- return getMessageMetaData().isPersistent();
- }
-
public boolean isRedelivered()
{
// TODO
@@ -136,121 +113,19 @@ public class Message_1_0 implements Serv
return size;
}
- public boolean isImmediate()
- {
- return false;
- }
-
public long getExpiration()
{
return getMessageHeader().getExpiration();
}
- public MessageReference<Message_1_0> newReference()
- {
- return new Reference(this);
- }
-
- public long getMessageNumber()
- {
- return _storedMessage.getMessageNumber();
- }
-
public long getArrivalTime()
{
return _arrivalTime;
}
- public int getContent(final ByteBuffer buf, final int offset)
- {
- return _storedMessage.getContent(offset, buf);
- }
-
- public ByteBuffer getContent(int offset, int size)
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- buf.limit(getContent(buf, offset));
-
- return buf;
- }
-
public List<ByteBuffer> getFragments()
{
return _fragments;
}
- public Session_1_0 getSession()
- {
- return _session == null ? null : _session.get();
- }
-
-
- public boolean incrementReference()
- {
- if(_refCountUpdater.incrementAndGet(this) <= 0)
- {
- _refCountUpdater.decrementAndGet(this);
- return false;
- }
- else
- {
- return true;
- }
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- */
-
- public void decrementReference()
- {
- int count = _refCountUpdater.decrementAndGet(this);
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _refCountUpdater.set(this,Integer.MIN_VALUE/2);
-
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_storedMessage != null)
- {
- _storedMessage.remove();
- }
- }
- else
- {
- if (count < 0)
- {
- throw new RuntimeException("Reference count for message id " + getMessageNumber()
- + " has gone below 0.");
- }
- }
- }
-
- public static class Reference extends MessageReference<Message_1_0>
- {
- public Reference(Message_1_0 message)
- {
- super(message);
- }
-
- protected void onReference(Message_1_0 message)
- {
- message.incrementReference();
- }
-
- protected void onRelease(Message_1_0 message)
- {
- message.decrementReference();
- }
-
- }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Wed Jan 22 17:16:44 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -160,8 +161,8 @@ public class ReceivingLink_1_0 implement
storedMessage.flushToStore();
- Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession());
-
+ Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
+ MessageReference<Message_1_0> reference = message.newReference();
Binary transactionId = null;
org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
@@ -231,6 +232,8 @@ public class ReceivingLink_1_0 implement
}
});
}
+
+ reference.release();
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Jan 22 17:16:44 2014
@@ -541,8 +541,7 @@ public class Session_1_0 implements Sess
@Override
public boolean onSameConnection(InboundMessage inbound)
{
- // TODO
- return false;
+ return inbound.getConnectionReference() == getConnection().getReference();
}
@Override
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Wed Jan 22 17:16:44 2014
@@ -149,7 +149,7 @@ class Subscription_1_0 implements Subscr
{
if(entry.getMessage() instanceof Message_1_0)
{
- if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession())
+ if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference())
{
return false;
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Wed Jan 22 17:16:44 2014
@@ -225,7 +225,6 @@ public class MessageConverter_0_10_to_0_
{
return new MessageMetaData(convertPublishBody(message),
convertContentHeaderBody(message, vhost),
- 1,
message.getArrivalTime());
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Wed Jan 22 17:16:44 2014
@@ -121,7 +121,7 @@ public class MessageConverter_0_8_to_0_1
body.flip();
BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+ message_0_8.getContentHeaderBody().getProperties();
final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
if(exchange != null)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Wed Jan 22 17:16:44 2014
@@ -56,7 +56,7 @@ public class MessageConverter_0_8_to_1_0
header.setDurable(serverMessage.isPersistent());
BasicContentHeaderProperties contentHeader =
- (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+ serverMessage.getContentHeaderBody().getProperties();
header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
final long expiration = serverMessage.getExpiration();
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Wed Jan 22 17:16:44 2014
@@ -30,7 +30,6 @@ import org.apache.qpid.client.CustomJMSX
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -73,7 +72,7 @@ public class AMQMessageDelegate_0_8 exte
private static final boolean STRICT_AMQP_COMPLIANCE =
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
- private ContentHeaderProperties _contentHeaderProperties;
+ private BasicContentHeaderProperties _contentHeaderProperties;
// The base set of items that needs to be set.
private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
@@ -81,7 +80,7 @@ public class AMQMessageDelegate_0_8 exte
super(deliveryTag);
_contentHeaderProperties = properties;
_readableProperties = (_contentHeaderProperties != null);
- _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
+ _headerAdapter = new JMSHeaderAdapter(_readableProperties ? _contentHeaderProperties.getHeaders()
: (new BasicContentHeaderProperties()).getHeaders() );
}
@@ -90,7 +89,7 @@ public class AMQMessageDelegate_0_8 exte
{
this(new BasicContentHeaderProperties(), -1);
_readableProperties = false;
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+ _headerAdapter = new JMSHeaderAdapter(_contentHeaderProperties.getHeaders());
}
@@ -337,7 +336,7 @@ public class AMQMessageDelegate_0_8 exte
public BasicContentHeaderProperties getContentHeaderProperties()
{
- return (BasicContentHeaderProperties) _contentHeaderProperties;
+ return _contentHeaderProperties;
}
@@ -443,7 +442,7 @@ public class AMQMessageDelegate_0_8 exte
//NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
{
- return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+ return _contentHeaderProperties.getUserIdAsString();
}
else
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Wed Jan 22 17:16:44 2014
@@ -101,7 +101,7 @@ public abstract class AbstractJMSMessage
}
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
- (BasicContentHeaderProperties) contentHeader.getProperties(),
+ contentHeader.getProperties(),
exchange, routingKey, queueDestinationCache, topicDestinationCache);
return createMessage(delegate, data);
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Wed Jan 22 17:16:44 2014
@@ -110,7 +110,7 @@ public class MessageFactoryRegistry
AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
throws AMQException, JMSException
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
+ BasicContentHeaderProperties properties = contentHeader.getProperties();
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Wed Jan 22 17:16:44 2014
@@ -27,7 +27,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-public class BasicContentHeaderProperties implements CommonContentHeaderProperties
+public class BasicContentHeaderProperties
{
//persistent & non-persistent constants, values as per JMS DeliveryMode
public static final int NON_PERSISTENT = 1;
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Wed Jan 22 17:16:44 2014
@@ -39,7 +39,7 @@ public class ContentHeaderBody implement
private long bodySize;
/** must never be null */
- private ContentHeaderProperties properties;
+ private BasicContentHeaderProperties properties;
public ContentHeaderBody()
{
@@ -57,13 +57,13 @@ public class ContentHeaderBody implement
}
- public ContentHeaderBody(ContentHeaderProperties props, int classId)
+ public ContentHeaderBody(BasicContentHeaderProperties props, int classId)
{
properties = props;
this.classId = classId;
}
- public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize)
+ public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize)
{
this(props, classId);
this.weight = weight;
@@ -121,12 +121,12 @@ public class ContentHeaderBody implement
return new AMQFrame(channelId, body);
}
- public ContentHeaderProperties getProperties()
+ public BasicContentHeaderProperties getProperties()
{
return properties;
}
- public void setProperties(ContentHeaderProperties props)
+ public void setProperties(BasicContentHeaderProperties props)
{
properties = props;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Wed Jan 22 17:16:44 2014
@@ -38,11 +38,11 @@ public class ContentHeaderPropertiesFact
{
}
- public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
+ public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
DataInput buffer, int size)
throws AMQFrameDecodingException, IOException
{
- ContentHeaderProperties properties;
+ BasicContentHeaderProperties properties;
// AMQP version change: "Hardwired" version to major=8, minor=0
// TODO: Change so that the actual version is obtained from
// the ProtocolInitiation object for this session.
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Wed Jan 22 17:16:44 2014
@@ -21,14 +21,10 @@
package org.apache.qpid.framing.abstraction;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQMethodBody;
-public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+public interface ProtocolVersionMethodConverter
{
- AMQBody convertToBody(ContentChunk contentBody);
- ContentChunk convertToContentChunk(AMQBody body);
-
- void configure();
-
- AMQBody convertToBody(byte[] input);
+ public MessagePublishInfo convertToInfo(AMQMethodBody body);
+ public AMQMethodBody convertToBody(MessagePublishInfo info);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Wed Jan 22 17:16:44 2014
@@ -21,13 +21,10 @@
package org.apache.qpid.framing.amqp_0_9;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -35,48 +32,12 @@ import org.apache.qpid.framing.abstracti
public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
- private int _basicPublishClassId;
- private int _basicPublishMethodId;
public MethodConverter_0_9()
{
super((byte)0,(byte)9);
-
-
- }
-
- public AMQBody convertToBody(ContentChunk contentChunk)
- {
- if(contentChunk instanceof ContentChunk_0_9)
- {
- return ((ContentChunk_0_9)contentChunk).toBody();
- }
- else
- {
- return new ContentBody(contentChunk.getData());
- }
- }
-
- public ContentChunk convertToContentChunk(AMQBody body)
- {
- final ContentBody contentBodyChunk = (ContentBody) body;
-
- return new ContentChunk_0_9(contentBodyChunk);
-
}
- public void configure()
- {
-
- _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID;
- _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
- }
-
- public AMQBody convertToBody(byte[] data)
- {
- return new ContentBody(data);
- }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
@@ -103,33 +64,4 @@ public class MethodConverter_0_9 extends
}
- private static class ContentChunk_0_9 implements ContentChunk
- {
- private final ContentBody _contentBodyChunk;
-
- public ContentChunk_0_9(final ContentBody contentBodyChunk)
- {
- _contentBodyChunk = contentBodyChunk;
- }
-
- public int getSize()
- {
- return _contentBodyChunk.getSize();
- }
-
- public byte[] getData()
- {
- return _contentBodyChunk.getPayload();
- }
-
- public void reduceToFit()
- {
- _contentBodyChunk.reduceBufferToFit();
- }
-
- public AMQBody toBody()
- {
- return _contentBodyChunk;
- }
- }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java Wed Jan 22 17:16:44 2014
@@ -21,61 +21,22 @@
package org.apache.qpid.framing.amqp_0_91;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
- private int _basicPublishClassId;
- private int _basicPublishMethodId;
public MethodConverter_0_91()
{
super((byte)0,(byte)9);
-
-
- }
-
- public AMQBody convertToBody(ContentChunk contentChunk)
- {
- if(contentChunk instanceof ContentChunk_0_9)
- {
- return ((ContentChunk_0_9)contentChunk).toBody();
- }
- else
- {
- return new ContentBody(contentChunk.getData());
- }
- }
-
- public ContentChunk convertToContentChunk(AMQBody body)
- {
- final ContentBody contentBodyChunk = (ContentBody) body;
-
- return new ContentChunk_0_9(contentBodyChunk);
-
}
- public void configure()
- {
-
- _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
- _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
- }
-
- public AMQBody convertToBody(byte[] data)
- {
- return new ContentBody(data);
- }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
@@ -102,33 +63,4 @@ public class MethodConverter_0_91 extend
}
- private static class ContentChunk_0_9 implements ContentChunk
- {
- private final ContentBody _contentBodyChunk;
-
- public ContentChunk_0_9(final ContentBody contentBodyChunk)
- {
- _contentBodyChunk = contentBodyChunk;
- }
-
- public int getSize()
- {
- return _contentBodyChunk.getSize();
- }
-
- public byte[] getData()
- {
- return _contentBodyChunk.getPayload();
- }
-
- public void reduceToFit()
- {
- _contentBodyChunk.reduceBufferToFit();
- }
-
- public AMQBody toBody()
- {
- return _contentBodyChunk;
- }
- }
}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Wed Jan 22 17:16:44 2014
@@ -21,71 +21,21 @@
package org.apache.qpid.framing.amqp_8_0;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
- private int _basicPublishClassId;
- private int _basicPublishMethodId;
-
public MethodConverter_8_0()
{
super((byte)8,(byte)0);
-
-
- }
-
- public AMQBody convertToBody(ContentChunk contentChunk)
- {
- return new ContentBody(contentChunk.getData());
}
- public ContentChunk convertToContentChunk(AMQBody body)
- {
- final ContentBody contentBodyChunk = (ContentBody) body;
-
- return new ContentChunk()
- {
-
- public int getSize()
- {
- return contentBodyChunk.getSize();
- }
-
- public byte[] getData()
- {
- return contentBodyChunk.getPayload();
- }
-
- public void reduceToFit()
- {
- contentBodyChunk.reduceBufferToFit();
- }
- };
-
- }
-
- public void configure()
- {
-
- _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
- _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
- }
-
- public AMQBody convertToBody(byte[] data)
- {
- return new ContentBody(data);
- }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java Wed Jan 22 17:16:44 2014
@@ -50,8 +50,6 @@ import org.apache.qpid.server.queue.AMQP
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.ConflationQueue;
-import org.apache.qpid.server.protocol.v0_8.IncomingMessage;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -617,61 +615,41 @@ public class MessageStoreTest extends Qp
MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey);
- final IncomingMessage currentMessage;
-
-
- currentMessage = new IncomingMessage(messageInfo);
-
- currentMessage.setExchange(exchange);
-
ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l);
- try
- {
- currentMessage.setContentHeaderBody(headerBody);
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
- currentMessage.setExpiration();
+ final StoredMessage<MessageMetaData> storedMessage = getVirtualHost().getMessageStore().addMessage(mmd);
+ storedMessage.flushToStore();
+ final AMQMessage currentMessage = new AMQMessage(storedMessage);
- MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis());
- currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd));
- currentMessage.getStoredMessage().flushToStore();
- currentMessage.route();
+ final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage);
- // check and deliver if header says body length is zero
- if (currentMessage.allContentReceived())
- {
- ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
- final List<? extends BaseQueue> destinationQueues = currentMessage.getDestinationQueues();
- trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
- AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
+ ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
- for(BaseQueue queue : destinationQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
+ trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ for(BaseQueue queue : destinationQueues)
{
- _logger.error("Problem enqueing message", e);
+ queue.enqueue(currentMessage);
}
}
-
- public void onRollback()
+ catch (AMQException e)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _logger.error("Problem enqueing message", e);
}
- });
- }
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+
}
private void createAllQueues()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org