You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/03/18 23:19:54 UTC
svn commit: r1458042 [2/6] - in /qpid/branches/QPID-4659/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/apache/qpid/ser...
Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,243 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;import java.util.Map;import org.apache.qpid.AMQPInvalidClassException;import org.apache.qpid.exchange.ExchangeDefaults;import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.exchange.Exchange;import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;import org.apache.qpid.transport.Header;import org.apache.qpid.transport.MessageDeliveryMode;import org.apache.qpid.transport.MessageProperties;import org.apache.qpid.transport.ReplyTo;
+
+public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTransferMessage, AMQMessage>
+{
+ /** Class id handed to the ContentHeaderBody that is built for converted messages. */
+ private static final int BASIC_CLASS_ID = 60;
+
+ /**
+  * Translates the delivery and message properties of a 0-10 message into 0-8 basic content header properties.
+  * <p>
+  * Only properties that are actually set on the source message are copied. The reply-to is rewritten as a
+  * binding URL string, and the "x-jms-type" application header is moved into the type property; all other
+  * application headers are copied into a FieldTable.
+  *
+  * @param messageTransferMessage the message whose header is read
+  * @param vhost virtual host whose exchange registry is consulted for the type of the reply-to exchange
+  * @return a newly created set of content header properties
+  */
+ public static BasicContentHeaderProperties convertContentHeaderProperties(MessageTransferMessage messageTransferMessage,
+                                                                          VirtualHost vhost)
+ {
+     BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+     Header header = messageTransferMessage.getHeader();
+     DeliveryProperties deliveryProps = header.getDeliveryProperties();
+     MessageProperties messageProps = header.getMessageProperties();
+
+     if(deliveryProps != null)
+     {
+         if(deliveryProps.hasDeliveryMode())
+         {
+             props.setDeliveryMode((byte)(deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
+                                          ? BasicContentHeaderProperties.PERSISTENT
+                                          : BasicContentHeaderProperties.NON_PERSISTENT));
+         }
+         if(deliveryProps.hasExpiration())
+         {
+             props.setExpiration(deliveryProps.getExpiration());
+         }
+         if(deliveryProps.hasPriority())
+         {
+             props.setPriority((byte)deliveryProps.getPriority().getValue());
+         }
+         if(deliveryProps.hasTimestamp())
+         {
+             props.setTimestamp(deliveryProps.getTimestamp());
+         }
+     }
+
+     if(messageProps != null)
+     {
+         if(messageProps.hasAppId())
+         {
+             props.setAppId(new AMQShortString(messageProps.getAppId()));
+         }
+         if(messageProps.hasContentType())
+         {
+             props.setContentType(messageProps.getContentType());
+         }
+         if(messageProps.hasCorrelationId())
+         {
+             props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
+         }
+         if(messageProps.hasContentEncoding())
+         {
+             props.setEncoding(messageProps.getContentEncoding());
+         }
+         if(messageProps.hasMessageId())
+         {
+             props.setMessageId("ID:" + messageProps.getMessageId().toString());
+         }
+         if(messageProps.hasReplyTo())
+         {
+             ReplyTo replyTo = messageProps.getReplyTo();
+             String exchangeName = replyTo.getExchange();
+             String routingKey = replyTo.getRoutingKey();
+             if(exchangeName == null)
+             {
+                 exchangeName = "";
+             }
+
+             // When the exchange is not found in the registry, fall back to the direct exchange class.
+             Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+             String exchangeClass = exchange == null
+                                    ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()
+                                    : exchange.getType().getName().asString();
+
+             // The closing quote is appended unconditionally so the routingkey option stays balanced
+             // even when the reply-to carries no routing key.
+             props.setReplyTo(exchangeClass + "://" + exchangeName + "//?routingkey='"
+                              + (routingKey == null ? "" : routingKey) + "'");
+         }
+         if(messageProps.hasUserId())
+         {
+             props.setUserId(new AMQShortString(messageProps.getUserId()));
+         }
+
+         if(messageProps.hasApplicationHeaders())
+         {
+             // Work on a copy so that removing "x-jms-type" does not touch the source message.
+             Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders());
+             if(messageProps.getApplicationHeaders().containsKey("x-jms-type"))
+             {
+                 props.setType(String.valueOf(appHeaders.remove("x-jms-type")));
+             }
+
+             FieldTable ft = new FieldTable();
+             for(Map.Entry<String, Object> entry : appHeaders.entrySet())
+             {
+                 try
+                 {
+                     ft.put(new AMQShortString(entry.getKey()), entry.getValue());
+                 }
+                 catch(AMQPInvalidClassException e)
+                 {
+                     // TODO
+                     // log here, but otherwise ignore - this header just can't be converted
+                 }
+             }
+             props.setHeaders(ft);
+         }
+     }
+
+     return props;
+ }
+
+ /** Returns the message class this converter accepts as input: {@link MessageTransferMessage}. */
+ @Override
+ public Class<MessageTransferMessage> getInputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ /** Returns the message class this converter produces: {@link AMQMessage}. */
+ @Override
+ public Class<AMQMessage> getOutputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ /**
+  * Converts the given message into an {@link AMQMessage} backed by a stored-message view of the original.
+  *
+  * @param message the message to convert
+  * @param vhost virtual host passed on to the property conversion
+  * @return the converted message
+  */
+ @Override
+ public AMQMessage convert(MessageTransferMessage message, VirtualHost vhost)
+ {
+     final StoredMessage<MessageMetaData> storedView = convertToStoredMessage(message, vhost);
+     return new AMQMessage(storedView);
+ }
+
+ /**
+  * Wraps the given message in a {@link StoredMessage} that exposes converted meta data.
+  * <p>
+  * The meta data is converted once, up front. The message number and content reads are delegated to the
+  * original message; adding content and removing are not supported.
+  *
+  * @param message the message to wrap
+  * @param vhost virtual host passed on to the meta data conversion
+  * @return a stored-message view over {@code message}
+  */
+ private StoredMessage<MessageMetaData> convertToStoredMessage(final MessageTransferMessage message, VirtualHost vhost)
+ {
+     final MessageMetaData converted = convertMetaData(message, vhost);
+
+     return new StoredMessage<MessageMetaData>()
+     {
+         // --- delegated to the converted meta data / original message ---
+
+         @Override
+         public MessageMetaData getMetaData()
+         {
+             return converted;
+         }
+
+         @Override
+         public long getMessageNumber()
+         {
+             return message.getMessageNumber();
+         }
+
+         @Override
+         public int getContent(int offsetInMessage, ByteBuffer dst)
+         {
+             // note the swapped argument order expected by the wrapped message
+             return message.getContent(dst, offsetInMessage);
+         }
+
+         @Override
+         public ByteBuffer getContent(int offsetInMessage, int size)
+         {
+             return message.getContent(offsetInMessage, size);
+         }
+
+         @Override
+         public StoreFuture flushToStore()
+         {
+             return StoreFuture.IMMEDIATE_FUTURE;
+         }
+
+         // --- mutating operations are rejected ---
+
+         @Override
+         public void addContent(int offsetInMessage, ByteBuffer src)
+         {
+             throw new UnsupportedOperationException();
+         }
+
+         @Override
+         public void remove()
+         {
+             throw new UnsupportedOperationException();
+         }
+     };
+ }
+
+ private MessageMetaData convertMetaData(MessageTransferMessage message, VirtualHost vhost)
+ {
+ return new MessageMetaData(convertPublishBody(message), convertContentHeaderBody(message, vhost), 1, message.getArrivalTime());
+ }
+
+ private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHost vhost)
+ {
+ BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+ chb.setBodySize(message.getSize());
+ return chb;
+ }
+
+ /**
+  * Derives publish info (exchange, routing key, immediate and mandatory flags) from the delivery
+  * properties of the given message.
+  * <p>
+  * Without delivery properties the exchange and routing key are null and both flags are false.
+  * The mandatory flag is the negation of the "discard unroutable" delivery property.
+  * The returned object does not allow its exchange to be changed.
+  */
+ private MessagePublishInfo convertPublishBody(MessageTransferMessage message)
+ {
+     final DeliveryProperties deliveryProps = message.getHeader().getDeliveryProperties();
+
+     final AMQShortString exchangeName;
+     if(deliveryProps == null || deliveryProps.getExchange() == null)
+     {
+         exchangeName = null;
+     }
+     else
+     {
+         exchangeName = new AMQShortString(deliveryProps.getExchange());
+     }
+
+     final AMQShortString routingKey;
+     if(deliveryProps == null || deliveryProps.getRoutingKey() == null)
+     {
+         routingKey = null;
+     }
+     else
+     {
+         routingKey = new AMQShortString(deliveryProps.getRoutingKey());
+     }
+
+     final boolean immediate;
+     final boolean mandatory;
+     if(deliveryProps == null)
+     {
+         immediate = false;
+         mandatory = false;
+     }
+     else
+     {
+         immediate = deliveryProps.getImmediate();
+         mandatory = !deliveryProps.getDiscardUnroutable();
+     }
+
+     return new MessagePublishInfo()
+     {
+         @Override
+         public AMQShortString getExchange()
+         {
+             return exchangeName;
+         }
+
+         @Override
+         public AMQShortString getRoutingKey()
+         {
+             return routingKey;
+         }
+
+         @Override
+         public boolean isImmediate()
+         {
+             return immediate;
+         }
+
+         @Override
+         public boolean isMandatory()
+         {
+             return mandatory;
+         }
+
+         @Override
+         public void setExchange(AMQShortString exchange)
+         {
+             throw new UnsupportedOperationException();
+         }
+     };
+ }
+}
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,200 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
+
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.url.AMQBindingURL;
+
+public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessage, MessageTransferMessage>
+{
+ /** Returns the message class this converter accepts as input: {@link AMQMessage}. */
+ @Override
+ public Class<AMQMessage> getInputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ /** Returns the message class this converter produces: {@link MessageTransferMessage}. */
+ @Override
+ public Class<MessageTransferMessage> getOutputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ /**
+  * Converts the given message into a {@link MessageTransferMessage} backed by a stored-message view of
+  * the original. The virtual host argument is not used by this conversion.
+  *
+  * @param message_0_8 the message to convert
+  * @param vhost unused
+  * @return the converted message
+  */
+ @Override
+ public MessageTransferMessage convert(AMQMessage message_0_8, VirtualHost vhost)
+ {
+     final StoredMessage<MessageMetaData_0_10> storedView = convertToStoredMessage(message_0_8);
+     // NOTE(review): the meaning of the null second constructor argument is not visible here - confirm
+     return new MessageTransferMessage(storedView, null);
+ }
+
+ /**
+  * Wraps the given message in a {@link StoredMessage} that exposes converted meta data.
+  * <p>
+  * The meta data is converted once, up front. The message number and content reads are delegated to the
+  * original message; adding content and removing are not supported.
+  *
+  * @param message_0_8 the message to wrap
+  * @return a stored-message view over {@code message_0_8}
+  */
+ private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final AMQMessage message_0_8)
+ {
+     final MessageMetaData_0_10 converted = convertMetaData(message_0_8);
+
+     return new StoredMessage<MessageMetaData_0_10>()
+     {
+         // --- delegated to the converted meta data / original message ---
+
+         @Override
+         public MessageMetaData_0_10 getMetaData()
+         {
+             return converted;
+         }
+
+         @Override
+         public long getMessageNumber()
+         {
+             return message_0_8.getMessageNumber();
+         }
+
+         @Override
+         public int getContent(int offsetInMessage, ByteBuffer dst)
+         {
+             // note the swapped argument order expected by the wrapped message
+             return message_0_8.getContent(dst, offsetInMessage);
+         }
+
+         @Override
+         public ByteBuffer getContent(int offsetInMessage, int size)
+         {
+             return message_0_8.getContent(offsetInMessage, size);
+         }
+
+         @Override
+         public StoreFuture flushToStore()
+         {
+             return StoreFuture.IMMEDIATE_FUTURE;
+         }
+
+         // --- mutating operations are rejected ---
+
+         @Override
+         public void addContent(int offsetInMessage, ByteBuffer src)
+         {
+             throw new UnsupportedOperationException();
+         }
+
+         @Override
+         public void remove()
+         {
+             throw new UnsupportedOperationException();
+         }
+     };
+ }
+
+ /**
+  * Builds the meta data of the converted message from the content header properties and publish info
+  * of the given message.
+  * <p>
+  * The reply-to is parsed as a binding URL; if that fails the whole string is used as the routing key.
+  * A message id is only carried over when it parses as a UUID (after stripping a leading "ID:").
+  * The type property, when present, is stored under the "x-jms-type" application header.
+  *
+  * @param message_0_8 the message whose properties are converted
+  * @return meta data holding the converted header, the body size and the original arrival time
+  */
+ private MessageMetaData_0_10 convertMetaData(AMQMessage message_0_8)
+ {
+     DeliveryProperties deliveryProps = new DeliveryProperties();
+     MessageProperties messageProps = new MessageProperties();
+
+     // Only the size is needed here. The message content is not copied: nothing in this method reads it.
+     int size = (int) message_0_8.getSize();
+
+     BasicContentHeaderProperties properties =
+             (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+
+     final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
+     if(exchange != null)
+     {
+         deliveryProps.setExchange(exchange.toString());
+     }
+
+     deliveryProps.setExpiration(message_0_8.getExpiration());
+     deliveryProps.setImmediate(message_0_8.isImmediate());
+     deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
+     deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
+     deliveryProps.setTimestamp(properties.getTimestamp());
+
+     messageProps.setContentEncoding(properties.getEncodingAsString());
+     messageProps.setContentLength(size);
+     if(properties.getAppId() != null)
+     {
+         messageProps.setAppId(properties.getAppId().getBytes());
+     }
+     messageProps.setContentType(properties.getContentTypeAsString());
+     if(properties.getCorrelationId() != null)
+     {
+         messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
+     }
+
+     if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+     {
+         String origReplyToString = properties.getReplyTo().asString();
+         ReplyTo replyTo = new ReplyTo();
+         // Always try to parse the string as a binding URL; fall back to a plain routing key on failure.
+         try
+         {
+             AMQBindingURL burl = new AMQBindingURL(origReplyToString);
+             AMQShortString routingKey = burl.getRoutingKey();
+             if(routingKey != null)
+             {
+                 replyTo.setRoutingKey(routingKey.asString());
+             }
+
+             AMQShortString exchangeName = burl.getExchangeName();
+             if(exchangeName != null)
+             {
+                 replyTo.setExchange(exchangeName.asString());
+             }
+         }
+         catch (URISyntaxException e)
+         {
+             replyTo.setRoutingKey(origReplyToString);
+         }
+         messageProps.setReplyTo(replyTo);
+     }
+
+     if(properties.getMessageId() != null)
+     {
+         try
+         {
+             String messageIdAsString = properties.getMessageIdAsString();
+             if(messageIdAsString.startsWith("ID:"))
+             {
+                 messageIdAsString = messageIdAsString.substring(3);
+             }
+             UUID uuid = UUID.fromString(messageIdAsString);
+             messageProps.setMessageId(uuid);
+         }
+         catch(IllegalArgumentException e)
+         {
+             // ignore - the id is not a UUID, so the converted message simply carries no message id
+         }
+     }
+
+     if(properties.getUserId() != null)
+     {
+         messageProps.setUserId(properties.getUserId().getBytes());
+     }
+
+     FieldTable fieldTable = properties.getHeaders();
+
+     Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+     if(properties.getType() != null)
+     {
+         appHeaders.put("x-jms-type", properties.getTypeAsString());
+     }
+
+     messageProps.setApplicationHeaders(appHeaders);
+
+     Header header = new Header(deliveryProps, messageProps, null);
+
+     return new MessageMetaData_0_10(header, size, message_0_8.getArrivalTime());
+ }
+}
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,93 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
+import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
+
+/**
+ * Converts {@link AMQMessage}s by building the header, properties and application-properties sections
+ * that make up {@link MessageMetaData_1_0}.
+ */
+public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
+{
+    /** Returns the message class this converter accepts as input: {@link AMQMessage}. */
+    @Override
+    public Class<AMQMessage> getInputClass()
+    {
+        return AMQMessage.class;
+    }
+
+    /**
+     * Builds the meta data sections for the given message.
+     * <p>
+     * A TTL is only set when the expiration lies after the arrival time. The reply-to is only set when the
+     * source message has one. The routing key of the source message becomes the subject.
+     *
+     * @param serverMessage the message whose content header is converted
+     * @param sectionEncoder encoder handed to the created meta data
+     * @return meta data made of a header, a properties and an application-properties section
+     */
+    protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage, SectionEncoder sectionEncoder)
+    {
+        List<Section> sections = new ArrayList<Section>(3);
+
+        Header header = new Header();
+
+        header.setDurable(serverMessage.isPersistent());
+
+        BasicContentHeaderProperties contentHeader =
+                (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+
+        header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
+        final long expiration = serverMessage.getExpiration();
+        final long arrivalTime = serverMessage.getArrivalTime();
+
+        if(expiration > arrivalTime)
+        {
+            header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
+        }
+        sections.add(header);
+
+        Properties props = new Properties();
+
+        props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
+
+        props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString()));
+
+        // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client.
+        // equals() on the constant is null-safe and does not rely on Symbol instances being shared.
+        if(Symbol.valueOf("application/java-object-stream").equals(props.getContentType()))
+        {
+            props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+        }
+
+        final AMQShortString correlationId = contentHeader.getCorrelationId();
+        if(correlationId != null)
+        {
+            props.setCorrelationId(new Binary(correlationId.getBytes()));
+        }
+
+        final AMQShortString messageId = contentHeader.getMessageId();
+        if(messageId != null)
+        {
+            props.setMessageId(new Binary(messageId.getBytes()));
+        }
+
+        // Only set a reply-to when there is one; String.valueOf(null) would yield the literal text "null".
+        final AMQShortString replyTo = contentHeader.getReplyTo();
+        if(replyTo != null)
+        {
+            props.setReplyTo(String.valueOf(replyTo));
+        }
+
+        props.setSubject(serverMessage.getRoutingKey());
+
+        if(contentHeader.getUserId() != null)
+        {
+            props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
+        }
+        // Not mapped: creation time, group id, group sequence, reply-to group id, to.
+        sections.add(props);
+
+        sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders())));
+
+        return new MessageMetaData_1_0(sections, sectionEncoder);
+    }
+
+}
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,27 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
+
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+/**
+ * Placeholder converter from {@link Message_1_0} to {@link AMQMessage}.
+ * The conversion itself is not implemented yet: {@link #convert} always returns null.
+ */
+public class MessageConverter_1_0_to_0_8 implements MessageConverter<Message_1_0, AMQMessage>
+{
+ /** Returns the message class this converter accepts as input: {@link Message_1_0}. */
+ @Override
+ public Class<Message_1_0> getInputClass()
+ {
+ return Message_1_0.class;
+ }
+
+ /** Returns the message class this converter is declared to produce: {@link AMQMessage}. */
+ @Override
+ public Class<AMQMessage> getOutputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ /**
+  * Not implemented: ignores both arguments and returns null, so callers must be prepared for a null result.
+  */
+ @Override
+ public AMQMessage convert(Message_1_0 message, VirtualHost vhost)
+ {
+ return null; //TODO - Implement
+ }
+}
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_0_8.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java Mon Mar 18 22:19:50 2013
@@ -18,10 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_10;
-public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
private volatile long _bytesCredit;
private volatile long _messageCredit;
@@ -63,7 +63,7 @@ public class CreditCreditManager extends
{
}
-
+
public synchronized void addCredit(final long messageCredit, final long bytesCredit)
{
boolean notifyIncrease = true;
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Mon Mar 18 22:19:50 2013
@@ -18,12 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
-
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.transport.ServerSession;
class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
------------------------------------------------------------------------------
svn:executable = *
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.flow.FlowCreditManager;
public interface FlowCreditManager_0_10 extends FlowCreditManager
{
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
------------------------------------------------------------------------------
svn:executable = *
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Mon Mar 18 22:19:50 2013
@@ -18,12 +18,12 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.protocol.v0_10.ServerSession;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
------------------------------------------------------------------------------
svn:executable = *
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Mon Mar 18 22:19:50 2013
@@ -19,10 +19,9 @@
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.store.StoredMessage;
+
+/**
+ * {@link MessageMetaDataType} implementation for {@link MessageMetaData_0_10}.
+ * <p>
+ * All instances of this class are equal to each other and share the same hash code.
+ */
+public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMetaData_0_10>
+{
+
+    /**
+     * Returns the fixed ordinal of this meta data type, which is 1.
+     * NOTE(review): presumably the value that identifies this type in persisted meta data - confirm.
+     */
+    @Override
+    public int ordinal()
+    {
+        return 1;
+    }
+
+    /** Creates meta data from the given buffer by delegating to {@code MessageMetaData_0_10.FACTORY}. */
+    @Override
+    public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
+    {
+        return MessageMetaData_0_10.FACTORY.createMetaData(buf);
+    }
+
+    /** Wraps the given stored message in a {@link MessageTransferMessage}. */
+    @Override
+    public ServerMessage<MessageMetaData_0_10> createMessage(StoredMessage<MessageMetaData_0_10> msg)
+    {
+        return new MessageTransferMessage(msg, null);
+    }
+
+    /** Uses the ordinal as hash code, which is consistent with the class-based {@link #equals(Object)}. */
+    @Override
+    public int hashCode()
+    {
+        return ordinal();
+    }
+
+    /** Two meta data types are equal when they are of exactly the same class. */
+    @Override
+    public boolean equals(Object o)
+    {
+        return o != null && o.getClass() == getClass();
+    }
+}
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Mon Mar 18 22:19:50 2013
@@ -18,10 +18,12 @@
* under the License.
*
*/
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -38,6 +40,8 @@ import java.util.List;
public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
{
+ private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10();
+
private Header _header;
private DeliveryProperties _deliveryProps;
private MessageProperties _messageProps;
@@ -58,7 +62,7 @@ public class MessageMetaData_0_10 implem
this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis());
}
- private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
+ public MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
{
_header = header;
if(_header != null)
@@ -81,7 +85,7 @@ public class MessageMetaData_0_10 implem
public MessageMetaDataType getType()
{
- return MessageMetaDataType.META_DATA_0_10;
+ return TYPE;
}
public int getStorableSize()
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
------------------------------------------------------------------------------
svn:executable = *
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java Mon Mar 18 22:19:50 2013
@@ -18,9 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_10;
import java.util.*;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Mon Mar 18 22:19:50 2013
@@ -18,9 +18,13 @@
* under the License.
*
*/
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.transport.Header;
Added: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1458042&view=auto
==============================================================================
--- qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (added)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Mon Mar 18 22:19:50 2013
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+/**
+ * {@link ProtocolEngineCreator} that produces {@link ProtocolEngine_0_10}
+ * instances and reports {@link AmqpProtocolVersion#v0_10} as its version.
+ * <p>
+ * A shared instance is available through {@link #getInstance()}; the public
+ * constructor is kept so the class can also be instantiated directly.
+ */
+public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
+{
+
+    /**
+     * Header bytes returned by {@link #getHeaderIdentifier()}: the ASCII
+     * characters {@code "AMQP"} followed by the bytes 1, 1, 0 and 10.
+     */
+    private static final byte[] AMQP_0_10_HEADER =
+            new byte[] { (byte) 'A',
+                         (byte) 'M',
+                         (byte) 'Q',
+                         (byte) 'P',
+                         (byte) 1,
+                         (byte) 1,
+                         (byte) 0,
+                         (byte) 10
+            };
+
+    /** Shared instance; final so that it can never be reassigned after class initialisation. */
+    private static final ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_10();
+
+    public ProtocolEngineCreator_0_10()
+    {
+    }
+
+    /**
+     * @return the shared creator instance
+     */
+    public static ProtocolEngineCreator getInstance()
+    {
+        return INSTANCE;
+    }
+
+    /**
+     * @return {@link AmqpProtocolVersion#v0_10}
+     */
+    public AmqpProtocolVersion getVersion()
+    {
+        return AmqpProtocolVersion.v0_10;
+    }
+
+    /**
+     * Returns the header bytes for this protocol version.
+     * The internal array itself is returned (no copy), so callers must treat
+     * it as read-only.
+     *
+     * @return the eight header bytes
+     */
+    public byte[] getHeaderIdentifier()
+    {
+        return AMQP_0_10_HEADER;
+    }
+
+    /**
+     * Creates a protocol engine for a newly accepted network connection.
+     *
+     * @param broker  broker passed to the connection delegate and asked for
+     *                the subject creator of the local address
+     * @param network connection supplying the local and remote addresses
+     * @param id      identifier given to the new {@link ServerConnection}
+     * @return a {@link ProtocolEngine_0_10} wrapping the configured connection
+     */
+    public ServerProtocolEngine newProtocolEngine(Broker broker,
+                                                  NetworkConnection network,
+                                                  long id)
+    {
+        // The host name can only be obtained from an inet socket address;
+        // for any other address type the delegate receives a null name.
+        String fqdn = null;
+        SocketAddress address = network.getLocalAddress();
+        if (address instanceof InetSocketAddress)
+        {
+            fqdn = ((InetSocketAddress) address).getHostName();
+        }
+        final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker,
+                                                    fqdn, broker.getSubjectCreator(address));
+
+        ServerConnection conn = new ServerConnection(id);
+
+        conn.setConnectionDelegate(connDelegate);
+        conn.setRemoteAddress(network.getRemoteAddress());
+        conn.setLocalAddress(network.getLocalAddress());
+        return new ProtocolEngine_0_10( conn, network);
+    }
+}
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Mon Mar 18 22:19:50 2013
@@ -18,11 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.protocol.v0_10.ServerConnection;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
------------------------------------------------------------------------------
svn:executable = *
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.transport;
+package org.apache.qpid.server.protocol.v0_10;
import java.net.SocketAddress;
import java.security.Principal;
@@ -463,7 +463,13 @@ public class ServerConnection extends Co
public String getPrincipalAsString()
{
- return getAuthorizedPrincipal().getName();
+ return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
+ }
+
+ /**
+  * Returns the name of the virtual host held in {@code _virtualHost}.
+  * Null-safe in the same way as {@code getPrincipalAsString()}.
+  *
+  * @return the virtual host name, or {@code null} when no virtual host is set
+  */
+ @Override
+ public String getVirtualHostName()
+ {
+ return _virtualHost == null ? null : _virtualHost.getName();
 }
public long getSessionCountLimit()
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.transport;
+package org.apache.qpid.server.protocol.v0_10;
import java.util.ArrayList;
import java.util.Collection;
@@ -39,7 +39,7 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.transport;
+package org.apache.qpid.server.protocol.v0_10;
import java.security.Principal;
import java.text.MessageFormat;
@@ -51,9 +51,9 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -63,7 +63,7 @@ import org.apache.qpid.server.queue.Queu
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.transport;
+package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
@@ -34,11 +34,7 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.flow.FlowCreditManager_0_10;
-import org.apache.qpid.server.flow.WindowCreditManager;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
-import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -50,8 +46,6 @@ import org.apache.qpid.server.store.Dura
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -251,14 +245,15 @@ public class ServerSessionDelegate exten
return;
}
- Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
- destination,
- method.getAcceptMode(),
- method.getAcquireMode(),
- MessageFlowMode.WINDOW,
- creditManager,
- filterManager,
- method.getArguments());
+ Subscription_0_10 sub =
+ new Subscription_0_10((ServerSession) session,
+ destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager,
+ filterManager,
+ method.getArguments());
((ServerSession)session).register(destination, sub);
try
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java Mon Mar 18 22:19:50 2013
@@ -18,33 +18,39 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.protocol.v0_10;
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.flow.CreditCreditManager;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.FlowCreditManager_0_10;
-import org.apache.qpid.server.flow.WindowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.transport.DeliveryProperties;
@@ -52,35 +58,16 @@ import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.ReplyTo;
import org.apache.qpid.transport.Struct;
-import org.apache.qpid.url.AMQBindingURL;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.text.MessageFormat;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject
{
private final long _subscriptionID;
@@ -137,9 +124,9 @@ public class Subscription_0_10 implement
MessageAcquireMode acquireMode,
MessageFlowMode flowMode,
FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments, long subscriptionId)
+ FilterManager filters,Map<String, Object> arguments)
{
- _subscriptionID = subscriptionId;
+ _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
_session = session;
_postIdSettingAction = new AddMessageDispositionListenerAction(session);
_destination = destination;
@@ -201,7 +188,7 @@ public class Subscription_0_10 implement
{
return _destination;
}
-
+
public boolean isSuspended()
{
return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
@@ -219,16 +206,26 @@ public class Subscription_0_10 implement
return false;
}
- if (_noLocal && entry.getMessage() instanceof MessageTransferMessage)
+ if(entry.getMessage() instanceof MessageTransferMessage)
+ {
+ if (_noLocal)
+ {
+ Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
+ if (connectionRef != null && connectionRef == _session.getReference())
+ {
+ return false;
+ }
+ }
+ }
+ else
{
- Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
- if (connectionRef != null && connectionRef == _session.getReference())
+ // No interest in messages we can't convert into MessageTransferMessages
+ if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(),MessageTransferMessage.class)==null)
{
return false;
}
}
-
return checkFilters(entry);
@@ -357,201 +354,71 @@ public class Subscription_0_10 implement
DeliveryProperties deliveryProps;
MessageProperties messageProps = null;
+ MessageTransferMessage msg;
if(serverMsg instanceof MessageTransferMessage)
{
- MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
- DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
- messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
-
- deliveryProps = new DeliveryProperties();
- if(origDeliveryProps != null)
- {
- if(origDeliveryProps.hasDeliveryMode())
- {
- deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
- }
- if(origDeliveryProps.hasExchange())
- {
- deliveryProps.setExchange(origDeliveryProps.getExchange());
- }
- if(origDeliveryProps.hasExpiration())
- {
- deliveryProps.setExpiration(origDeliveryProps.getExpiration());
- }
- if(origDeliveryProps.hasPriority())
- {
- deliveryProps.setPriority(origDeliveryProps.getPriority());
- }
- if(origDeliveryProps.hasRoutingKey())
- {
- deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
- }
- if(origDeliveryProps.hasTimestamp())
- {
- deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
- }
- if(origDeliveryProps.hasTtl())
- {
- deliveryProps.setTtl(origDeliveryProps.getTtl());
- }
-
-
- }
-
- deliveryProps.setRedelivered(entry.isRedelivered());
-
- if(_trace != null && messageProps == null)
- {
- messageProps = new MessageProperties();
- }
-
- Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
-
-
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+ msg = (MessageTransferMessage) serverMsg;
}
- else if(serverMsg instanceof AMQMessage)
+ else
{
- AMQMessage message_0_8 = (AMQMessage) serverMsg;
- deliveryProps = new DeliveryProperties();
- messageProps = new MessageProperties();
+ final MessageConverter converter =
+ MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
+
+ msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost());
+ }
- int size = (int) message_0_8.getSize();
- ByteBuffer body = ByteBuffer.allocate(size);
- message_0_8.getContent(body, 0);
- body.flip();
+ DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+ messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
- BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
- final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
- if(exchange != null)
+ deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
+ {
+ if(origDeliveryProps.hasDeliveryMode())
{
- deliveryProps.setExchange(exchange.toString());
+ deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
}
- deliveryProps.setExpiration(message_0_8.getExpiration());
- deliveryProps.setImmediate(message_0_8.isImmediate());
- deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
- deliveryProps.setRedelivered(entry.isRedelivered());
- deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
- deliveryProps.setTimestamp(properties.getTimestamp());
-
- messageProps.setContentEncoding(properties.getEncodingAsString());
- messageProps.setContentLength(size);
- if(properties.getAppId() != null)
+ if(origDeliveryProps.hasExchange())
{
- messageProps.setAppId(properties.getAppId().getBytes());
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
}
- messageProps.setContentType(properties.getContentTypeAsString());
- if(properties.getCorrelationId() != null)
+ if(origDeliveryProps.hasExpiration())
{
- messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
}
-
- if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+ if(origDeliveryProps.hasPriority())
{
- String origReplyToString = properties.getReplyTo().asString();
- ReplyTo replyTo = new ReplyTo();
- // if the string looks like a binding URL, then attempt to parse it...
- try
- {
- AMQBindingURL burl = new AMQBindingURL(origReplyToString);
- AMQShortString routingKey = burl.getRoutingKey();
- if(routingKey != null)
- {
- replyTo.setRoutingKey(routingKey.asString());
- }
-
- AMQShortString exchangeName = burl.getExchangeName();
- if(exchangeName != null)
- {
- replyTo.setExchange(exchangeName.asString());
- }
- }
- catch (URISyntaxException e)
- {
- replyTo.setRoutingKey(origReplyToString);
- }
- messageProps.setReplyTo(replyTo);
-
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
}
-
- if(properties.getMessageId() != null)
+ if(origDeliveryProps.hasRoutingKey())
{
- try
- {
- String messageIdAsString = properties.getMessageIdAsString();
- if(messageIdAsString.startsWith("ID:"))
- {
- messageIdAsString = messageIdAsString.substring(3);
- }
- UUID uuid = UUID.fromString(messageIdAsString);
- messageProps.setMessageId(uuid);
- }
- catch(IllegalArgumentException e)
- {
- // ignore - can't parse
- }
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
}
-
-
-
- if(properties.getUserId() != null)
+ if(origDeliveryProps.hasTimestamp())
{
- messageProps.setUserId(properties.getUserId().getBytes());
+ deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
}
-
- FieldTable fieldTable = properties.getHeaders();
-
- Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
-
- if(properties.getType() != null)
+ if(origDeliveryProps.hasTtl())
{
- appHeaders.put("x-jms-type", properties.getTypeAsString());
+ deliveryProps.setTtl(origDeliveryProps.getTtl());
}
- messageProps.setApplicationHeaders(appHeaders);
-
- Header header = new Header(deliveryProps, messageProps, null);
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
}
- else
- {
-
- deliveryProps = new DeliveryProperties();
- messageProps = new MessageProperties();
-
- int size = (int) serverMsg.getSize();
- ByteBuffer body = ByteBuffer.allocate(size);
- serverMsg.getContent(body, 0);
- body.flip();
+ deliveryProps.setRedelivered(entry.isRedelivered());
- deliveryProps.setExpiration(serverMsg.getExpiration());
- deliveryProps.setImmediate(serverMsg.isImmediate());
- deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
- deliveryProps.setRedelivered(entry.isRedelivered());
- deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
- deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+ if(_trace != null && messageProps == null)
+ {
+ messageProps = new MessageProperties();
+ }
- messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
- messageProps.setContentLength(size);
- messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
- if(serverMsg.getMessageHeader().getCorrelationId() != null)
- {
- messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
- }
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
- // TODO - ReplyTo
- Header header = new Header(deliveryProps, messageProps, null);
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
- }
+ xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
boolean excludeDueToFederation = false;
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.message.MessageReference;
public class TransferMessageReference extends MessageReference<MessageTransferMessage>
{
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java Mon Mar 18 22:19:50 2013
@@ -18,9 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server;
+package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -37,7 +37,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
@@ -51,9 +50,8 @@ import org.apache.qpid.framing.MethodReg
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
-import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -65,19 +63,17 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.protocol.v0_8.ack.UnacknowledgedMessageMapImpl;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -85,7 +81,6 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Mon Mar 18 22:19:50 2013
@@ -18,21 +18,21 @@
* under the License.
*
*/
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_8;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
-import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
-
/**
* A deliverable message.
*/
@@ -150,7 +150,7 @@ public class AMQMessage extends Abstract
}
- public MessagePublishInfo getMessagePublishInfo() throws AMQException
+ public MessagePublishInfo getMessagePublishInfo()
{
return getMessageMetaData().getMessagePublishInfo();
}
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.message.MessageReference;
public class AMQMessageReference extends MessageReference<AMQMessage>
{
Copied: qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (from r1457505, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?p2=qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java&r1=1457505&r2=1458042&rev=1458042&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-4659/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Mar 18 22:19:50 2013
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.protocol.v0_8;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -51,9 +51,8 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
@@ -62,16 +61,17 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.protocol.AMQNoMethodHandlerException;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.state.AMQState;
+import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -1157,6 +1157,12 @@ public class AMQProtocolEngine implement
return getAuthId();
}
+ @Override
+ public String getVirtualHostName()
+ {
+ return _virtualHost == null ? null : _virtualHost.getName();
+ }
+
public long getSessionCountLimit()
{
return getMaximumNumberOfChannels();
@@ -1179,7 +1185,7 @@ public class AMQProtocolEngine implement
public String getAuthId()
{
- return getAuthorizedPrincipal().getName();
+ return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
}
public Integer getRemotePID()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org