You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/21 17:06:58 UTC

[3/5] activemq-artemis git commit: ARTEMIS-883 Fix OpenWire ProducerFlowControlTest

ARTEMIS-883 Fix OpenWire ProducerFlowControlTest


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ae90edfd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ae90edfd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ae90edfd

Branch: refs/heads/master
Commit: ae90edfdb624e1cdf4a4b36cbf82dbc7a1be8295
Parents: 0b131bd
Author: Howard Gao <ho...@gmail.com>
Authored: Tue Dec 20 21:13:38 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 21 12:04:06 2016 -0500

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  18 ++
 .../core/protocol/openwire/amq/AMQSession.java  | 169 +++++++++----------
 .../integration/openwire/BasicOpenWireTest.java |  11 +-
 .../openwire/amq/ProducerBlockingTtlTest.java   | 147 ++++++++++++++++
 4 files changed, 255 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae90edfd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f90c0b7..0bcff66 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -183,6 +183,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
 
+   private AtomicBoolean disableTtl = new AtomicBoolean(false);
+
    // TODO-NOW: check on why there are two connections created for every createConnection on the client.
    public OpenWireConnection(Connection connection,
                              ActiveMQServer server,
@@ -776,6 +778,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       this.connectionEntry = connectionEntry;
    }
 
+   @Override
+   public boolean checkDataReceived() {
+      if (disableTtl.get()) {
+         return true;
+      }
+      return super.checkDataReceived();
+   }
+
    public void setUpTtl(final long inactivityDuration,
                         final long inactivityDurationInitialDelay,
                         final boolean useKeepAlive) {
@@ -818,6 +828,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
    }
 
+   public void disableTtl() {
+      disableTtl.set(true);
+   }
+
+   public void enableTtl() {
+      disableTtl.set(false);
+   }
+
    class SlowConsumerDetection implements SlowConsumerDetectionListener {
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae90edfd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 9c592ca..006f05e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -40,9 +40,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -53,12 +51,12 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback {
-
    // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
    protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
 
@@ -303,108 +301,103 @@ public class AMQSession implements SessionCallback {
          originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
       }
 
-      Runnable runnable;
-
-      if (sendProducerAck) {
-         runnable = new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-                  connection.dispatchSync(ack);
-               } catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-                  connection.sendException(e);
-               }
-
-            }
-         };
-      } else {
-         final Connection transportConnection = connection.getTransportConnection();
-
-         if (transportConnection == null) {
-            // I don't think this could happen, but just in case, avoiding races
-            runnable = null;
-         } else {
-            runnable = new Runnable() {
-               @Override
-               public void run() {
-                  transportConnection.setAutoRead(true);
-               }
-            };
-         }
-      }
-
-      internalSend(actualDestinations, originalCoreMsg, runnable);
-   }
+      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
 
