You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/30 00:57:47 UTC

activemq-artemis git commit: ARTEMIS-1378 Update Address is broken, may lose messages

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9df2ffe24 -> 0df12657a


ARTEMIS-1378 Update Address is broken, may lose messages


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0df12657
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0df12657
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0df12657

Branch: refs/heads/master
Commit: 0df12657aff955eca56dee6c88ede9febb3167bd
Parents: 9df2ffe
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Aug 29 20:25:09 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Aug 29 20:48:01 2017 -0400

----------------------------------------------------------------------
 .../core/persistence/StorageManager.java        |  2 +
 .../journal/AbstractJournalStorageManager.java  | 23 ++++-
 .../impl/nullpm/NullStorageManager.java         |  4 +
 .../core/postoffice/impl/PostOfficeImpl.java    | 81 +++++++++++------
 .../core/server/impl/ActiveMQServerImpl.java    | 12 +--
 .../transaction/impl/TransactionImplTest.java   |  5 ++
 .../integration/client/SendAckFailTest.java     |  5 ++
 .../integration/client/UpdateQueueTest.java     | 92 ++++++++++++++++++++
 8 files changed, 185 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0df12657/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 369dba6..ba32252 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -283,6 +283,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
 
    void addQueueBinding(long tx, Binding binding) throws Exception;
 
+   void updateQueueBinding(long tx, Binding binding) throws Exception;
+
    void deleteQueueBinding(long tx, long queueBindingID) throws Exception;
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0df12657/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 58c86a0..d3db9e5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1221,7 +1221,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    // BindingsImpl operations
 
    @Override
+   public void updateQueueBinding(long tx, Binding binding) throws Exception {
+      internalQueueBinding(true, tx, binding);
+   }
+
+   @Override
    public void addQueueBinding(final long tx, final Binding binding) throws Exception {
+      internalQueueBinding(false, tx, binding);
+   }
+
+   private void internalQueueBinding(boolean update, final long tx, final Binding binding) throws Exception {
       Queue queue = (Queue) binding.getBindable();
 
       Filter filter = queue.getFilter();
@@ -1232,7 +1241,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       readLock();
       try {
-         bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding);
+
+         if (update) {
+            // updateQueueBinding path: append an update record under the same binding id
+            bindingsJournal.appendUpdateRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding);
+         } else {
+            // addQueueBinding path: append a new record for this binding id
+            bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding);
+         }
       } finally {
          readUnLock();
       }
@@ -1402,7 +1418,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
          if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) {
             PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer);
-            queueBindingInfos.add(bindingEncoding);
             mapBindings.put(bindingEncoding.getId(), bindingEncoding);
          } else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
             idGenerator.loadState(record.id, buffer);
@@ -1434,6 +1449,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
          }
       }
 
+      for (PersistentQueueBindingEncoding queue : mapBindings.values()) {
+         queueBindingInfos.add(queue);
+      }
+
       mapBindings.clear(); // just to give a hand to GC
 
       // This will instruct the IDGenerator to beforeStop old records

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0df12657/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 2c297d9..32f9010 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -92,6 +92,10 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
+   public void updateQueueBinding(long tx, Binding binding) throws Exception {
+   }
+
+   @Override
    public void deleteQueueStatus(long recordID) throws Exception {
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0df12657/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index a64394a..3e5ede5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -437,6 +437,24 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
    }
 
