You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/06/05 13:50:56 UTC
svn commit: r1683719 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/exchange/
systests/src/test/java/org/apache/qpid/test/unit/client/
test-profiles/python_tests/
Author: kwall
Date: Fri Jun 5 11:50:56 2015
New Revision: 1683719
URL: http://svn.apache.org/r1683719
Log:
QPID-6570: [Java Broker] Change exchange implementation to remove autodelete exchanges on the removal of the last binding
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1683719&r1=1683718&r2=1683719&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Jun 5 11:50:56 2015
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.exchange;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -37,6 +38,8 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.Subject;
+
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -66,6 +69,7 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -226,6 +230,7 @@ public abstract class AbstractExchange<T
deleted();
}
+ @Override
public String toString()
{
return getClass().getSimpleName() + "[" + getName() +"]";
@@ -667,10 +672,34 @@ public abstract class AbstractExchange<T
{
b.delete();
}
+
+ autoDeleteIfNeccessary();
+
}
}
+ private void autoDeleteIfNeccessary()
+ {
+ if ((getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
+ && getBindingCount() == 0)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Auto-deleting exchange:" + this);
+ }
+
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ deleteAsync();
+ }
+ else
+ {
+ delete();
+ }
+ }
+ }
+
public BindingImpl getBinding(String bindingKey, AMQQueue queue)
{
assert queue != null;
@@ -886,6 +915,7 @@ public abstract class AbstractExchange<T
return Collections.emptySet();
}
+ // Used by the protocol layers
@Override
public boolean deleteBinding(final String bindingKey, final AMQQueue queue)
{
@@ -897,6 +927,7 @@ public abstract class AbstractExchange<T
else
{
binding.delete();
+ autoDeleteIfNeccessary();
return true;
}
}
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java?rev=1683719&r1=1683718&r2=1683719&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java Fri Jun 5 11:50:56 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.client
import java.io.IOException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.protocol.AMQConstant;
@@ -33,12 +34,16 @@ import org.apache.qpid.url.BindingURL;
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase
{
private JMXTestUtils _jmxUtils;
+ private static final String TEST_VHOST = "test";
+
@Override
public void setUp() throws Exception
@@ -115,7 +120,7 @@ public class DynamicQueueExchangeCreateT
}
//verify the exchange was not declared
- String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName);
+ String exchangeObjectName = _jmxUtils.getExchangeObjectName(TEST_VHOST, exchangeName);
assertFalse("exchange should not exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName));
}
@@ -138,7 +143,7 @@ public class DynamicQueueExchangeCreateT
session1.close();
//verify the exchange was declared
- String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName1);
+ String exchangeObjectName = _jmxUtils.getExchangeObjectName(TEST_VHOST, exchangeName1);
assertTrue("exchange should exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName));
//Now disable the implicit exchange declares and try again
@@ -155,7 +160,7 @@ public class DynamicQueueExchangeCreateT
session2.close();
//verify the exchange was not declared
- String exchangeObjectName2 = _jmxUtils.getExchangeObjectName("test", exchangeName2);
+ String exchangeObjectName2 = _jmxUtils.getExchangeObjectName(TEST_VHOST, exchangeName2);
assertFalse("exchange should not exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName2));
}
@@ -181,6 +186,49 @@ public class DynamicQueueExchangeCreateT
//PASS
}
}
+
+ public void testTemporaryExchangeDeletedWhenLastBindingRemoved() throws Exception
+ {
+
+ Connection connection = getConnection();
+ connection.start();
+ AMQSession session = (AMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ final String exchangeName = getTestName() + "_exch";
+ final String queueName = getTestName() + "_queue";
+
+
+ String tmpQueueBoundToTmpExchange = String.format("direct://%s/%s/%s?%s='%b'&%s='%b'",
+ exchangeName,
+ queueName,
+ queueName,
+ BindingURL.OPTION_AUTODELETE,
+ true,
+ BindingURL.OPTION_EXCHANGE_AUTODELETE,
+ true);
+ Queue queue = session.createQueue(tmpQueueBoundToTmpExchange);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String exchangeObjectName = _jmxUtils.getExchangeObjectName(TEST_VHOST, exchangeName);
+ assertTrue("Exchange " + exchangeName + " should exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName));
+ ManagedExchange exchange = _jmxUtils.getManagedExchange(exchangeName);
+ assertTrue("Exchange " + exchangeName + " should be autodelete", exchange.isAutoDelete());
+
+ sendMessage(session, queue, 1);
+
+ Message message = consumer.receive(1000);
+ session.commit();
+ assertNotNull("Message not received", message);
+
+ // Closing the consumer will cause the auto-delete queue to be deleted, which in turn
+ // deletes its binding. This will trigger the auto-delete exchange to be removed too
+ consumer.close();
+
+ assertFalse("Exchange " + exchangeName + " should not longer exist",
+ _jmxUtils.doesManagedObjectExist(exchangeObjectName));
+ }
+
private void checkExceptionErrorCode(JMSException original, AMQConstant code)
{
Exception linked = original.getLinkedException();
@@ -249,7 +297,7 @@ public class DynamicQueueExchangeCreateT
private void verifyDeclaredExchange(String exchangeName, boolean isAutoDelete, boolean isDurable) throws IOException
{
- String exchangeObjectName = _jmxUtils.getExchangeObjectName("test", exchangeName);
+ String exchangeObjectName = _jmxUtils.getExchangeObjectName(TEST_VHOST, exchangeName);
assertTrue("exchange should exist", _jmxUtils.doesManagedObjectExist(exchangeObjectName));
ManagedExchange exchange = _jmxUtils.getManagedExchange(exchangeName);
assertEquals(isAutoDelete, exchange.isAutoDelete());
Modified: qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes?rev=1683719&r1=1683718&r2=1683719&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes (original)
+++ qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes Fri Jun 5 11:50:56 2015
@@ -56,8 +56,9 @@ qpid_tests.broker_0_10.priority.Priority
#QPID-6299 broker does not support ring queue on lvq
qpid_tests.broker_0_10.lvq.LVQTests.test_ring_lvq2
-#The broker does not support the autodelete property on exchanges
-qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodelete*
+#QPID-6572 These tests pass an empty binding key argument, which won't match the Java Broker's stricter implementation.
+qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodeleteFanout
+qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodeleteHeaders
# QPID-5531 : Changes to the C++ behaviour in having a default timeout for every transaction not implemented in Java Broker
qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org