You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/07/20 21:05:08 UTC
svn commit: r795958 [2/3] - in /qpid/branches/java-broker-0-10/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apache/q...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Mon Jul 20 19:05:05 2009
@@ -36,7 +36,7 @@
this.xpath = xpath;
}
- public Object evaluate(Filterable message) throws AMQException {
+ public Object evaluate(Filterable message) {
return Boolean.FALSE;
}
@@ -49,7 +49,7 @@
* @return true if the expression evaluates to Boolean.TRUE.
* @throws AMQException
*/
- public boolean matches(Filterable message) throws AMQException
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Mon Jul 20 19:05:05 2009
@@ -43,7 +43,7 @@
this.xpath = xpath;
}
- public boolean evaluate(Filterable m) throws AMQException
+ public boolean evaluate(Filterable m)
{
// TODO - we would have to check the content type and then evaluate the content
// here... is this really a feature we wish to implement? - RobG
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,11 +1,8 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Set;
-import java.util.HashSet;
/*
*
@@ -52,7 +49,7 @@
return _bytesCredit.get() > 0L;
}
- public boolean useCreditForMessage(AMQMessage msg)
+ public boolean useCreditForMessage(ServerMessage msg)
{
final long msgSize = msg.getSize();
if(hasCredit())
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,6 +1,7 @@
package org.apache.qpid.server.flow;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -40,5 +41,5 @@
public boolean hasCredit();
- public boolean useCreditForMessage(AMQMessage msg);
+ public boolean useCreditForMessage(ServerMessage msg);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,6 +1,6 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -37,7 +37,7 @@
return true;
}
- public boolean useCreditForMessage(AMQMessage msg)
+ public boolean useCreditForMessage(ServerMessage msg)
{
return true;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,8 +1,6 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
-
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -54,7 +52,7 @@
return (_messageCredit > 0L) && ( _bytesCredit > 0L );
}
- public synchronized boolean useCreditForMessage(AMQMessage msg)
+ public synchronized boolean useCreditForMessage(ServerMessage msg)
{
if(_messageCredit == 0L)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,6 +1,6 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,7 +50,7 @@
return _messageCredit.get() > 0L;
}
- public boolean useCreditForMessage(AMQMessage msg)
+ public boolean useCreditForMessage(ServerMessage msg)
{
if(hasCredit())
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java Mon Jul 20 19:05:05 2009
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
@@ -123,7 +123,7 @@
&& (_messageCreditLimit == 0L || _messageCredit > 0);
}
- public synchronized boolean useCreditForMessage(final AMQMessage msg)
+ public synchronized boolean useCreditForMessage(final ServerMessage msg)
{
if(_messageCreditLimit != 0L)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Mon Jul 20 19:05:05 2009
@@ -41,6 +41,7 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -130,8 +131,16 @@
throws AMQException
{
singleMessageCredit.useCreditForMessage(entry.getMessage());
- session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
- deliveryTag, queue.getMessageCount());
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ session.getProtocolOutputConverter().writeGetOk((AMQMessage)(entry.getMessage()), channel.getChannelId(),
+ deliveryTag, queue.getMessageCount());
+ }
+ else
+ {
+ //TODO Convert AMQP 0-10 message
+ throw new RuntimeException("Not implemented conversion of 0-10 message");
+ }
}
};
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Mon Jul 20 19:05:05 2009
@@ -87,7 +87,7 @@
return;
}
- if (!message.getMessage().isReferenced())
+ if (message.getMessage() == null)
{
_logger.warn("Message as already been purged, unable to Reject.");
return;
@@ -96,7 +96,7 @@
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Mon Jul 20 19:05:05 2009
@@ -26,6 +26,7 @@
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -61,7 +62,7 @@
{
throw body.getChannelNotFoundException(channelId);
}
-
+ StoreContext.setCurrentContext(channel.getStoreContext());
channel.commit();
MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -74,5 +75,9 @@
{
throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
+ finally
+ {
+ StoreContext.clearCurrentContext();
+ }
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Mon Jul 20 19:05:05 2009
@@ -36,8 +36,6 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import org.apache.mina.common.ByteBuffer;
-
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Mon Jul 20 19:05:05 2009
@@ -1,25 +1,25 @@
package org.apache.qpid.server.output.amqp0_9;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.mina.common.ByteBuffer;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jul 20 19:05:05 2009
@@ -33,6 +33,7 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.*;
import java.util.Iterator;
@@ -41,12 +42,12 @@
/**
* A deliverable message.
*/
-public class AMQMessage implements Filterable<AMQException>
+public class AMQMessage implements Filterable, ServerMessage
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- private final AtomicInteger _referenceCount = new AtomicInteger(1);
+ private final AtomicInteger _referenceCount = new AtomicInteger(0);
private final AMQMessageHandle _messageHandle;
@@ -72,7 +73,7 @@
private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
+ private final AMQMessageHeader _messageHeader;
/**
@@ -202,6 +203,7 @@
_messageHandle = factory.createMessageHandle(messageId, store, true);
_storeContext = txnConext.getStoreContext();
_size = _messageHandle.getBodySize(txnConext.getStoreContext());
+ _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(txnConext.getStoreContext()));
}
/**
@@ -221,6 +223,7 @@
{
_messageHandle = messageHandle;
_storeContext = storeConext;
+ _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(storeConext));
if(info.isImmediate())
{
@@ -234,6 +237,7 @@
protected AMQMessage(AMQMessage msg) throws AMQException
{
_messageHandle = msg._messageHandle;
+ _messageHeader = msg._messageHeader;
_storeContext = msg._storeContext;
_flags = msg._flags;
_size = msg._size;
@@ -315,12 +319,11 @@
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*
- * @param storeContext
*
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
+ public void decrementReference() throws MessageCleanupException
{
int count = _referenceCount.decrementAndGet();
@@ -342,13 +345,12 @@
// and the handle has not yet been constructed
if (_messageHandle != null)
{
- _messageHandle.removeMessage(storeContext);
+ _messageHandle.removeMessage(StoreContext.getCurrentContext());
}
}
catch (AMQException e)
{
- // to maintain consistency, we revert the count
- incrementReference();
+
throw new MessageCleanupException(getMessageId(), e);
}
}
@@ -373,7 +375,18 @@
return (_flags & DELIVERED_TO_CONSUMER) != 0;
}
- public boolean isPersistent() throws AMQException
+ public String getRoutingKey()
+ {
+ // TODO
+ return null;
+ }
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return _messageHeader;
+ }
+
+ public boolean isPersistent()
{
return _messageHandle.isPersistent();
}
@@ -455,6 +468,26 @@
}
+ public boolean isImmediate()
+ {
+ return (_flags & IMMEDIATE) == IMMEDIATE;
+ }
+
+ public long getExpiration()
+ {
+ return _expiration;
+ }
+
+ public MessageReference newReference()
+ {
+ return new AMQMessageReference(this);
+ }
+
+ public Long getMessageNumber()
+ {
+ return getMessageId();
+ }
+
public Object getPublisherClientInstance()
{
//todo store sessionIdentifier/client id with message in store
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jul 20 19:05:05 2009
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -88,7 +86,7 @@
int delete() throws AMQException;
- QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+ QueueEntry enqueue(ServerMessage message) throws AMQException;
void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Jul 20 19:05:05 2009
@@ -36,6 +36,7 @@
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -246,7 +247,7 @@
/**
* Checks if there is any notification to be send to the listeners
*/
- public void checkForNotification(AMQMessage msg) throws AMQException
+ public void checkForNotification(ServerMessage msg) throws AMQException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
@@ -333,48 +334,60 @@
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
- AMQMessage msg = entry.getMessage();
- // get message content
- Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
- List<Byte> msgContent = new ArrayList<Byte>();
- while (cBodies.hasNext())
+ ServerMessage serverMsg = entry.getMessage();
+
+ if(serverMsg instanceof AMQMessage)
{
- ContentChunk body = cBodies.next();
- if (body.getSize() != 0)
+ AMQMessage msg = (AMQMessage) serverMsg;
+ // get message content
+ Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ while (cBodies.hasNext())
{
+ ContentChunk body = cBodies.next();
if (body.getSize() != 0)
{
- ByteBuffer slice = body.getData().slice();
- for (int j = 0; j < slice.limit(); j++)
+ if (body.getSize() != 0)
{
- msgContent.add(slice.get());
+ ByteBuffer slice = body.getData().slice();
+ for (int j = 0; j < slice.limit(); j++)
+ {
+ msgContent.add(slice.get());
+ }
}
}
}
- }
- try
- {
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
+
+ try
{
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
- }
+ // Create header attributes list
+ CommonContentHeaderProperties headerProperties =
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = null, encoding = null;
+ if (headerProperties != null)
+ {
+ AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+ mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+ encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+ }
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+ return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+ }
+ catch (AMQException e)
+ {
+ JMException jme = new JMException("Error creating header attributes list: " + e);
+ jme.initCause(e);
+ throw jme;
+ }
- return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
}
- catch (AMQException e)
+ else
{
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
+ // TODO 0-10 Messages for MBean
+ return null;
}
}
@@ -398,13 +411,21 @@
for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
long position = i;
- AMQMessage msg = list.get(i - 1).getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position};
- CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
- _messageList.put(messageData);
+ ServerMessage serverMsg = list.get(i - 1).getMessage();
+ if(serverMsg instanceof AMQMessage)
+ {
+ AMQMessage msg = (AMQMessage) serverMsg;
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position };
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+ _messageList.put(messageData);
+ }
+ else
+ {
+ // TODO 0-10 Message
+ }
}
}
catch (AMQException e)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Mon Jul 20 19:05:05 2009
@@ -68,4 +68,9 @@
{
return _queueMap.values();
}
+
+ public AMQQueue getQueue(String queue)
+ {
+ return getQueue(new AMQShortString(queue));
+ }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Mon Jul 20 19:05:05 2009
@@ -22,12 +22,13 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessageHeader;
-public interface Filterable<E extends Exception>
+public interface Filterable
{
- ContentHeaderBody getContentHeaderBody() throws E;
+ AMQMessageHeader getMessageHeader();
- boolean isPersistent() throws E;
+ boolean isPersistent();
boolean isRedelivered();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Mon Jul 20 19:05:05 2009
@@ -28,16 +28,20 @@
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ContentHeaderBodyAdapter;
+import org.apache.qpid.server.message.AMQMessageReference;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
-public class IncomingMessage implements Filterable<RuntimeException>
+public class IncomingMessage implements Filterable, InboundMessage
{
/** Used for debugging purposes. */
@@ -73,6 +77,7 @@
private long _expiration;
private Exchange _exchange;
+ private AMQMessageHeader _messageHeader;
public IncomingMessage(final Long messageId,
@@ -90,6 +95,7 @@
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
+ _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody);
}
public void setExpiration()
@@ -158,17 +164,19 @@
}
AMQMessage message = null;
+ AMQMessageReference ref = null;
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
_messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
- _messagePublishInfo, getContentHeaderBody());
+ _messagePublishInfo, getContentHeader());
message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
+ ref = (AMQMessageReference) message.newReference();
message.setExpiration(_expiration);
message.setClientIdentifier(_publisher.getSessionIdentifier());
@@ -177,8 +185,8 @@
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
- AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
+ AMQShortString userID = getContentHeader().properties instanceof BasicContentHeaderProperties ?
+ ((BasicContentHeaderProperties) getContentHeader().properties).getUserId() : null;
if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
{
@@ -202,7 +210,7 @@
{
int offset;
final int queueCount = _destinationQueues.size();
- message.incrementReference(queueCount);
+
if(queueCount == 1)
{
offset = 0;
@@ -233,7 +241,8 @@
finally
{
// Remove refence for routing process . Reference count should now == delivered queue count
- if(message != null) message.decrementReference(_txnContext.getStoreContext());
+
+ if(ref != null) ref.release();
}
}
@@ -250,40 +259,51 @@
public boolean allContentReceived()
{
- return (_bodyLengthReceived == getContentHeaderBody().bodySize);
+ return (_bodyLengthReceived == getContentHeader().bodySize);
}
- public AMQShortString getExchange() throws AMQException
+ public AMQShortString getExchange()
{
return _messagePublishInfo.getExchange();
}
- public AMQShortString getRoutingKey() throws AMQException
+ public String getRoutingKey()
{
- return _messagePublishInfo.getRoutingKey();
+ return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
}
- public boolean isMandatory() throws AMQException
+ public String getBinding()
+ {
+ return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
+ }
+
+
+ public boolean isMandatory()
{
return _messagePublishInfo.isMandatory();
}
- public boolean isImmediate() throws AMQException
+ public boolean isImmediate()
{
return _messagePublishInfo.isImmediate();
}
- public ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeader()
{
return _contentHeaderBody;
}
+ public AMQMessageHeader getMessageHeader()
+ {
+ return _messageHeader;
+ }
+
public boolean isPersistent()
{
- return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() ==
+ return getContentHeader().properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
@@ -292,6 +312,11 @@
return false;
}
+ public long getSize()
+ {
+ return getContentHeader().bodySize;
+ }
+
public void setMessageStore(final MessageStore messageStore)
{
_messageStore = messageStore;
@@ -309,7 +334,8 @@
public void route() throws AMQException
{
- _exchange.route(this);
+ enqueue(_exchange.route(this));
+
}
public void enqueue(final ArrayList<AMQQueue> queues)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Mon Jul 20 19:05:05 2009
@@ -22,6 +22,7 @@
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.ServerMessage;
/**
* NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate
@@ -35,7 +36,7 @@
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(AMQMessage message)
+ public NoConsumersException(ServerMessage message)
{
super("Immediate delivery is not possible.", message);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Mon Jul 20 19:05:05 2009
@@ -21,13 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
public enum NotificationCheck
{
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,26 +42,19 @@
},
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
{
// Check for threshold message size
long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
+ messageSize = (msg == null) ? 0 : msg.getSize();
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
return true;
}
}
@@ -70,7 +64,7 @@
},
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -91,7 +85,7 @@
},
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -133,6 +127,6 @@
return _messageSpecific;
}
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Mon Jul 20 19:05:05 2009
@@ -22,6 +22,7 @@
import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
public class PriorityQueueList implements QueueEntryList
{
@@ -52,26 +53,18 @@
return _queue;
}
- public QueueEntry add(AMQMessage message)
+ public QueueEntry add(ServerMessage message)
{
- try
+ int index = message.getMessageHeader().getPriority() - _priorityOffset;
+ if(index >= _priorities)
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if(index >= _priorities)
- {
- index = _priorities-1;
- }
- else if(index < 0)
- {
- index = 0;
- }
- return _priorityLists[index].add(message);
+ index = _priorities-1;
}
- catch (AMQException e)
+ else if(index < 0)
{
- // TODO - fix AMQ Exception
- throw new RuntimeException(e);
+ index = 0;
}
+ return _priorityLists[index].add(message);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Jul 20 19:05:05 2009
@@ -3,6 +3,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -133,7 +134,7 @@
AMQQueue getQueue();
- AMQMessage getMessage();
+ ServerMessage getMessage();
long getSize();
@@ -155,8 +156,6 @@
void release();
- String debugIdentity();
-
boolean immediateAndNotDelivered();
void setRedelivered(boolean b);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Jul 20 19:05:05 2009
@@ -23,6 +23,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.log4j.Logger;
import java.util.Set;
@@ -42,7 +44,7 @@
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
+ private MessageReference _message;
private Set<Subscription> _rejectedBy = null;
@@ -75,6 +77,8 @@
private volatile long _entryId;
volatile QueueEntryImpl _next;
+ private boolean _deliveredToConsumer;
+ private boolean _redelivered;
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
@@ -84,18 +88,18 @@
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _message = message == null ? null : message.newReference();
_entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _message = message == null ? null : message.newReference();
}
protected void setEntryId(long entryId)
@@ -113,9 +117,9 @@
return _queueEntryList.getQueue();
}
- public AMQMessage getMessage()
+ public ServerMessage getMessage()
{
- return _message;
+ return _message == null ? null : _message.getMessage();
}
public long getSize()
@@ -125,12 +129,21 @@
public boolean getDeliveredToConsumer()
{
- return getMessage().getDeliveredToConsumer();
+ return _deliveredToConsumer;
}
public boolean expired() throws AMQException
{
- return getMessage().expired(getQueue());
+ long expiration = getMessage().getExpiration();
+ if (expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > expiration);
+ }
+
+ return false;
+
}
public boolean isAcquired()
@@ -167,7 +180,7 @@
public void setDeliveredToSubscription()
{
- getMessage().setDeliveredToConsumer();
+ _deliveredToConsumer = true;
}
public void release()
@@ -175,20 +188,15 @@
_stateUpdater.set(this,AVAILABLE_STATE);
}
- public String debugIdentity()
- {
- return getMessage().debugIdentity();
- }
-
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return getMessage().isImmediate() && !_deliveredToConsumer;
}
public void setRedelivered(boolean b)
{
- getMessage().setRedelivered(b);
+ _redelivered = b;
}
public Subscription getDeliveredSubscription()
@@ -223,7 +231,7 @@
}
else
{
- _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ _log.warn("Requesting rejection by null subscriber:" + this);
}
}
@@ -284,7 +292,9 @@
{
if(delete())
{
- getMessage().decrementReference(storeContext);
+ StoreContext sc = StoreContext.setCurrentContext(storeContext);
+ _message.release();
+ StoreContext.setCurrentContext(sc);
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Mon Jul 20 19:05:05 2009
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.ServerMessage;
+
public interface QueueEntryList
{
AMQQueue getQueue();
- QueueEntry add(AMQMessage message);
+ QueueEntry add(ServerMessage message);
QueueEntry next(QueueEntry node);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Mon Jul 20 19:05:05 2009
@@ -40,4 +40,5 @@
Collection<AMQQueue> getQueues();
+ AMQQueue getQueue(String queue);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Jul 20 19:05:05 2009
@@ -29,6 +29,8 @@
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -319,7 +321,7 @@
// ------ Enqueue / Dequeue
- public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+ public QueueEntry enqueue(ServerMessage message) throws AMQException
{
incrementQueueCount();
@@ -406,8 +408,10 @@
}
}
+
if (entry.immediateAndNotDelivered())
{
+ StoreContext storeContext = StoreContext.getCurrentContext();
dequeue(storeContext, entry);
entry.dispose(storeContext);
}
@@ -462,7 +466,7 @@
// Simple Queues don't :-)
}
- private void incrementQueueSize(final AMQMessage message)
+ private void incrementQueueSize(final ServerMessage message)
{
getAtomicQueueSize().addAndGet(message.getSize());
}
@@ -573,10 +577,10 @@
try
{
- AMQMessage msg = entry.getMessage();
+ ServerMessage msg = entry.getMessage();
if (msg.isPersistent())
{
- _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
+ _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageNumber());
}
//entry.dispose(storeContext);
@@ -767,7 +771,7 @@
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessage().getMessageNumber();
return messageId >= fromMessageId && messageId <= toMessageId;
}
@@ -786,7 +790,7 @@
public boolean accept(QueueEntry entry)
{
- _complete = entry.getMessage().getMessageId() == messageId;
+ _complete = entry.getMessage().getMessageNumber() == messageId;
return _complete;
}
@@ -828,7 +832,7 @@
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessage().getMessageNumber();
return (messageId >= fromMessageId)
&& (messageId <= toMessageId)
&& entry.acquire();
@@ -847,11 +851,11 @@
// Move the messages in on the message store.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
+ ServerMessage message = entry.getMessage();
if (message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ store.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
}
// dequeue does not decrement the refence count
entry.dequeue(storeContext);
@@ -882,9 +886,11 @@
try
{
+ StoreContext.setCurrentContext(storeContext);
+
for (QueueEntry entry : entries)
{
- toQueue.enqueue(storeContext, entry.getMessage());
+ toQueue.enqueue(entry.getMessage());
entry.delete();
}
}
@@ -896,6 +902,11 @@
{
throw new RuntimeException(e);
}
+ finally
+ {
+ StoreContext.clearCurrentContext();
+
+ }
}
@@ -912,17 +923,9 @@
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
- if ((messageId >= fromMessageId)
- && (messageId <= toMessageId))
- {
- if (!entry.isDeleted())
- {
- return entry.getMessage().incrementReference();
- }
- }
-
- return false;
+ final long messageId = entry.getMessage().getMessageNumber();
+ return ((messageId >= fromMessageId)
+ && (messageId <= toMessageId));
}
public boolean filterComplete()
@@ -938,11 +941,15 @@
// Move the messages in on the message store.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
+ ServerMessage message = entry.getMessage();
- if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+ if (message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+
+ StoreContext sc = StoreContext.setCurrentContext(storeContext);
+ store.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
+ StoreContext.setCurrentContext(sc);
+
}
}
@@ -973,9 +980,11 @@
{
for (QueueEntry entry : entries)
{
- if (entry.getMessage().isReferenced())
+
+ ServerMessage message = entry.getMessage();
+ if (message != null)
{
- toQueue.enqueue(storeContext, entry.getMessage());
+ toQueue.enqueue(entry.getMessage());
}
}
}
@@ -1001,7 +1010,7 @@
{
QueueEntry node = queueListIterator.getNode();
- final long messageId = node.getMessage().getMessageId();
+ final long messageId = node.getMessage().getMessageNumber();
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
@@ -1418,7 +1427,7 @@
}
}
- @Override
+
public void checkMessageStatus() throws AMQException
{
@@ -1581,7 +1590,7 @@
for (int i = 0; i < num && !it.atTail(); i++)
{
it.advance();
- ids.add(it.getNode().getMessage().getMessageId());
+ ids.add(it.getNode().getMessage().getMessageNumber());
}
return ids;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Mon Jul 20 19:05:05 2009
@@ -1,5 +1,8 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/*
@@ -74,7 +77,7 @@
}
- public QueueEntry add(AMQMessage message)
+ public QueueEntry add(ServerMessage message)
{
QueueEntryImpl node = new QueueEntryImpl(this, message);
for (;;)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Mon Jul 20 19:05:05 2009
@@ -1352,7 +1352,10 @@
public void process() throws AMQException
{
- _queue.enqueue(_context, _message);
+ StoreContext.setCurrentContext(_context);
+ _queue.enqueue(_message);
+ StoreContext.clearCurrentContext();
+
}
@@ -1414,7 +1417,7 @@
if(message != null)
{
- message.incrementReference();
+// message.incrementReference();
}
else
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Mon Jul 20 19:05:05 2009
@@ -32,9 +32,12 @@
{
private static final Logger _logger = Logger.getLogger(StoreContext.class);
+ private static final ThreadLocal<StoreContext> _threadLocalContext = new ThreadLocal<StoreContext>();
+
private String _name;
private Object _payload;
+
public StoreContext()
{
_name = "StoreContext";
@@ -68,4 +71,24 @@
{
return "<_name = " + _name + ", _payload = " + _payload + ">";
}
+
+
+ public static StoreContext setCurrentContext(StoreContext context)
+ {
+ StoreContext sc = getCurrentContext();
+ _threadLocalContext.set(context);
+ return sc;
+ }
+
+ public static StoreContext getCurrentContext()
+ {
+ return _threadLocalContext.get();
+ }
+
+ public static void clearCurrentContext()
+ {
+ _threadLocalContext.set(null);
+ }
+
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Jul 20 19:05:05 2009
@@ -32,8 +32,10 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.filter.FilterManager;
@@ -377,16 +379,19 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
+ _logger.debug("Subscription:" + this + " rejected message:" + entry);
}
// return false;
}
if (_noLocal)
{
- //todo - client id should be recoreded so we don't have to handle
+
+ AMQMessage message = (AMQMessage) entry.getMessage();
+
+ //todo - client id should be recorded so we don't have to handle
// the case where this is null.
- final Object publisherId = entry.getMessage().getPublisherClientInstance();
+ final Object publisherId = message.getPublisherClientInstance();
// We don't want local messages so check to see if message is one we sent
Object localInstance;
@@ -404,8 +409,8 @@
localInstance = getProtocolSession().getClientIdentifier();
- //todo - client id should be recoreded so we don't have to do the null check
- if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
+ //todo - client id should be recorded so we don't have to do the null check
+ if (localInstance != null && localInstance.equals(message.getPublisherIdentifier()))
{
return false;
}
@@ -417,7 +422,7 @@
if (_logger.isDebugEnabled())
{
- _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
+ _logger.debug("(" + this + ") checking filters for message (" + entry);
}
return checkFilters(entry);
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ServerConnection extends Connection
+{
+ @Override
+ protected void invoke(Method method)
+ {
+ super.invoke(method);
+ }
+
+ @Override
+ protected void setState(State state)
+ {
+ super.setState(state);
+ }
+
+ @Override
+ public ServerConnectionDelegate getConnectionDelegate()
+ {
+ return (ServerConnectionDelegate) super.getConnectionDelegate();
+ }
+
+ public void setConnectionDelegate(ServerConnectionDelegate delegate)
+ {
+ super.setConnectionDelegate(delegate);
+ }
+
+ private VirtualHost _virtualHost;
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+}
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import java.util.*;
+
+
+/**
+ * Broker-side connection delegate.
+ *
+ * It advertises the SASL mechanisms reported by the application registry's
+ * authentication manager, creates a {@link ServerSession} for every attached
+ * session, and resolves the requested virtual host when a connection is opened.
+ */
+public class ServerConnectionDelegate extends ServerDelegate
+{
+    /** Locale offered when the caller does not supply its own locale list. */
+    private static final String DEFAULT_LOCALE = "en_US";
+
+    /** Virtual host name looked up when the client's open command carries none. */
+    private static final String DEFAULT_VHOST_NAME = "";
+
+    /** Host name handed to the authentication manager when creating SASL servers. */
+    private final String _localFQDN;
+
+    /** Registry giving access to the authentication manager and the virtual hosts. */
+    private final IApplicationRegistry _appRegistry;
+
+    /**
+     * Creates a delegate with no extra connection properties and a single
+     * {@code en_US} locale.
+     *
+     * @param appRegistry registry used for authentication and virtual host lookup
+     * @param localFQDN   host name passed to the authentication manager for SASL
+     */
+    public ServerConnectionDelegate(IApplicationRegistry appRegistry,
+                                    String localFQDN)
+    {
+        this(Collections.<String, Object>emptyMap(),
+             Collections.<Object>singletonList(DEFAULT_LOCALE),
+             appRegistry,
+             localFQDN);
+    }
+
+    /**
+     * Creates a delegate with explicit properties and locales. The mechanism list
+     * given to the superclass is the authentication manager's space-separated
+     * mechanism string split into its individual names.
+     *
+     * @param properties  properties passed through to the superclass
+     * @param locales     locales passed through to the superclass
+     * @param appRegistry registry used for authentication and virtual host lookup
+     * @param localFQDN   host name passed to the authentication manager for SASL
+     */
+    public ServerConnectionDelegate(Map<String, Object> properties,
+                                    List<Object> locales,
+                                    IApplicationRegistry appRegistry,
+                                    String localFQDN)
+    {
+        super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
+        _appRegistry = appRegistry;
+        _localFQDN = localFQDN;
+    }
+
+    /**
+     * Splits a space-separated mechanism string into a list of names.
+     *
+     * @param mechanisms space-separated mechanism names; may be null
+     * @return a new mutable list with one entry per name; empty when the input is
+     *         null or contains no tokens
+     */
+    private static List<Object> parseToList(String mechanisms)
+    {
+        List<Object> list = new ArrayList<Object>();
+        if (mechanisms == null)
+        {
+            // No mechanisms configured: advertise none rather than failing with a NullPointerException.
+            return list;
+        }
+
+        StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+        while (tokenizer.hasMoreTokens())
+        {
+            list.add(tokenizer.nextToken());
+        }
+        return list;
+    }
+
+    /**
+     * Creates the broker-side session for an attach request. Each session gets its
+     * own {@link ServerSessionDelegate}; the session name is taken from the attach
+     * command and the expiry is 0.
+     *
+     * @param conn the connection the session is attached to
+     * @param atc  the attach command carrying the session name
+     * @return the newly created session
+     */
+    @Override
+    public ServerSession getSession(Connection conn, SessionAttach atc)
+    {
+        SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+        return new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
+    }
+
+    /**
+     * Asks the authentication manager for a SASL server for the given mechanism.
+     *
+     * @param mechanism the SASL mechanism name
+     * @return the SASL server returned by the authentication manager
+     * @throws SaslException if the authentication manager cannot create the server
+     */
+    @Override
+    protected SaslServer createSaslServer(String mechanism) throws SaslException
+    {
+        return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
+    }
+
+    /**
+     * Handles the client's open command. The requested virtual host (or the empty
+     * name when none is given) is looked up in the virtual host registry. When it
+     * exists it is stored on the connection, an open-ok is sent and the connection
+     * moves to OPEN; otherwise a close with code INVALID_PATH is sent and the
+     * connection moves to CLOSING.
+     *
+     * @param conn the connection being opened; must be a {@link ServerConnection}
+     * @param open the open command received from the client
+     */
+    @Override
+    public void connectionOpen(Connection conn, ConnectionOpen open)
+    {
+        ServerConnection sconn = (ServerConnection) conn;
+
+        String vhostName = open.hasVirtualHost() ? open.getVirtualHost() : DEFAULT_VHOST_NAME;
+        VirtualHost vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
+
+        if (vhost == null)
+        {
+            sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH,
+                                             "Unknown virtualhost '" + vhostName + "'"));
+            sconn.setState(Connection.State.CLOSING);
+            return;
+        }
+
+        sconn.setVirtualHost(vhost);
+        sconn.invoke(new ConnectionOpenOk(Collections.EMPTY_LIST));
+        sconn.setState(Connection.State.OPEN);
+    }
+}
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+
+public class ServerSession extends Session
+{
+ ServerSession(Connection connection, Binary name, long expiry)
+ {
+ super(connection, name, expiry);
+ }
+
+ ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+ {
+ super(connection, delegate, name, expiry);
+ }
+
+ public void enqueue(ServerMessage message, ArrayList<AMQQueue> queues)
+ {
+ // TODO Txn
+
+ try
+ {
+ for(AMQQueue q : queues)
+ {
+ q.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+}
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,402 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+
+public class ServerSessionDelegate extends SessionDelegate
+{
+ private final IApplicationRegistry _appRegistry;
+
+ public ServerSessionDelegate(IApplicationRegistry appRegistry)
+ {
+ _appRegistry = appRegistry;
+ }
+
+ @Override
+ public void messageAccept(Session session, MessageAccept method)
+ {
+ super.messageAccept(session, method);
+ }
+
+ @Override
+ public void messageReject(Session session, MessageReject method)
+ {
+ super.messageReject(session, method);
+ }
+
+ @Override
+ public void messageRelease(Session session, MessageRelease method)
+ {
+ super.messageRelease(session, method);
+ }
+
+ @Override
+ public void messageAcquire(Session session, MessageAcquire method)
+ {
+ super.messageAcquire(session, method);
+ }
+
+ @Override
+ public void messageResume(Session session, MessageResume method)
+ {
+ super.messageResume(session, method);
+ }
+
+ @Override
+ public void messageSubscribe(Session session, MessageSubscribe method)
+ {
+ super.messageSubscribe(session, method);
+ }
+
+
+ @Override
+ public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+ Exchange exchange;
+ if(xfr.hasDestination())
+ {
+ exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ }
+ else
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
+
+ MessageTransferMessage message = new MessageTransferMessage(xfr);
+ try
+ {
+ ArrayList<AMQQueue> queues = exchange.route(message);
+
+ ((ServerSession) ssn).enqueue(message, queues);
+
+
+ System.out.println(queues);
+
+ ssn.processed(xfr);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+
+ super.messageTransfer(ssn, xfr); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void messageCancel(Session session, MessageCancel method)
+ {
+ super.messageCancel(session, method);
+ }
+
+ @Override
+ public void messageFlush(Session session, MessageFlush method)
+ {
+ super.messageFlush(session, method);
+ }
+
+ @Override
+ public void txSelect(Session session, TxSelect method)
+ {
+ super.txSelect(session, method);
+ }
+
+ @Override
+ public void txCommit(Session session, TxCommit method)
+ {
+ super.txCommit(session, method);
+ }
+
+ @Override
+ public void txRollback(Session session, TxRollback method)
+ {
+ super.txRollback(session, method);
+ }
+
+ @Override
+ public void dtxSelect(Session session, DtxSelect method)
+ {
+ super.dtxSelect(session, method);
+ }
+
+ @Override
+ public void dtxStart(Session session, DtxStart method)
+ {
+ super.dtxStart(session, method);
+ }
+
+ @Override
+ public void dtxEnd(Session session, DtxEnd method)
+ {
+ super.dtxEnd(session, method);
+ }
+
+ @Override
+ public void dtxCommit(Session session, DtxCommit method)
+ {
+ super.dtxCommit(session, method);
+ }
+
+ @Override
+ public void dtxForget(Session session, DtxForget method)
+ {
+ super.dtxForget(session, method);
+ }
+
+ @Override
+ public void dtxGetTimeout(Session session, DtxGetTimeout method)
+ {
+ super.dtxGetTimeout(session, method);
+ }
+
+ @Override
+ public void dtxPrepare(Session session, DtxPrepare method)
+ {
+ super.dtxPrepare(session, method);
+ }
+
+ @Override
+ public void dtxRecover(Session session, DtxRecover method)
+ {
+ super.dtxRecover(session, method);
+ }
+
+ @Override
+ public void dtxRollback(Session session, DtxRollback method)
+ {
+ super.dtxRollback(session, method);
+ }
+
+ @Override
+ public void dtxSetTimeout(Session session, DtxSetTimeout method)
+ {
+ super.dtxSetTimeout(session, method);
+ }
+
+ @Override
+ public void exchangeDeclare(Session session, ExchangeDeclare method)
+ {
+ String exchangeName = method.getExchange();
+
+ Exchange exchange = getExchange(session, exchangeName);
+
+ if(method.getPassive())
+ {
+ if(exchange == null)
+ {
+ ExecutionException ex = new ExecutionException();
+ ex.setErrorCode(ExecutionErrorCode.NOT_FOUND);
+ ex.setCommandId(method.getId());
+
+ ex.setDescription("not-found: exchange-name '"+exchangeName+"'");
+
+ session.invoke(ex);
+ session.close();
+ }
+
+ }
+ else
+ {
+ // TODO
+ }
+ super.exchangeDeclare(session, method);
+ }
+
+ private Exchange getExchange(Session session, String exchangeName)
+ {
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+ return exchangeRegistry.getExchange(exchangeName);
+ }
+
+ private ExchangeRegistry getExchangeRegistry(Session session)
+ {
+ VirtualHost virtualHost = getVirtualHost(session);
+ return virtualHost.getExchangeRegistry();
+
+ }
+
+ private VirtualHost getVirtualHost(Session session)
+ {
+ ServerConnection conn = getServerConnection(session);
+ VirtualHost vhost = conn.getVirtualHost();
+ return vhost;
+ }
+
+ private ServerConnection getServerConnection(Session session)
+ {
+ ServerConnection conn = (ServerConnection) session.getConnection();
+ return conn;
+ }
+
+ @Override
+ public void exchangeDelete(Session session, ExchangeDelete method)
+ {
+ super.exchangeDelete(session, method);
+ }
+
+ @Override
+ public void exchangeQuery(Session session, ExchangeQuery method)
+ {
+ super.exchangeQuery(session, method);
+
+ }
+
+ @Override
+ public void exchangeBind(Session session, ExchangeBind method)
+ {
+ super.exchangeBind(session, method);
+ }
+
+ @Override
+ public void exchangeUnbind(Session session, ExchangeUnbind method)
+ {
+ super.exchangeUnbind(session, method);
+ }
+
+ @Override
+ public void exchangeBound(Session session, ExchangeBound method)
+ {
+
+
+ ExchangeBoundResult result = new ExchangeBoundResult();
+ if(method.hasExchange())
+ {
+ Exchange exchange = getExchange(session, method.getExchange());
+
+ if(exchange == null)
+ {
+ result.setExchangeNotFound(true);
+ }
+
+ if(method.hasQueue())
+ {
+
+ AMQQueue queue = getQueue(session, method.getQueue());
+ if(queue == null)
+ {
+ result.setQueueNotFound(true);
+ }
+
+ if(exchange != null && queue != null)
+ {
+
+ if(method.hasBindingKey())
+ {
+
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+
+ }
+
+ result.setQueueNotMatched(!exchange.isBound(queue));
+
+ }
+ }
+ else if(exchange != null && method.hasBindingKey())
+ {
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
+ }
+
+ }
+ else if(method.hasQueue())
+ {
+ AMQQueue queue = getQueue(session, method.getQueue());
+ if(queue == null)
+ {
+ result.setQueueNotFound(true);
+ }
+ else
+ {
+ if(method.hasBindingKey())
+ {
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+
+ // TODO
+ }
+ }
+
+ }
+
+
+ session.executionResult((int) method.getId(), result);
+ super.exchangeBound(session, method);
+ }
+
+ private AMQQueue getQueue(Session session, String queue)
+ {
+ QueueRegistry queueRegistry = getQueueRegistry(session);
+ return queueRegistry.getQueue(queue);
+ }
+
+ private QueueRegistry getQueueRegistry(Session session)
+ {
+ return getVirtualHost(session).getQueueRegistry();
+ }
+
+ @Override
+ public void queueDeclare(Session session, QueueDeclare method)
+ {
+ super.queueDeclare(session, method);
+ }
+
+ @Override
+ public void queueDelete(Session session, QueueDelete method)
+ {
+ super.queueDelete(session, method);
+ }
+
+ @Override
+ public void queuePurge(Session session, QueuePurge method)
+ {
+ super.queuePurge(session, method);
+ }
+
+ @Override
+ public void queueQuery(Session session, QueueQuery method)
+ {
+ super.queueQuery(session, method);
+ }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org