+   /** Used by updateQueue to compare the requested and current maxConsumers values; a null Integer is treated as -1. */
+   private static int replaceNull(Integer value) {
+      if (value == null) {
+         return -1;
+      } else {
+         return value.intValue();
+      }
+   }
+
+   /** Used by updateQueue to compare the requested and current purgeOnNoConsumers values; a null Boolean is treated as false. */
+   private static boolean replaceNull(Boolean value) {
+      if (value == null) {
+         return false;
+      } else {
+         return value.booleanValue();
+      }
+   }
+
    @Override
    public QueueBinding updateQueue(SimpleString name,
                                    RoutingType routingType,
@@ -447,8 +465,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          if (queueBinding == null) {
             return null;
          }
+
          final Queue queue = queueBinding.getQueue();
-         //TODO put the whole update logic on Queue
+
+         if (queue.getRoutingType() == routingType && replaceNull(maxConsumers) == replaceNull(queue.getMaxConsumers()) && queue.isPurgeOnNoConsumers() == replaceNull(purgeOnNoConsumers)) {
+            // nothing would change: return early, before validation, the in-memory update and the storage write
+            if (logger.isTraceEnabled()) {
+               logger.tracef("Queue %s didn't need to be updated", name);
+            }
+            return queueBinding;
+         }
+
          //validate update
          if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
             final int consumerCount = queue.getConsumerCount();
@@ -464,6 +491,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
             }
          }
+
          //atomic update
          if (maxConsumers != null) {
             queue.setMaxConsumer(maxConsumers);
@@ -474,6 +502,17 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          if (purgeOnNoConsumers != null) {
             queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
          }
+         if (queue.isDurable()) { // the previous update code never persisted non-durable queues; keep that guard
+            final long txID = storageManager.generateID();
+            try {
+               storageManager.updateQueueBinding(txID, queueBinding);
+               storageManager.commitBindings(txID);
+            } catch (Throwable throwable) {
+               storageManager.rollbackBindings(txID); // bindings-side rollback, pairing with commitBindings
+               logger.warn(throwable.getMessage(), throwable);
+               throw throwable;
+            }
+         }
          return queueBinding;
       }
    }
@@ -518,7 +557,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       return queues;
    }
 
-
    // TODO - needs to be synchronized to prevent happening concurrently with activate()
    // (and possible removeBinding and other methods)
    // Otherwise can have situation where createQueue comes in before failover, then failover occurs