-   private void internalSend(ActiveMQDestination[] actualDestinations,
-                             ServerMessage originalCoreMsg,
-                             final Runnable onComplete) throws Exception {
+      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
 
-      Runnable runToUse;
+      final Exception[] anyException = new Exception[] {null};
 
-      if (actualDestinations.length <= 1 || onComplete == null) {
-         // if onComplete is null, this will be null ;)
-         runToUse = onComplete;
-      } else {
-         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
-         runToUse = new Runnable() {
-            @Override
-            public void run() {
-               if (count.decrementAndGet() == 0) {
-                  onComplete.run();
-               }
-            }
-         };
+      if (shouldBlockProducer) {
+         connection.getContext().setDontSendReponse(true);
       }
 
-      SimpleString[] addresses = new SimpleString[actualDestinations.length];
-      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
-
-      // We fillup addresses, pagingStores and we will throw failure if that's the case
       for (int i = 0; i < actualDestinations.length; i++) {
          ActiveMQDestination dest = actualDestinations[i];
-         addresses[i] = new SimpleString(dest.getPhysicalName());
-         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
-         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
-            throw new ResourceAllocationException("Queue is full");
-         }
-      }
-
-      for (int i = 0; i < actualDestinations.length; i++) {
-
+         SimpleString address = new SimpleString(dest.getPhysicalName());
          ServerMessage coreMsg = originalCoreMsg.copy();
-
-         coreMsg.setAddress(addresses[i]);
-
-         PagingStore store = pagingStores[i];
-
-         if (store.isFull()) {
-            connection.getTransportConnection().setAutoRead(false);
-         }
+         coreMsg.setAddress(address);
 
          if (actualDestinations[i].isQueue()) {
             checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
-         }
-
-         if (actualDestinations[i].isQueue()) {
             coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
          } else {
             coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
          }
-         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
+         PagingStore store = server.getPagingManager().getPageStore(address);
 
-         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
-            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
-         }
 
-         if (runToUse != null) {
-            // if the timeout is >0, it will wait this much milliseconds
-            // before running the the runToUse
-            // this will eventually unblock blocked destinations
-            // playing flow control
-            store.checkMemory(runToUse);
+         this.connection.disableTtl();
+         if (shouldBlockProducer) {
+            if (!store.checkMemory(() -> {
+               try {
+                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
+
+                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
+                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
+                  }
+               } catch (Exception e) {
+                  if (anyException[0] == null) {
+                     anyException[0] = e;
+                  }
+               }
+               connection.enableTtl();
+               if (count.decrementAndGet() == 0) {
+                  if (anyException[0] != null) {
+                     this.connection.getContext().setDontSendReponse(false);
+                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
+                     connection.sendException(anyException[0]);
+                  } else {
+                     if (sendProducerAck) {
+                        try {
+                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+                           connection.dispatchAsync(ack);
+                        } catch (Exception e) {
+                           this.connection.getContext().setDontSendReponse(false);
+                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                           connection.sendException(e);
+                        }
+                     } else {
+                        connection.getContext().setDontSendReponse(false);
+                        try {
+                           Response response = new Response();
+                           response.setCorrelationId(messageSend.getCommandId());
+                           connection.dispatchAsync(response);
+                        } catch (Exception e) {
+                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                           connection.sendException(e);
+                        }
+                     }
+                  }
+               }
+            })) {
+               this.connection.getContext().setDontSendReponse(false);
+               connection.enableTtl();
+               throw new ResourceAllocationException("Queue is full " + address);
+            }
+         } else {
+            //non-persistent messages goes here, by default we stop reading from
+            //transport
+            connection.getTransportConnection().setAutoRead(false);
+            if (!store.checkMemory(() -> {
+               connection.getTransportConnection().setAutoRead(true);
+               connection.enableTtl();
+            })) {
+               connection.getTransportConnection().setAutoRead(true);
+               connection.enableTtl();
+               throw new ResourceAllocationException("Queue is full " + address);
+            }
+
+            RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
+            if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
+               throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
+            }
+
+            if (count.decrementAndGet() == 0) {
+               if (sendProducerAck) {
+                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+                  connection.dispatchAsync(ack);
+               }
+            }
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae90edfd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
index 6be92f8..d01e237 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
@@ -45,8 +45,8 @@ public class BasicOpenWireTest extends OpenWireTestBase {
    public TestName name = new TestName();
 
    protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
-   protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
-   protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
+   protected ActiveMQConnectionFactory factory;
+   protected ActiveMQXAConnectionFactory xaFactory;
 
    protected ActiveMQConnection connection;
    protected String topicName = "amqTestTopic1";
@@ -64,6 +64,9 @@ public class BasicOpenWireTest extends OpenWireTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
+      System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "5");
+      factory = new ActiveMQConnectionFactory(getConnectionUrl());
+      xaFactory = new ActiveMQXAConnectionFactory(getConnectionUrl());
       SimpleString coreQueue = new SimpleString(queueName);
       this.server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false, -1, false, true);
       testQueues.put(queueName, coreQueue);
@@ -81,6 +84,10 @@ public class BasicOpenWireTest extends OpenWireTestBase {
       }
    }
 
+   protected String getConnectionUrl() {
+      return urlString;
+   }
+
    @Override
    @After
    public void tearDown() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae90edfd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java
new file mode 100644
index 0000000..8473d67
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProducerBlockingTtlTest extends BasicOpenWireTest {
+
+   ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+   protected ActiveMQConnection flowControlConnection;
+
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      String match = "#";
+      Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
+      asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      this.makeSureCoreQueueExist("QUEUE.A");
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         if (flowControlConnection != null) {
+            TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class);
+            try {
+               flowControlConnection.getTransport().stop();
+               flowControlConnection.close();
+            } catch (Throwable ignored) {
+            }
+         }
+      } finally {
+         super.tearDown();
+      }
+   }
+
+   //set ttl to 1000
+   @Override
+   protected String getConnectionUrl() {
+      return urlString + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000";
+   }
+
+   @Test
+   public void testProducerBlockWontGetTimeout() throws Exception {
+
+      flowControlConnection = (ActiveMQConnection) factory.createConnection();
+      Connection consumerConnection = factory.createConnection();
+      Thread fillThread = null;
+      AtomicBoolean keepGoing = new AtomicBoolean(true);
+      try {
+         flowControlConnection.start();
+
+         final Session session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageProducer producer = session.createProducer(queueA);
+         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         final String text = "Hello World";
+         final int num = 10;
+
+         fillThread = new Thread("Fill thread.") {
+            @Override
+            public void run() {
+               try {
+                  for (int i = 0; i < num && keepGoing.get(); i++) {
+                     producer.send(session.createTextMessage(text + i));
+                  }
+               } catch (JMSException e) {
+               }
+            }
+         };
+
+         fillThread.start();
+
+         //longer enough than TTL (1000)
+         Thread.sleep(4000);
+
+         //receive messages and unblock the producer
+         consumerConnection.start();
+         Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSession.createConsumer(queueA);
+
+         for (int i = 0; i < num; i++) {
+            TextMessage m = (TextMessage) consumer.receive(5000);
+            assertNotNull(m);
+            assertEquals("Hello World" + i, m.getText());
+         }
+         assertNull(consumer.receive(3));
+
+      } catch (Exception e) {
+         e.printStackTrace();
+      } finally {
+
+         if (fillThread != null) {
+            keepGoing.set(false);
+            fillThread.interrupt();
+            fillThread.join();
+         }
+         try {
+            flowControlConnection.close();
+            flowControlConnection = null;
+         } catch (Throwable t) {
+         }
+         try {
+            consumerConnection.close();
+         } catch (Throwable t) {
+         }
+      }
+   }
+}