You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/03/05 19:01:23 UTC

svn commit: r514751 - in /incubator/qpid/branches/qpid.0-9: ./ cpp/lib/broker/BrokerMessageMessage.cpp cpp/lib/broker/BrokerMessageMessage.h cpp/lib/broker/MessageHandlerImpl.cpp

Author: astitcher
Date: Mon Mar  5 10:01:22 2007
New Revision: 514751

URL: http://svn.apache.org/viewvc?view=rev&rev=514751
Log:
 r1239@fuschia:  andrew | 2007-02-26 10:58:52 +0000
 Refactored message transfer to extract commonality from deliver/get
 r1242@fuschia:  andrew | 2007-03-05 17:54:44 +0000
 Turn oversize inline transfers into reference transfers

Modified:
    incubator/qpid/branches/qpid.0-9/   (props changed)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp

Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Mon Mar  5 10:01:22 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1237
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1242

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Mon Mar  5 10:01:22 2007
@@ -61,14 +61,13 @@
     reference(reference_)
 {}
 
-void MessageMessage::deliver(
+void MessageMessage::transferMessage(
     framing::ChannelAdapter& channel, 
     const std::string& consumerTag, 
-    u_int64_t /*deliveryTag*/, 
-    u_int32_t /*framesize*/)
-{
+    u_int32_t framesize)
+{	
 	const framing::Content& body = transfer->getBody();
-	
+
 	// Send any reference data
 	if (!body.isInline()){
 		// Open
@@ -81,8 +80,9 @@
 		}
 	}
 	
-	// The the transfer
-    channel.send(
+	// The transfer
+	if ( transfer->size()<=framesize ) {
+    	channel.send(
     	new MessageTransferBody(channel.getVersion(), 
                                 transfer->getTicket(),
                                 consumerTag,
@@ -107,6 +107,44 @@
                                 transfer->getApplicationHeaders(),
                                 body,
                                 transfer->getMandatory()));
+	} else {
+		// Thing to do here is to construct a simple reference message then deliver that instead
+		// fragmentation will be taken care of in the delivery
+		// if necessary; problem is to invent a reference name to use
+		string content = body.getValue();
+		string refname = "dummy";
+		TransferPtr newTransfer(
+	    	new MessageTransferBody(channel.getVersion(), 
+	                                transfer->getTicket(),
+	                                consumerTag,
+	                                getRedelivered(),
+	                                transfer->getImmediate(),
+	                                transfer->getTtl(),
+	                                transfer->getPriority(),
+	                                transfer->getTimestamp(),
+	                                transfer->getDeliveryMode(),
+	                                transfer->getExpiration(),
+	                                getExchange(),
+	                                getRoutingKey(),
+	                                transfer->getMessageId(),
+	                                transfer->getCorrelationId(),
+	                                transfer->getReplyTo(),
+	                                transfer->getContentType(),
+	                                transfer->getContentEncoding(),
+	                                transfer->getUserId(),
+	                                transfer->getAppId(),
+	                                transfer->getTransactionId(),
+	                                transfer->getSecurityToken(),
+	                                transfer->getApplicationHeaders(),
+	                                framing::Content(REFERENCE, refname),
+	                                transfer->getMandatory()));
+		ReferencePtr newRef(new Reference(refname));
+		Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
+		newRef->append(newAppend);
+		MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+		newMsg.transferMessage(channel, consumerTag, framesize);
+		return;
+	}
 	// Close any reference data
 	if (!body.isInline()){
 		// Close
@@ -114,39 +152,24 @@
 	}
 }
 
+void MessageMessage::deliver(
+    framing::ChannelAdapter& channel, 
+    const std::string& consumerTag, 
+    u_int64_t /*deliveryTag*/, 
+    u_int32_t framesize)
+{
+	transferMessage(channel, consumerTag, framesize);
+}
+
 void MessageMessage::sendGetOk(
     const framing::MethodContext& context,
 	const std::string& destination,
     u_int32_t /*messageCount*/,
     u_int64_t /*deliveryTag*/, 
-    u_int32_t /*framesize*/)
+    u_int32_t framesize)
 {
 	framing::ChannelAdapter* channel = context.channel;
-    channel->send(
-    	new MessageTransferBody(channel->getVersion(), 
-                                transfer->getTicket(),
-                                destination,
-                                getRedelivered(),
-                                transfer->getImmediate(),
-                                transfer->getTtl(),
-                                transfer->getPriority(),
-                                transfer->getTimestamp(),
-                                transfer->getDeliveryMode(),
-                                transfer->getExpiration(),
-                                getExchange(),
-                                getRoutingKey(),
-                                transfer->getMessageId(),
-                                transfer->getCorrelationId(),
-                                transfer->getReplyTo(),
-                                transfer->getContentType(),
-                                transfer->getContentEncoding(),
-                                transfer->getUserId(),
-                                transfer->getAppId(),
-                                transfer->getTransactionId(),
-                                transfer->getSecurityToken(),
-                                transfer->getApplicationHeaders(),
-                                transfer->getBody(),
-                                transfer->getMandatory()));
+	transferMessage(*channel, destination, framesize);
 }
 
 bool MessageMessage::isComplete()

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Mon Mar  5 10:01:22 2007
@@ -76,6 +76,10 @@
     u_int64_t expectedContentSize();
 
   private:
+  	void transferMessage(framing::ChannelAdapter& channel, 
+    					 const std::string& consumerTag, 
+    					 u_int32_t framesize);
+  
     framing::RequestId requestId;
     const TransferPtr transfer;
     const ReferencePtr reference;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Mon Mar  5 10:01:22 2007
@@ -187,12 +187,12 @@
 }
 
 void
-MessageHandlerImpl::reject(const MethodContext&,
+MessageHandlerImpl::reject(const MethodContext& /*context*/,
                            u_int16_t /*code*/,
                            const string& /*text*/ )
 {
-    // FIXME astitcher 2007-01-11: 0-9 feature
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+    channel.ack();
+    // channel.requeue();
 }
 
 void