You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/07/25 16:24:37 UTC

svn commit: r1613440 [2/2] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/...

Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Fri Jul 25 14:24:36 2014
@@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.Mes
 import org.apache.qpid.server.plugin.SystemNodeCreator;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -1010,9 +1011,9 @@ class ManagementNode implements MessageS
     }
 
     @Override
-    public boolean isDurable()
+    public MessageDurability getMessageDurability()
     {
-        return false;
+        return MessageDurability.NEVER;
     }
 
     private class ConsumedMessageInstance implements MessageInstance

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html Fri Jul 25 14:24:36 2014
@@ -33,6 +33,17 @@
                     <td><input type="checkbox" name="durable" id="formAddQueue.durable" value="durable" checked="checked" dojoType="dijit.form.CheckBox" /></td>
                 </tr>
                 <tr>
+                    <td valign="top"><strong>Persist Messages? </strong></td>
+                    <td>
+                        <select id="formAddQueue.messageDurability" name="messageDurability" data-dojo-type="dijit.form.FilteringSelect"
+                                data-dojo-props="name: 'messageDurability', value: '', searchAttr: 'name', placeHolder: '', value: '', required: false ">
+                            <option value="ALWAYS">Always</option>
+                            <option value="DEFAULT">Default</option>
+                            <option value="NEVER">Never</option>
+                        </select>
+                    </td>
+                </tr>
+                <tr>
                     <td valign="top"><strong>Queue Type: </strong></td>
                     <td>
                     <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Fri Jul 25 14:24:36 2014
@@ -277,6 +277,7 @@ define(["dojo/_base/xhr",
                storeNodes(["name",
                            "state",
                            "durable",
+                           "messageDurability",
                            "exclusive",
                            "owner",
                            "lifetimePolicy",
@@ -351,6 +352,7 @@ define(["dojo/_base/xhr",
                this.exclusive.innerHTML = entities.encode(String(this.queueData[ "exclusive" ]));
                this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ;
                this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ]));
+               this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ]));
                this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ;
 
                this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html Fri Jul 25 14:24:36 2014
@@ -33,6 +33,10 @@
             <div class="durable" style="float:left;"></div>
         </div>
         <div style="clear:both">
+            <div class="formLabel-labelCell" style="float:left; width: 150px;">Persist Messages:</div>
+            <div class="messageDurability" style="float:left;"></div>
+        </div>
+        <div style="clear:both">
             <div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div>
             <div class="exclusive" style="float:left;"></div>
         </div>

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Jul 25 14:24:36 2014
@@ -118,7 +118,7 @@ public abstract class AMQSession<C exten
     /** Immediate message prefetch default. */
     public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
 
-    private final boolean _delareQueues =
+    private final boolean _declareQueues =
         Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true"));
 
     private final boolean _declareExchanges =
@@ -2871,7 +2871,7 @@ public abstract class AMQSession<C exten
                 declareExchange(amqd, nowait);
             }
 
-            if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
+            if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
             {
                 declareQueue(amqd, consumer.isNoLocal(), nowait);
             }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Jul 25 14:24:36 2014
@@ -24,6 +24,7 @@ import static org.apache.qpid.transport.
 
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -37,6 +38,9 @@ import java.util.concurrent.ConcurrentLi
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.Binding;
@@ -57,29 +61,9 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.RangeSetFactory;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.*;
 import org.apache.qpid.util.Serial;
 import org.apache.qpid.util.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This is a 0.10 Session
@@ -362,19 +346,24 @@ public class AMQSession_0_10 extends AMQ
                               final AMQDestination destination, final boolean nowait)
             throws AMQException
     {
-        if (destination.getDestSyntax() == DestSyntax.BURL)
+        if (destination == null || destination.getDestSyntax() == DestSyntax.BURL)
         {
             Map args = FieldTableSupport.convertToMap(arguments);
 
-            for (AMQShortString rk: destination.getBindingKeys())
+            if(destination != null)
+            {
+                for (AMQShortString rk: destination.getBindingKeys())
+                {
+                    doSendQueueBind(queueName, exchangeName, args, rk);
+                }
+                if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey))
+                {
+                    doSendQueueBind(queueName, exchangeName, args, routingKey);
+                }
+            }
+            else
             {
-                _logger.debug("Binding queue : " + queueName.toString() +
-                              " exchange: " + exchangeName.toString() +
-                              " using binding key " + rk.asString());
-                getQpidSession().exchangeBind(queueName.toString(),
-                                              exchangeName.toString(),
-                                              rk.toString(),
-                                              args);
+                doSendQueueBind(queueName, exchangeName, args, routingKey);
             }
         }
         else
