You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC
svn commit: r827724 [6/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apach...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java Tue Oct 20 16:23:01 2009
@@ -21,74 +21,67 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.Configuration;
public interface TransactionLog
{
- /**
- * Places a message onto a specified queue, in a given transactional context.
- *
- * @param context The transactional context for the operation.
- * @param queue The queue to place the message on.
- * @param messageId The message to enqueue.
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
- */
- void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException;
-
- /**
- * Extracts a message from a specified queue, in a given transactional context.
- *
- * @param context The transactional context for the operation.
- * @param queue The queue to place the message on.
- * @param messageId The message to dequeue.
- * @throws org.apache.qpid.AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException;
-
- /**
- * Begins a transactional context.
- *
- * @param context The transactional context to begin.
- *
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
- */
- void beginTran(StoreContext context) throws AMQException;
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @param context The transactional context to commit all operations for.
- *
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
- */
- void commitTran(StoreContext context) throws AMQException;
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @param context The transactional context to commit all operations for.
- *
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
- */
- StoreFuture commitTranAsync(StoreContext context) throws AMQException;
-
- /**
- * Abandons all operations performed within a given transactional context.
- *
- * @param context The transactional context to abandon.
- *
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
- */
- void abortTran(StoreContext context) throws AMQException;
-
- /**
- * Tests a transactional context to see if it has been begun but not yet committed or aborted.
- *
- * @param context The transactional context to test.
- *
- * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise.
- */
- boolean inTran(StoreContext context);
+
+ public static interface Transaction // Operations scoped to a single transaction; instances are returned by newTransaction().
+ {
+ /**
+ * Places a message onto a specified queue, as part of this transaction.
+ *
+ * @param queue The queue to place the message on.
+ * @param messageId The message to enqueue.
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException;
+
+ /**
+ * Extracts a message from a specified queue, as part of this transaction.
+ *
+ * @param queue The queue to remove the message from.
+ * @param messageId The message to dequeue.
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException;
+
+
+ /**
+ * Commits all operations performed within this transaction.
+ *
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ void commitTran() throws AMQException;
+
+ /**
+ * Commits all operations performed within this transaction and hands back a StoreFuture.
+ * NOTE(review): presumably the commit completes asynchronously and the future reports completion - confirm.
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ StoreFuture commitTranAsync() throws AMQException;
+
+ /**
+ * Abandons all operations performed within this transaction.
+ *
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ void abortTran() throws AMQException;
+
+
+
+ }
+
+ public void configureTransactionLog(String name, // Configuration entry point for the log; what 'name' identifies is not visible here - TODO confirm.
+ TransactionLogRecoveryHandler recoveryHandler, // Its begin(TransactionLog) returns the handler that receives queue entries.
+ Configuration storeConfiguration, // Commons-configuration object for the store; the expected keys are not visible here.
+ LogSubject logSubject) throws Exception; // Declares the broad checked Exception type, so callers must catch or propagate it.
+
+ Transaction newTransaction(); // Returns the Transaction on which enqueue/dequeue/commit/abort are invoked; presumably a fresh one per call - TODO confirm.
+
+
public static interface StoreFuture
{
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,33 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+public interface TransactionLogRecoveryHandler // Passed to TransactionLog.configureTransactionLog(...); presumably called back while stored queue entries are recovered - TODO confirm.
+{
+ QueueEntryRecoveryHandler begin(TransactionLog log); // Takes the transaction log and returns the handler that is fed individual queue entries.
+ /** Receives queue entries one at a time, each identified by a queue name and a message id. */
+ public static interface QueueEntryRecoveryHandler
+ {
+ void queueEntry(String queuename, long messageId); // One entry: the queue's name and the message's id.
+ // NOTE(review): presumably invoked once after the last queueEntry call - confirm against implementations.
+ void completeQueueEntryRecovery();
+ }
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,26 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+public interface TransactionLogResource // Type of the 'queue' argument accepted by TransactionLog.Transaction.enqueueMessage/dequeueMessage.
+{
+ public String getResourceName(); // Name of the resource as a String; presumably the queue name - TODO confirm.
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.log4j.Logger;
+
+
+class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+{ // Disposition listener that Subscription_0_10.send() installs when the accept mode is MessageAcceptMode.EXPLICIT.
+ private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
+
+ /** Queue entry this listener acts on, and the subscription it is paired with; both fixed at construction. */
+ private final QueueEntry _entry;
+ private final Subscription_0_10 _sub;
+ /** Stores both references as given; no null checks are made here. */
+ public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ {
+ _entry = entry;
+ _sub = subscription_0_10;
+ }
+ /** Acknowledges the entry via the subscription's session when the subscription is non-null and _entry.isAcquiredBy(_sub) is true; otherwise only logs a warning. */
+ public void onAccept()
+ {
+ final Subscription_0_10 subscription = getSubscription();
+ if(subscription != null && _entry.isAcquiredBy(_sub)) // getSubscription() returns _sub, so both operands refer to the same subscription
+ {
+ subscription.getSession().acknowledge(subscription, _entry);
+ }
+ else
+ {
+ _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
+ }
+
+ }
+ /** Releases the entry through the subscription under the same guard as onAccept(); otherwise only logs a warning. */
+ public void onRelease()
+ {
+ final Subscription_0_10 subscription = getSubscription();
+ if(subscription != null && _entry.isAcquiredBy(_sub))
+ {
+ subscription.release(_entry);
+ }
+ else
+ {
+ _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+ }
+ }
+ /** Rejects the entry through the subscription under the same guard as onAccept(); otherwise only logs a warning. */
+ public void onReject()
+ {
+ final Subscription_0_10 subscription = getSubscription();
+ if(subscription != null && _entry.isAcquiredBy(_sub))
+ {
+ subscription.reject(_entry);
+ }
+ else
+ {
+ _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+ }
+
+ }
+ /** Returns the result of _entry.acquire(subscription); unlike the other callbacks it makes no isAcquiredBy check and logs nothing. */
+ public boolean acquire()
+ {
+ return _entry.acquire(getSubscription());
+ }
+
+ /** Returns the subscription supplied to the constructor. */
+ private Subscription_0_10 getSubscription()
+ {
+ return _sub;
+ }
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.log4j.Logger;
+
+class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+{ // Disposition listener that Subscription_0_10.send() installs for every accept mode other than MessageAcceptMode.EXPLICIT.
+ private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
+
+ /** Queue entry this listener acts on. */
+ private final QueueEntry _entry;
+ private Subscription_0_10 _sub; // NOTE(review): not final, although nothing in this class reassigns it.
+ /** Stores both references as given; no null checks are made here. */
+ public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ {
+ _entry = entry;
+ _sub = subscription_0_10;
+ }
+ /** Only logs a warning; the entry is not acknowledged or otherwise touched. */
+ public void onAccept()
+ {
+ _logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
+ }
+ /** Releases the entry through the subscription when _entry.isAcquiredBy(_sub) is true; otherwise only logs a warning. The subscription is not null-checked. */
+ public void onRelease()
+ {
+ if(_entry.isAcquiredBy(_sub))
+ {
+ getSubscription().release(_entry);
+ }
+ else
+ {
+ _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+ }
+ }
+ /** Rejects the entry through the subscription when _entry.isAcquiredBy(_sub) is true; otherwise only logs a warning. The subscription is not null-checked. */
+ public void onReject()
+ {
+ if(_entry.isAcquiredBy(_sub))
+ {
+ getSubscription().reject(_entry);
+ }
+ else
+ {
+ _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+ }
+
+ }
+ /** Returns the result of _entry.acquire(subscription); the acknowledge call after it is left commented out, so nothing is acknowledged here. */
+ public boolean acquire()
+ {
+ boolean acquired = _entry.acquire(getSubscription());
+ //TODO - why acknowledge here??? seems bizarre...
+ // getSubscription().getSession().acknowledge(getSubscription(), _entry);
+ return acquired;
+
+ }
+ /** Returns the subscription supplied to the constructor. */
+ public Subscription_0_10 getSubscription()
+ {
+ return _sub;
+ }
+
+
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java Tue Oct 20 16:23:01 2009
@@ -43,11 +43,15 @@
public void onComplete(Method method)
{
- _session.acknowledge(_sub, _entry);
if(_restoreCredit)
{
_sub.restoreCredit(_entry);
}
- _session.removeDispositionListener(method);
+ if(_entry.isAcquiredBy(_sub))
+ {
+ _session.acknowledge(_sub, _entry);
+ }
+
+ _session.removeDispositionListener(method);
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Tue Oct 20 16:23:01 2009
@@ -51,6 +51,8 @@
void setQueue(AMQQueue queue, boolean exclusive);
+ void setNoLocal(boolean noLocal);
+
AMQShortString getConsumerTag();
long getSubscriptionID();
@@ -71,7 +73,7 @@
void send(QueueEntry msg) throws AMQException;
- void queueDeleted(AMQQueue queue);
+ void queueDeleted(AMQQueue queue);
boolean wouldSuspend(QueueEntry msg);
@@ -97,5 +99,8 @@
void confirmAutoClose();
+ public void set(String key, Object value);
+
+ public Object get(String key);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Tue Oct 20 16:23:01 2009
@@ -25,6 +25,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -33,6 +35,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.SubscriptionActor;
@@ -42,7 +45,6 @@
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -74,6 +76,8 @@
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+ private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+
private final Lock _stateChangeLock;
private static final AtomicLong idGenerator = new AtomicLong(0);
@@ -254,7 +258,7 @@
private final AMQShortString _consumerTag;
- private final boolean _noLocal;
+ private boolean _noLocal;
private final FlowCreditManager _creditManager;
@@ -410,11 +414,7 @@
public boolean hasInterest(QueueEntry entry)
{
- // TODO 0-10 to 0-8 conversion
- if(!(entry.getMessage() instanceof AMQMessage))
- {
- return false;
- }
+
//check that the message hasn't been rejected
@@ -667,5 +667,21 @@
return !isBrowser();
}
+ public void set(String key, Object value) // Stores value under key in _properties; that map is a ConcurrentHashMap, so a null key or null value throws NullPointerException.
+ {
+ _properties.put(key, value);
+ }
+
+ public Object get(String key) // Returns the value stored under key in _properties, or null when the key has no mapping.
+ {
+ return _properties.get(key);
+ }
+
+
+ public void setNoLocal(boolean noLocal) // Overwrites _noLocal; no locking or state check is done here.
+ {
+ _noLocal = noLocal;
+ }
+
abstract boolean isBrowser();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Tue Oct 20 16:23:01 2009
@@ -29,14 +29,18 @@
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.SubscriptionActor;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.*;
@@ -47,6 +51,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
{
@@ -90,6 +97,7 @@
private LogSubject _logSubject;
private LogActor _logActor;
+ private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
@@ -111,6 +119,11 @@
}
+ public void setNoLocal(boolean noLocal) // Assigns the flag to _noLocal; nothing else is updated here.
+ {
+ _noLocal = noLocal;
+ }
+
public AMQQueue getQueue()
{
return _queue;
@@ -135,7 +148,7 @@
_queue = queue;
_logSubject = new SubscriptionLogSubject(this);
_logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
-
+
}
public AMQShortString getConsumerTag()
@@ -151,11 +164,7 @@
public boolean hasInterest(QueueEntry entry)
{
- //TODO 0-8/9 to 0-10 conversion
- if(!(entry.getMessage() instanceof MessageTransferMessage))
- {
- return false;
- }
+
//check that the message hasn't been rejected
if (entry.isRejectedBy(this))
@@ -261,70 +270,164 @@
}
+ private class AddMessageDispositionListnerAction implements Runnable // Runnable that send() passes to _session.sendMessage(xfr, ...); when run it registers the disposition listener for the transfer.
+ {
+ public MessageTransfer _xfr; // Transfer to register the listener for; assigned by send() before each sendMessage call.
+ public ServerSession.MessageDispositionChangeListener _action; // Listener to register; assigned by send() before each sendMessage call.
+ // NOTE(review): a single instance (_postIdSettingAction) is reused by every send(), so run() uses whatever was assigned last.
+ public void run()
+ {
+ _session.onMessageDispositionChange(_xfr, _action);
+ }
+ }
+
+ private final AddMessageDispositionListnerAction _postIdSettingAction = new AddMessageDispositionListnerAction();
+
public void send(final QueueEntry entry) throws AMQException
{
ServerMessage serverMsg = entry.getMessage();
- MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+ MessageTransfer xfr;
-
- Struct[] headers;
- if(msg.getHeader() == null)
+ if(serverMsg instanceof MessageTransferMessage)
{
- headers = EMPTY_STRUCT_ARRAY;
- }
- else
- {
- headers = msg.getHeader().getStructs();
- }
- ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
- DeliveryProperties origDeliveryProps = null;
- for(Struct header : headers)
- {
- if(header instanceof DeliveryProperties)
+ MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+
+
+ Struct[] headers;
+ if(msg.getHeader() == null)
{
- origDeliveryProps = (DeliveryProperties) header;
+ headers = EMPTY_STRUCT_ARRAY;
}
else
{
- newHeaders.add(header);
+ headers = msg.getHeader().getStructs();
}
- }
- DeliveryProperties deliveryProps = new DeliveryProperties();
- if(origDeliveryProps != null)
- {
- if(origDeliveryProps.hasDeliveryMode())
+ ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
+ DeliveryProperties origDeliveryProps = null;
+ for(Struct header : headers)
{
- deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+ if(header instanceof DeliveryProperties)
+ {
+ origDeliveryProps = (DeliveryProperties) header;
+ }
+ else
+ {
+ newHeaders.add(header);
+ }
}
- if(origDeliveryProps.hasExchange())
+
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
{
- deliveryProps.setExchange(origDeliveryProps.getExchange());
+ if(origDeliveryProps.hasDeliveryMode())
+ {
+ deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+ }
+ if(origDeliveryProps.hasExchange())
+ {
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
+ }
+ if(origDeliveryProps.hasExpiration())
+ {
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+ }
+ if(origDeliveryProps.hasPriority())
+ {
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
+ }
+ if(origDeliveryProps.hasRoutingKey())
+ {
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+ }
+
}
- if(origDeliveryProps.hasExpiration())
+
+ deliveryProps.setRedelivered(entry.isRedelivered());
+
+ newHeaders.add(deliveryProps);
+ Header header = new Header(newHeaders);
+
+ xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+ }
+ else
+ {
+ AMQMessage message_0_8 = (AMQMessage) serverMsg;
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ MessageProperties messageProps = new MessageProperties();
+
+ int size = (int) message_0_8.getSize();
+ ByteBuffer body = ByteBuffer.allocate(size);
+ message_0_8.getContent(body, 0);
+ body.flip();
+
+ Struct[] headers = new Struct[] { deliveryProps, messageProps };
+
+ BasicContentHeaderProperties properties =
+ (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties;
+ final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
+ if(exchange != null)
{
- deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+ deliveryProps.setExchange(exchange.toString());
}
- if(origDeliveryProps.hasPriority())
+ deliveryProps.setExpiration(message_0_8.getExpiration());
+ deliveryProps.setImmediate(message_0_8.isImmediate());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
+ deliveryProps.setRedelivered(entry.isRedelivered());
+ deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
+ deliveryProps.setTimestamp(properties.getTimestamp());
+
+ messageProps.setContentEncoding(properties.getEncodingAsString());
+ messageProps.setContentLength(size);
+ if(properties.getAppId() != null)
{
- deliveryProps.setPriority(origDeliveryProps.getPriority());
+ messageProps.setAppId(properties.getAppId().getBytes());
}
- if(origDeliveryProps.hasRoutingKey())
+ messageProps.setContentType(properties.getContentTypeAsString());
+ if(properties.getCorrelationId() != null)
{
- deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+ messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
}
- }
+ // TODO - ReplyTo
+
+ if(properties.getUserId() != null)
+ {
+ messageProps.setUserId(properties.getUserId().getBytes());
+ }
+
+ final Map<String, Object> appHeaders = new HashMap<String, Object>();
+
+ properties.getHeaders().processOverElements(
+ new FieldTable.FieldTableElementProcessor()
+ {
- deliveryProps.setRedelivered(entry.isRedelivered());
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ Object val = value.getValue();
+ if(val instanceof AMQShortString)
+ {
+ val = val.toString();
+ }
+ appHeaders.put(propertyName, val);
+ return true;
+ }
- newHeaders.add(deliveryProps);
- Header header = new Header(newHeaders);
+ public Object getResult()
+ {
+ return appHeaders;
+ }
+ });
- MessageTransfer xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+
+ messageProps.setApplicationHeaders(appHeaders);
+
+ Header header = new Header(headers);
+ xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+ }
if(_acceptMode == MessageAcceptMode.NONE)
{
@@ -342,81 +445,30 @@
}
-
- _session.sendMessage(xfr);
-
+ _postIdSettingAction._xfr = xfr;
if(_acceptMode == MessageAcceptMode.EXPLICIT)
{
- // potential race condition if incomming commands on this session can be processed on a different thread
- // to this one (i.e. the message is only put in the map *after* it has been sent, theoretically we could get
- // acknowledgement back before reaching the next line)
- _session.onMessageDispositionChange(xfr, new ServerSession.MessageDispositionChangeListener()
- {
- public void onAccept()
- {
- _session.acknowledge(Subscription_0_10.this,entry);
- }
-
- public void onRelease()
- {
- release(entry);
- }
-
- public void onReject()
- {
- reject(entry);
- }
-
- public boolean acquire()
- {
- return entry.acquire(Subscription_0_10.this);
- }
- });
+ _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
}
else
{
- _session.onMessageDispositionChange(xfr, new ServerSession.MessageDispositionChangeListener()
- {
- public void onAccept()
- {
- // TODO : should log error of explicit accept on non-explicit sub
- }
-
- public void onRelease()
- {
- release(entry);
- }
-
- public void onReject()
- {
- reject(entry);
- }
-
- public boolean acquire()
- {
- boolean acquired = entry.acquire(Subscription_0_10.this);
- //TODO - why acknowledge here??? seems bizarre...
- _session.acknowledge(Subscription_0_10.this,entry);
- return acquired;
-
- }
-
- });
+ _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
}
+ _session.sendMessage(xfr, _postIdSettingAction);
}
- private void reject(QueueEntry entry)
+ void reject(QueueEntry entry)
{
- entry.setRedelivered(true);
+ entry.setRedelivered();
entry.routeToAlternate();
}
- private void release(QueueEntry entry)
+ void release(QueueEntry entry)
{
- entry.setRedelivered(true);
+ entry.setRedelivered();
entry.release();
}
@@ -480,6 +532,16 @@
//No such thing in 0-10
}
+ public void set(String key, Object value) // stores value under key in _properties
+ {
+ _properties.put(key, value); // NOTE(review): _properties is declared outside this hunk; presumably a Map<String,Object> - confirm
+ }
+
+ public Object get(String key) // looks key up in _properties and returns whatever that lookup yields
+ {
+ return _properties.get(key); // no default is substituted here; the lookup result is returned unchanged
+ }
+
public FlowCreditManager_0_10 getCreditManager()
{
@@ -565,8 +627,10 @@
public void acknowledge(QueueEntry entry)
{
// TODO Fix Store Context / cleanup
-
- entry.discard();
+ if(entry.isAcquiredBy(this))
+ {
+ entry.discard();
+ }
}
public void flush() throws AMQException
@@ -585,5 +649,10 @@
return _logActor;
}
+ ServerSession getSession() // package-private accessor for the _session field
+ {
+ return _session;
+ }
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Tue Oct 20 16:23:01 2009
@@ -21,7 +21,6 @@
package org.apache.qpid.server.transport;
import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Method;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -36,7 +35,7 @@
@Override
protected void setState(State state)
{
- super.setState(state);
+ super.setState(state);
}
@Override
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Tue Oct 20 16:23:01 2009
@@ -22,8 +22,6 @@
import org.apache.qpid.transport.*;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Tue Oct 20 16:23:01 2009
@@ -25,10 +25,10 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.AMQException;
@@ -46,7 +46,6 @@
{
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
-
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -69,7 +68,7 @@
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
- private Transaction _transaction;
+ private ServerTransaction _transaction;
private Principal _principal;
@@ -97,10 +96,16 @@
_reference = new WeakReference(this);
}
+ @Override
+ protected boolean isFull(int id) // overrides the superclass check by delegating to isCommandsFull(id)
+ {
+ return isCommandsFull(id); // NOTE(review): isCommandsFull is not defined in this view; presumably inherited - confirm
+ }
+
public void enqueue(final ServerMessage message, final ArrayList<AMQQueue> queues)
{
- _transaction.enqueue(queues,message, new Transaction.Action()
+ _transaction.enqueue(queues,message, new ServerTransaction.Action()
{
AMQQueue[] _queues = queues.toArray(new AMQQueue[queues.size()]);
@@ -117,6 +122,7 @@
{
// TODO
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
}
}
}
@@ -130,10 +136,11 @@
}
-
- public void sendMessage(MessageTransfer xfr)
+
+ public void sendMessage(MessageTransfer xfr,
+ Runnable postIdSettingAction)
{
- invoke(xfr);
+ invoke(xfr, postIdSettingAction);
}
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -231,12 +238,12 @@
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
- if(!_messageDispositionListenerMap.isEmpty())
+ if(ranges != null && !_messageDispositionListenerMap.isEmpty())
{
Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
Iterator<Range> rangeIter = ranges.iterator();
- if(rangeIter.hasNext())
+ if(rangeIter.hasNext())
{
Range range = rangeIter.next();
@@ -266,7 +273,6 @@
}
-
}
}
@@ -287,14 +293,14 @@
for (Task task : _taskList)
{
task.doTask(this);
- }
+ }
}
public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
{
_transaction.dequeue(entry.getQueue(), entry.getMessage(),
- new Transaction.Action()
+ new ServerTransaction.Action()
{
public void postCommit()
@@ -307,7 +313,6 @@
entry.release();
}
});
-
}
public Collection<Subscription_0_10> getSubscriptions()
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Tue Oct 20 16:23:01 2009
@@ -29,15 +29,19 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.flow.*;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Map;
public class ServerSessionDelegate extends SessionDelegate
{
@@ -205,20 +209,19 @@
}
-
-
- MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference());
- final MessageStore store = getVirtualHost(ssn).getMessageStore();
-
- store.storeMessageHeader(message.getMessageNumber(),message);
- store.storeContent(message.getMessageNumber(), 0, xfr.getBody());
-
DeliveryProperties delvProps = null;
- if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
{
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
+ MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+ final MessageStore store = getVirtualHost(ssn).getMessageStore();
+ StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+ storeMessage.addContent(0,xfr.getBody());
+ storeMessage.flushToStore();
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
+
ArrayList<AMQQueue> queues = exchange.route(message);
@@ -267,8 +270,6 @@
ssn.processed(xfr);
-
- super.messageTransfer(ssn, xfr); //To change body of overridden methods use File | Settings | File Templates.
}
@Override
@@ -397,6 +398,11 @@
exchange.setAlternateExchange(alternate);
}
+ if (exchange.isDurable() && !exchange.isAutoDelete())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.createExchange(exchange);
+ }
exchangeRegistry.registerExchange(exchange);
}
@@ -407,7 +413,6 @@
catch (AMQException e)
{
//TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
@@ -431,7 +436,7 @@
ex.setDescription(description);
session.invoke(ex);
- //session.close();
+
}
private Exchange getExchange(Session session, String exchangeName)
@@ -487,6 +492,13 @@
else
{
exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
+
+ if (exchange.isDurable() && !exchange.isAutoDelete())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.removeExchange(exchange);
+ }
+
}
}
catch (ExchangeInUseException e)
@@ -496,7 +508,6 @@
catch (AMQException e)
{
// TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -585,6 +596,7 @@
if (!exchange.isBound(routingKey, fieldTable, queue))
{
queue.bind(exchange, routingKey, fieldTable);
+
}
else
{
@@ -607,7 +619,7 @@
@Override
public void exchangeUnbind(Session session, ExchangeUnbind method)
{
- VirtualHost virtualHost = getVirtualHost(session);
+ VirtualHost virtualHost = getVirtualHost(session);
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -643,7 +655,6 @@
}
catch (AMQException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -761,6 +772,7 @@
{
VirtualHost virtualHost = getVirtualHost(session);
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
String queueName = method.getQueue();
@@ -818,10 +830,35 @@
queue.setAlternateExchange(alternate);
}
+ if(method.hasArguments() && method.getArguments() != null)
+ {
+ if(method.getArguments().containsKey("no-local"))
+ {
+ Object no_local = method.getArguments().get("no-local");
+ if(no_local instanceof Boolean && ((Boolean)no_local))
+ {
+ queue.setNoLocal(true);
+ }
+ }
+ }
+
if (queue.isDurable() && !queue.isAutoDelete())
{
- //store.createQueue(queue, body.getArguments());
+ if(method.hasArguments() && method.getArguments() != null)
+ {
+ Map<String,Object> args = method.getArguments();
+ FieldTable ftArgs = new FieldTable();
+ for(Map.Entry<String, Object> entry : args.entrySet())
+ {
+ ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+ store.createQueue(queue, ftArgs);
+ }
+ else
+ {
+ store.createQueue(queue);
+ }
}
queueRegistry.registerQueue(queue);
boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
@@ -853,8 +890,7 @@
}
catch (AMQException e)
{
- //TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
}
}
};
@@ -896,7 +932,6 @@
}
catch (AMQException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -948,7 +983,6 @@
catch (AMQException e)
{
//TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -1018,19 +1052,19 @@
try
{
int purged = queue.delete();
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.removeQueue(queue);
+ }
+
}
catch (AMQException e)
{
//TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
-
- /* if (queue.isDurable())
- {
- store.removeQueue(queue);
- }*/
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Tue Oct 20 16:23:01 2009
@@ -24,14 +24,13 @@
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.AMQException;
import java.util.List;
import java.util.Collection;
-public class AutoCommitTransaction implements Transaction
+public class AutoCommitTransaction implements ServerTransaction
{
private final TransactionLog _transactionLog;
@@ -55,13 +54,11 @@
if(message.isPersistent() && queue.isDurable())
{
- StoreContext context = new StoreContext();
-
- _transactionLog.beginTran(context);
- _transactionLog.dequeueMessage(context, queue, message.getMessageNumber());
+ TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ txn.dequeueMessage(queue, message.getMessageNumber());
// store.remove enqueue
// store.commit
- _transactionLog.commitTran(context);
+ txn.commitTran();
}
postCommitAction.postCommit();
}
@@ -77,7 +74,7 @@
{
try
{
- StoreContext context = null;
+ TransactionLog.Transaction txn = null;
for(QueueEntry entry : ackedMessages)
{
ServerMessage message = entry.getMessage();
@@ -85,18 +82,17 @@
if(message.isPersistent() && queue.isDurable())
{
- if(context == null)
+ if(txn == null)
{
- context = new StoreContext();
- _transactionLog.beginTran(context);
+ txn = _transactionLog.newTransaction();
}
- _transactionLog.dequeueMessage(context, queue, message.getMessageNumber());
+ txn.dequeueMessage(queue, message.getMessageNumber());
}
}
- if(context != null)
+ if(txn != null)
{
- _transactionLog.commitTran(context);
+ txn.commitTran();
}
postCommitAction.postCommit();
}
@@ -115,11 +111,10 @@
{
if(message.isPersistent() && queue.isDurable())
{
- StoreContext context = new StoreContext();
- _transactionLog.beginTran(context);
- _transactionLog.enqueueMessage(context, queue, message.getMessageNumber());
- _transactionLog.commitTran(context);
+ TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ txn.enqueueMessage(queue, message.getMessageNumber());
+ txn.commitTran();
}
postCommitAction.postCommit();
}
@@ -140,19 +135,16 @@
if(message.isPersistent())
{
- StoreContext context = new StoreContext();
-
- _transactionLog.beginTran(context);
-
+ TransactionLog.Transaction txn = _transactionLog.newTransaction();
Long id = message.getMessageNumber();
for(AMQQueue q : queues)
{
if(q.isDurable())
{
- _transactionLog.enqueueMessage(context, q, id);
+ txn.enqueueMessage(q, id);
}
}
- _transactionLog.commitTran(context);
+ txn.commitTran();
}
postCommitAction.postCommit();
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Tue Oct 20 16:23:01 2009
@@ -4,7 +4,6 @@
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.AMQException;
@@ -12,11 +11,11 @@
import java.util.ArrayList;
import java.util.Collection;
-public class LocalTransaction implements Transaction
+public class LocalTransaction implements ServerTransaction
{
private final List<Action> _postCommitActions = new ArrayList<Action>();
- private volatile StoreContext _storeContext;
+ private volatile TransactionLog.Transaction _transaction;
private TransactionLog _transactionLog;
public LocalTransaction(TransactionLog transactionLog)
@@ -37,7 +36,7 @@
{
beginTranIfNecessary();
- _transactionLog.dequeueMessage(_storeContext, queue, message.getMessageNumber());
+ _transaction.dequeueMessage(queue, message.getMessageNumber());
}
catch(AMQException e)
@@ -60,7 +59,7 @@
if(message.isPersistent() && queue.isDurable())
{
beginTranIfNecessary();
- _transactionLog.dequeueMessage(_storeContext, queue, message.getMessageNumber());
+ _transaction.dequeueMessage(queue, message.getMessageNumber());
}
}
@@ -70,10 +69,10 @@
tidyUpOnError(e);
}
_postCommitActions.add(postCommitAction);
-
+
}
- private void tidyUpOnError(AMQException e)
+ private void tidyUpOnError(Exception e)
{
try
{
@@ -86,13 +85,13 @@
{
try
{
- _transactionLog.abortTran(_storeContext);
+ _transaction.abortTran();
}
- catch (AMQException e1)
+ catch (Exception e1)
{
// TODO could try to chain the information to the original error
}
- _storeContext = null;
+ _transaction = null;
_postCommitActions.clear();
}
@@ -101,14 +100,13 @@
private void beginTranIfNecessary()
{
- if(_storeContext == null)
+ if(_transaction == null)
{
- _storeContext = new StoreContext();
try
{
- _transactionLog.beginTran(_storeContext);
+ _transaction = _transactionLog.newTransaction();
}
- catch (AMQException e)
+ catch (Exception e)
{
tidyUpOnError(e);
}
@@ -122,9 +120,9 @@
beginTranIfNecessary();
try
{
- _transactionLog.enqueueMessage(_storeContext, queue, message.getMessageNumber());
+ _transaction.enqueueMessage(queue, message.getMessageNumber());
}
- catch (AMQException e)
+ catch (Exception e)
{
tidyUpOnError(e);
}
@@ -137,10 +135,10 @@
public void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction)
{
-
+
if(message.isPersistent())
{
- if(_storeContext == null)
+ if(_transaction == null)
{
for(AMQQueue queue : queues)
{
@@ -161,12 +159,12 @@
{
if(queue.isDurable())
{
- _transactionLog.enqueueMessage(_storeContext, queue, message.getMessageNumber());
+ _transaction.enqueueMessage(queue, message.getMessageNumber());
}
}
}
- catch (AMQException e)
+ catch (Exception e)
{
tidyUpOnError(e);
}
@@ -180,10 +178,10 @@
{
try
{
- if(_storeContext != null)
+ if(_transaction != null)
{
- _transactionLog.commitTran(_storeContext);
+ _transaction.commitTran();
}
for(Action action : _postCommitActions)
@@ -191,7 +189,7 @@
action.postCommit();
}
}
- catch (AMQException e)
+ catch (Exception e)
{
for(Action action : _postCommitActions)
{
@@ -202,7 +200,7 @@
}
finally
{
- _storeContext = null;
+ _transaction = null;
_postCommitActions.clear();
}
@@ -214,10 +212,10 @@
try
{
- if(_storeContext != null)
+ if(_transaction != null)
{
- _transactionLog.abortTran(_storeContext);
+ _transaction.abortTran();
}
}
catch (AMQException e)
@@ -237,7 +235,7 @@
}
finally
{
- _storeContext = null;
+ _transaction = null;
_postCommitActions.clear();
}
}
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (from r824084, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java&r1=824084&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Tue Oct 20 16:23:01 2009
@@ -30,7 +30,7 @@
import java.util.SortedSet;
import java.util.Collection;
-public interface Transaction
+public interface ServerTransaction
{
void addPostCommitAction(Action postCommitAction);
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,62 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.management.ManagedObject;
+
+public interface VirtualHost // accessor contract for one virtual host: registries, stores, security managers and close()
+{
+ IConnectionRegistry getConnectionRegistry(); // returns the host's IConnectionRegistry
+ // Returns the VirtualHostConfiguration object for this host.
+ VirtualHostConfiguration getConfiguration();
+ // Returns the host's name as a String.
+ String getName();
+ // Returns the QueueRegistry (used elsewhere in this commit for getQueue/registerQueue).
+ QueueRegistry getQueueRegistry();
+ // Returns the ExchangeRegistry (used elsewhere in this commit for getExchange/registerExchange).
+ ExchangeRegistry getExchangeRegistry();
+ // Returns the ExchangeFactory (used elsewhere in this commit for createExchange).
+ ExchangeFactory getExchangeFactory();
+ // Returns the MessageStore (used elsewhere in this commit for addMessage).
+ MessageStore getMessageStore();
+ // Returns the TransactionLog (used elsewhere in this commit for newTransaction).
+ TransactionLog getTransactionLog();
+ // Returns the DurableConfigurationStore (used elsewhere in this commit to create/remove queues and exchanges).
+ DurableConfigurationStore getDurableConfigurationStore();
+ // Returns the AuthenticationManager for this host.
+ AuthenticationManager getAuthenticationManager();
+ // Note the naming mismatch: the accessor is getAccessManager but the type returned is ACLManager.
+ ACLManager getAccessManager();
+ // Declared to throw the broad Exception, so callers must handle or propagate it.
+ void close() throws Exception;
+ // NOTE(review): presumably the management-facing representation of this host - confirm against the implementation.
+ ManagedObject getManagedObject();
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,340 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
+ ConfigurationRecoveryHandler.QueueRecoveryHandler,
+ ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
+ ConfigurationRecoveryHandler.BindingRecoveryHandler,
+ MessageStoreRecoveryHandler,
+ MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
+ TransactionLogRecoveryHandler,
+ TransactionLogRecoveryHandler.QueueEntryRecoveryHandler
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
+
+
+ private final VirtualHost _virtualHost;
+
+ private MessageStoreLogSubject _logSubject;
+ private List<ProcessAction> _actions;
+
+ private MessageStore _store;
+ private TransactionLog _transactionLog;
+
+ private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
+ private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
+ private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
+
+
+
+ public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost) // virtualHost: the host whose registries the recovery callbacks populate
+ {
+ _virtualHost = virtualHost; // stored as-is; no null check is performed
+ }
+
+ public QueueRecoveryHandler begin(MessageStore store) // starts configuration recovery for the given store
+ {
+ _logSubject = new MessageStoreLogSubject(_virtualHost,store); // subject reused by the TXN_1004 log call in queue()
+ _store = store;
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1004(null, false)); // same message id as in queue(), here with (null, false)
+ // This object also implements QueueRecoveryHandler, so it hands itself back for the queue phase.
+ return this;
+ }
+
+ public void queue(String queueName, String owner, FieldTable arguments) // recovery callback for one stored queue
+ {
+ AMQShortString queueNameShortString = new AMQShortString(queueName);
+ // Reuse a queue already present in the registry under this name; only create one when the lookup returns null.
+ AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+ // A null owner is passed through as null rather than being wrapped in an AMQShortString.
+ if (q == null)
+ {
+ q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
+ arguments); // NOTE(review): the literal true/false are presumably durable/autoDelete - confirm against AMQQueueFactory
+ _virtualHost.getQueueRegistry().registerQueue(q);
+ }
+ // Logged for every recovered queue, whether it was just created or already registered.
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1004(queueName, true));
+
+ //Record that we have a queue for recovery
+ _queueRecoveries.put(queueName, 0); // count starts at 0; queueEntry() increments it per message enqueued on recovery
+ }
+
+ public ExchangeRecoveryHandler completeQueueRecovery() // ends the queue phase
+ {
+ return this; // this object also implements ExchangeRecoveryHandler, so it serves the exchange phase itself
+ }
+
+ public void exchange(String exchangeName, String type, boolean autoDelete) // recovery callback for one stored exchange
+ {
+ try
+ {
+ Exchange exchange;
+ AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
+ exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS); // an already registered exchange is left untouched
+ if (exchange == null)
+ {
+ exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); // NOTE(review): literal true is presumably "durable" and 0 a ticket/token value - confirm against ExchangeFactory
+ _virtualHost.getExchangeRegistry().registerExchange(exchange);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e); // rethrown unchecked with the AMQException as cause; the method declares no checked exceptions
+ }
+ // Nothing is recorded locally about the recovered exchange; only the registry is updated.
+ }
+
+ public BindingRecoveryHandler completeExchangeRecovery() // ends the exchange phase
+ {
+ return this; // this object also implements BindingRecoveryHandler, so it serves the binding phase itself
+ }
+
+ public StoredMessageRecoveryHandler begin() // starts stored-message recovery; no state is touched here
+ {
+ // TODO - log begin
+ return this; // this object also implements StoredMessageRecoveryHandler
+ }
+
+ public void message(StoredMessage message) // recovery callback for one stored message
+ {
+ ServerMessage serverMessage;
+ switch(message.getMetaData().getType()) // pick the ServerMessage wrapper that matches the stored metadata type
+ {
+ case META_DATA_0_8:
+ serverMessage = new AMQMessage(message);
+ break;
+ case META_DATA_0_10:
+ serverMessage = new MessageTransferMessage(message, null); // null second argument: live delivery passes a session reference here, recovery has none
+ break;
+ default:
+ throw new RuntimeException("Unknown message type retreived from store " + message.getMetaData().getClass()); // reports the metadata class, not the type value switched on
+ }
+ // Index the message by number in both maps: queueEntry() reads _recoveredMessages to find the message
+ // and removes the number from _unusedMessages once a queue entry refers to it.
+ _recoveredMessages.put(message.getMessageNumber(), serverMessage);
+ _unusedMessages.put(message.getMessageNumber(), message);
+ }
+
+ public void completeMessageRecovery() // called when stored-message recovery ends
+ {
+ //TODO - log end
+ // Currently a no-op: nothing is done with the recovered-message maps at this point.
+ }
+
+ public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log) // starts queue-entry recovery
+ {
+ _transactionLog = log; // kept so queueEntry() can open a transaction to dequeue entries whose message is unknown
+ return this; // this object also implements QueueEntryRecoveryHandler
+ }
+
+ private static final class ProcessAction // pairs a queue with a message so the enqueue can be run later via process()
+ {
+ private final AMQQueue _queue;
+ private final AMQMessage _message;
+ // Both fields are fixed at construction; no null checks are made.
+ public ProcessAction(AMQQueue queue, AMQMessage message)
+ {
+ _queue = queue;
+ _message = message;
+ }
+ // Enqueues the stored message on the stored queue, rethrowing AMQException unchecked.
+ public void process()
+ {
+ try
+ {
+ _queue.enqueue(_message);
+ }
+ catch(AMQException e)
+ {
+ throw new RuntimeException(e); // original exception kept as the cause
+ }
+ }
+ // NOTE(review): nothing in the visible part of this file constructs a ProcessAction - confirm it is still used.
+ }
+
+ public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
+ {
+ // Recovery callback: re-creates one stored binding. buf, when non-null, holds the encoded binding arguments.
+ _actions = new ArrayList<ProcessAction>(); // kept from the original; NOTE(review): resets the list on every binding - confirm intent
+ try
+ {
+ Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName));
+ if (exchange == null)
+ {
+ // The registry lookup can return null; report by name and skip, instead of failing with a NullPointerException.
+ _logger.error("Unknown exchange: " + exchangeName + " cannot be bound to queue: " + queueName);
+ }
+ else if (queue == null)
+ {
+ _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
+ + exchangeName);
+ }
+ else
+ {
+ FieldTable argumentsFT = null;
+ if(buf != null)
+ {
+ argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
+ }
+ _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
+ + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
+ // A null binding key is passed through as null rather than wrapped in an AMQShortString.
+ queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e); // rethrown unchecked with the cause kept; the callback declares no checked exceptions
+ }
+ }
+
+ /**
+  * End-of-binding-recovery hook; intentionally does nothing and returns nothing.
+  */
+ public void completeBindingRecovery()
+ {
+     // Nothing to do.
+ }
+
+ /**
+  * Completion hook; the body is intentionally empty.
+  */
+ public void complete()
+ {
+     // Nothing to do.
+ }
+
+ public void queueEntry(final String queueName, long messageId)
+ {
+ AMQShortString queueNameShortString = new AMQShortString(queueName);
+
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+
+ try
+ {
+ if(queue != null)
+ {
+ ServerMessage message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getName());
+ }
+
+ Integer count = _queueRecoveries.get(queueName);
+ if (count == null)
+ {
+ count = 0;
+ }
+
+ queue.enqueue(message);
+
+ _queueRecoveries.put(queueName, ++count);
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getName() + " is unknwon, entry will be discarded");
+ TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ txn.dequeueMessage(queue, messageId);
+ txn.commitTranAsync();
+ }
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
+ TransactionLog.Transaction txn = _transactionLog.newTransaction();
+ TransactionLogResource mockQueue =
+ new TransactionLogResource()
+ {
+
+ public String getResourceName()
+ {
+ return queueName;
+ }
+ };
+ txn.dequeueMessage(mockQueue, messageId);
+ txn.commitTranAsync();
+ }
+
+ }
+ catch(AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
+
+ }
+
+ /**
+  * Finishes queue-entry recovery.
+  * <p>
+  * Every stored message still left in the unused-message map is logged and removed from the
+  * store. Then, for each queue with a recovery count, the TXN_1005 and TXN_1006 operational
+  * messages are emitted, followed by one final TXN_1006 message with no queue name.
+  */
+ public void completeQueueEntryRecovery()
+ {
+     for(StoredMessage orphan : _unusedMessages.values())
+     {
+         _logger.warn("Message id " + orphan.getMessageNumber() + " in store, but not in any queue - removing....");
+         orphan.remove();
+     }
+
+     for(Map.Entry<String,Integer> recovery : _queueRecoveries.entrySet())
+     {
+         final String queueName = recovery.getKey();
+         final Integer recoveredCount = recovery.getValue();
+
+         CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1005(recoveredCount, queueName));
+         CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(queueName, true));
+     }
+
+     CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(null, false));
+ }
+
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org