@@ -662,15 +700,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
-   public RoutingStatus route(final Message message,
-                              final boolean direct) throws Exception {
+   public RoutingStatus route(final Message message, final boolean direct) throws Exception {
       return route(message, (Transaction) null, direct);
    }
 
    @Override
-   public RoutingStatus route(final Message message,
-                              final Transaction tx,
-                              final boolean direct) throws Exception {
+   public RoutingStatus route(final Message message, final Transaction tx, final boolean direct) throws Exception {
       return route(message, new RoutingContextImpl(tx), direct);
    }
 
@@ -721,11 +756,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       // first check for the auto-queue creation thing
       if (bindings == null) {
          // There is no queue with this address, we will check if it needs to be created
-//         if (queueCreator.create(address)) {
-            // TODO: this is not working!!!!
-            // reassign bindings if it was created
-//            bindings = addressManager.getBindingsForRoutingAddress(address);
-//         }
+         //         if (queueCreator.create(address)) {
+         // TODO: this is not working!!!!
+         // reassign bindings if it was created
+         //            bindings = addressManager.getBindingsForRoutingAddress(address);
+         //         }
       }
 
       if (bindings != null) {
@@ -786,8 +821,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
             processRoute(message, context, direct);
             final RoutingStatus finalResult = result;
-            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct,
-                  rejectDuplicates, finalResult) : null);
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalResult) : null);
          } catch (ActiveMQAddressFullException e) {
             if (startedTX.get()) {
                context.getTransaction().rollback();
@@ -818,9 +852,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
-   public MessageReference reroute(final Message message,
-                                   final Queue queue,
-                                   final Transaction tx) throws Exception {
+   public MessageReference reroute(final Message message, final Queue queue, final Transaction tx) throws Exception {
 
       setPagingStore(message);
 
@@ -853,8 +885,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
     */
    @Override
    public Pair<RoutingContext, Message> redistribute(final Message message,
-                                                           final Queue originatingQueue,
-                                                           final Transaction tx) throws Exception {
+                                                     final Queue originatingQueue,
+                                                     final Transaction tx) throws Exception {
       // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
       // arrived the target node
       // as described on https://issues.jboss.org/browse/JBPAPP-6130
@@ -912,7 +944,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
-   public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
+   public SimpleString getMatchingQueue(SimpleString address,
+                                        SimpleString queueName,
+                                        RoutingType routingType) throws Exception {
       return addressManager.getMatchingQueue(address, queueName, routingType);
    }
 
@@ -1006,16 +1040,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
    // Private -----------------------------------------------------------------
 
-
    private void setPagingStore(final Message message) throws Exception {
       PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString());
 
       message.setContext(store);
    }
 
-   private void routeQueueInfo(final Message message,
-                               final Queue queue,
-                               final boolean applyFilters) throws Exception {
+   private void routeQueueInfo(final Message message, final Queue queue, final boolean applyFilters) throws Exception {
       if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) {
          RoutingContext context = new RoutingContextImpl(null);
 
@@ -1098,7 +1129,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                }
             }
 
-
             if (deliveryTime != null) {
                reference.setScheduledDeliveryTime(deliveryTime);
             }
@@ -1127,7 +1157,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                   storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
                }
 
-
                if (deliveryTime > 0) {
                   if (tx != null) {
                      storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0df12657/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 4093e21..13f6965 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2809,20 +2809,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             RoutingType routingType,
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers) throws Exception {
+
       final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers);
       if (queueBinding != null) {
          final Queue queue = queueBinding.getQueue();
-         if (queue.isDurable()) {
-            final long txID = storageManager.generateID();
-            try {
-               storageManager.deleteQueueBinding(txID, queueBinding.getID());
-               storageManager.addQueueBinding(txID, queueBinding);
-               storageManager.commitBindings(txID);
-            } catch (Throwable throwable) {
-               storageManager.rollbackBindings(txID);
-               throw throwable;
-            }
-         }
          return queue;
       } else {
          return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0df12657/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index b1ea206..b256eb9 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -230,6 +230,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void updateQueueBinding(long tx, Binding binding) throws Exception {
+
+      }
+
+      @Override
       public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0df12657/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index a098436..d37d134 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -271,6 +271,11 @@ public class SendAckFailTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void updateQueueBinding(long tx, Binding binding) throws Exception {
+         manager.updateQueueBinding(tx, binding);
+      }
+
+      @Override
       public boolean isStarted() {
          return manager.isStarted();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0df12657/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java
new file mode 100644
index 0000000..542fe33
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class UpdateQueueTest extends ActiveMQTestBase {
+
+   /**
+    * Sends 100 messages, sets the queue's max consumers to 1, restarts the server and
+    * checks that the queue, all 100 messages and the new max-consumers value are still there.
+    */
+   @Test
+   public void testUpdateQueue() throws Exception {
+      ActiveMQServer server = createServer(true, true);
+      server.start();
+
+      SimpleString ADDRESS = SimpleString.toSimpleString("queue.0");
+      server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, null, true, false);
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+      try {
+         Connection conn = factory.createConnection();
+         try {
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer prod = session.createProducer(session.createQueue(ADDRESS.toString()));
+            for (int i = 0; i < 100; i++) {
+               prod.send(session.createTextMessage("message " + i));
+            }
+            server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, 1, false);
+         } finally {
+            conn.close();
+         }
+      } finally {
+         factory.close();
+      }
+
+      // restart the server; the queue must come back with its messages and updated settings
+      server.stop();
+      server.start();
+
+      Queue queue = server.locateQueue(ADDRESS);
+      Assert.assertNotNull("queue not found", queue);
+
+      factory = new ActiveMQConnectionFactory();
+      try {
+         Connection conn = factory.createConnection();
+         try {
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(session.createQueue(ADDRESS.toString()));
+            conn.start();
+            for (int i = 0; i < 100; i++) {
+               Assert.assertNotNull(consumer.receive(5000));
+            }
+            Assert.assertNull(consumer.receiveNoWait());
+            Assert.assertEquals(1, queue.getMaxConsumers());
+         } finally {
+            conn.close();
+         }
+      } finally {
+         factory.close();
+      }
+      server.stop();
+   }
+}