@@ -420,6 +409,20 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
+    private void doSendQueueBind(final AMQShortString queueName,
+                                 final AMQShortString exchangeName,
+                                 final Map args,
+                                 final AMQShortString rk)
+    {
+        _logger.debug("Binding queue : " + queueName.toString() +
+                      " exchange: " + exchangeName.toString() +
+                      " using binding key " + rk.asString());
+        getQpidSession().exchangeBind(queueName.toString(),
+                                      exchangeName.toString(),
+                                      rk.toString(),
+                                      args);
+    }
+
 
     /**
      * Close this session.

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java?rev=1613440&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java Fri Jul 25 14:24:36 2014
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class QueueMessageDurabilityTest extends QpidBrokerTestCase
+{
+
+    private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+    private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST";
+    private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST";
+    private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST";
+    private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST";
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        Connection conn = getConnection();
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        AMQSession amqSession = (AMQSession) session;
+
+        Map<String,Object> arguments = new HashMap<>();
+        arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
+        amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments);
+
+        arguments = new HashMap<>();
+        arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
+        amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments);
+
+        arguments = new HashMap<>();
+        arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
+        amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments);
+
+        arguments = new HashMap<>();
+        arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+        amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments);
+
+        amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME),
+                             AMQShortString.valueOf("Y.*.*.*"),
+                             null,
+                             AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+                             null);
+
+        amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME),
+                             AMQShortString.valueOf("*.Y.*.*"),
+                             null,
+                             AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+                             null);
+
+        amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME),
+                             AMQShortString.valueOf("*.*.Y.*"),
+                             null,
+                             AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+                             null);
+
+        amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME),
+                             AMQShortString.valueOf("*.*.*.Y"),
+                             null,
+                             AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+                             null);
+    }
+
+    public void testSendPersistentMessageToAll() throws Exception
+    {
+        Connection conn = getConnection();
+        Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(null);
+        conn.start();
+        producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+        session.commit();
+
+        AMQSession amqSession = (AMQSession) session;
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+        restartBroker();
+
+        conn = getConnection();
+        session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        amqSession = (AMQSession) session;
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+        assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+    }
+
+
+    public void testSendNonPersistentMessageToAll() throws Exception
+    {
+        Connection conn = getConnection();
+        Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(null);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        conn.start();
+        producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+        session.commit();
+
+        AMQSession amqSession = (AMQSession) session;
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+        restartBroker();
+
+        conn = getConnection();
+        session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        amqSession = (AMQSession) session;
+        assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+        assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+    }
+
+    public void testNonPersistentContentRetained() throws Exception
+    {
+        Connection conn = getConnection();
+        Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(null);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        conn.start();
+        producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+        producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2"));
+        session.commit();
+        MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+        Message msg = consumer.receive(1000l);
+        assertNotNull(msg);
+        assertTrue(msg instanceof TextMessage);
+        assertEquals("test2", ((TextMessage) msg).getText());
+        session.rollback();
+        restartBroker();
+        conn = getConnection();
+        conn.start();
+        session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        AMQSession amqSession = (AMQSession) session;
+        assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+        assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+        consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+        msg = consumer.receive(1000l);
+        assertNotNull(msg);
+        assertTrue(msg instanceof TextMessage);
+        assertEquals("test2", ((TextMessage)msg).getText());
+        session.commit();
+    }
+
+    public void testPersistentContentRetainedOnTransientQueue() throws Exception
+    {
+        setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+        Connection conn = getConnection();
+        Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(null);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        conn.start();
+        producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+        session.commit();
+        MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
+        Message msg = consumer.receive(1000l);
+        assertNotNull(msg);
+        assertTrue(msg instanceof TextMessage);
+        assertEquals("test1", ((TextMessage)msg).getText());
+        session.commit();
+        System.gc();
+        consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
+        msg = consumer.receive(1000l);
+        assertNotNull(msg);
+        assertTrue(msg instanceof TextMessage);
+        assertEquals("test1", ((TextMessage)msg).getText());
+        session.commit();
+    }
+
+
+}

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java Fri Jul 25 14:24:36 2014
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
-import java.io.IOException;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,6 +33,8 @@ import java.util.Map;
 
 import javax.security.auth.Subject;
 
+import org.codehaus.jackson.map.ObjectMapper;
+
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
@@ -77,7 +78,6 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
-import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  *
@@ -604,7 +604,6 @@ public class VirtualHostMessageStoreTest
         MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
 
         final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd);
-        storedMessage.flushToStore();
         final AMQMessage currentMessage = new AMQMessage(storedMessage);
 
 

Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Fri Jul 25 14:24:36 2014
@@ -193,3 +193,7 @@ org.apache.qpid.client.HeartbeatTest#tes
 
 // Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths
 org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange
+
+// Tests queue message durability settings which are a Java Broker specific feature
+org.apache.qpid.server.queue.QueueMessageDurabilityTest#*
+

Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Fri Jul 25 14:24:36 2014
@@ -33,6 +33,8 @@ org.apache.qpid.test.unit.xa.TopicTest#t
 org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash
 org.apache.qpid.test.unit.xa.TopicTest#testRecover
 
+org.apache.qpid.server.queue.QueueMessageDurabilityTest#*
+
 org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessagePersistence
 org.apache.qpid.server.store.VirtualHostMessageStoreTest#testMessageRemoval
 org.apache.qpid.server.store.VirtualHostMessageStoreTest#testBindingPersistence



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org