You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/16 16:22:17 UTC

[48/61] [abbrv] activemq-artemis git commit: more openwire test fixing added a queuePrefetch config param to AddressSettings

More OpenWire test fixes.
Added a queuePrefetch config param to AddressSettings.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e72dd99b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e72dd99b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e72dd99b

Branch: refs/heads/refactor-openwire
Commit: e72dd99b67b2ae4e12750492ccb575e1d0ae48af
Parents: b3b8a2f
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Mar 2 20:55:37 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 16 11:19:15 2016 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  2 +-
 .../openwire/OpenWireProtocolManager.java       |  2 +-
 .../core/protocol/openwire/amq/AMQConsumer.java | 13 +++++++++
 .../core/protocol/openwire/amq/AMQSession.java  |  4 +++
 .../core/settings/impl/AddressSettings.java     | 26 +++++++++++++++++
 .../artemiswrapper/ArtemisBrokerWrapper.java    |  2 ++
 .../transport/tcp/TcpTransportFactory.java      |  9 +++++-
 .../activemq/ActiveMQConnectionFactoryTest.java |  5 +++-
 .../activemq/QueueConsumerPriorityTest.java     |  1 +
 .../activemq/ZeroPrefetchConsumerTest.java      | 30 +++++++++++---------
 10 files changed, 76 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 1e1e953..03871ab 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -388,7 +388,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       dispatchAsync(ce);
    }
 
-   protected void dispatch(Command command) throws IOException {
+   public void dispatch(Command command) throws IOException {
       this.physicalSend(command);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 7445960..122788e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -524,7 +524,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
    public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
       BrokerInfo brokerInfo = new BrokerInfo();
-      brokerInfo.setBrokerName(server.getIdentity());
+      brokerInfo.setBrokerName(getBrokerName());
       brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID()));
       brokerInfo.setPeerBrokerInfos(null);
       brokerInfo.setFaultTolerantConfiguration(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 221679f..7c2a9bd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -34,7 +34,9 @@ import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
@@ -136,6 +138,17 @@ public class AMQConsumer implements BrowserListener {
       else {
          SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
          coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+         AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
+         if (addrSettings != null) {
+            //see PolicyEntry
+            if (prefetchSize != 0 && addrSettings.getQueuePrefetch() == 0) {
+               //sends back a ConsumerControl
+               ConsumerControl cc = new ConsumerControl();
+               cc.setConsumerId(info.getConsumerId());
+               cc.setPrefetch(0);
+               session.getConnection().dispatch(cc);
+            }
+         }
       }
 
       if (info.isBrowser()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index b68861e..e3d2266 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -542,4 +542,8 @@ public class AMQSession implements SessionCallback {
          }
       }
    }
+
+   public OpenWireConnection getConnection() {
+      return connection;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 3309fab..4b53ec6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -70,6 +70,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY;
 
+   public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+
    private AddressFullMessagePolicy addressFullMessagePolicy = null;
 
    private Long maxSizeBytes = null;
@@ -114,6 +116,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
 
+   //from amq5
+   //make it transient
+   private transient Integer queuePrefetch = null;
+
    public AddressSettings(AddressSettings other) {
       this.addressFullMessagePolicy = other.addressFullMessagePolicy;
       this.maxSizeBytes = other.maxSizeBytes;
@@ -137,6 +143,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.autoCreateJmsQueues = other.autoCreateJmsQueues;
       this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
       this.managementBrowsePageSize = other.managementBrowsePageSize;
+      this.queuePrefetch = other.queuePrefetch;
    }
 
    public AddressSettings() {
@@ -333,6 +340,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public int getQueuePrefetch() {
+      return queuePrefetch != null ? queuePrefetch : AddressSettings.DEFAULT_QUEUE_PREFETCH;
+   }
+
+   public AddressSettings setQueuePrefetch(int queuePrefetch) {
+      this.queuePrefetch = queuePrefetch;
+      return this;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -403,6 +419,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (managementBrowsePageSize == null) {
          managementBrowsePageSize = merged.managementBrowsePageSize;
       }
+      if (queuePrefetch == null) {
+         queuePrefetch = merged.queuePrefetch;
+      }
    }
 
    @Override
@@ -569,6 +588,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
       result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
       result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
+      result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
       return result;
    }
 
@@ -718,6 +738,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       }
       else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
          return false;
+      if (queuePrefetch == null) {
+         if (other.queuePrefetch != null)
+            return false;
+      }
+      else if (!queuePrefetch.equals(other.queuePrefetch))
+         return false;
       return true;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 112d425..94faf26 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -210,6 +210,8 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
                settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
             }
          }
+         int queuePrefetch = entry.getQueuePrefetch();
+         settings.setQueuePrefetch(queuePrefetch);
       }
 
       PolicyEntry defaultEntry = policyMap.getDefaultEntry();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
index c44dd72..c0ed126 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
@@ -52,10 +52,17 @@ public class TcpTransportFactory extends TransportFactory {
       //here check broker, if no broker, we start one
       Map<String, String> params = URISupport.parseParameters(location);
       String brokerId = params.remove("invmBrokerId");
+      boolean autoCreate = true;
+      String create = params.remove("create");
+      if (create != null)
+      {
+         autoCreate = "true".equals(create);
+      }
+
       URI location1 = URISupport.createRemainingURI(location, Collections.EMPTY_MAP);
 
       LOG.info("deciding whether starting an internal broker: " + brokerService + " flag: " + BrokerService.disableWrapper);
-      if (brokerService == null && !BrokerService.disableWrapper && BrokerService.checkPort(location1.getPort())) {
+      if (autoCreate && brokerService == null && !BrokerService.disableWrapper && BrokerService.checkPort(location1.getPort())) {
 
          LOG.info("starting internal broker: " + location1);
          ArtemisBrokerHelper.startArtemisBroker(location1);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
index e1ea7e6..8769324 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
@@ -110,6 +110,7 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
       connection.close();
    }
 
+   //we don't support in-vm connector (will we?)
    public void testCreateVMConnectionWithEmbdeddBroker() throws URISyntaxException, JMSException {
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://myBroker2?broker.persistent=false");
       // Make sure the broker is not created until the connection is
@@ -124,7 +125,9 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
       connection.close();
 
       // Verify the broker was destroyed.
-      assertNull(BrokerRegistry.getInstance().lookup("myBroker2"));
+      //Commented out because this is pure client behavior in
+      //amq5; there shouldn't be any use case like that with Artemis.
+      //assertNull(BrokerRegistry.getInstance().lookup("myBroker2"));
    }
 
    public void testGetBrokerName() throws URISyntaxException, JMSException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
index 296f52b..4ae2feb 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 
+//https://issues.apache.org/jira/browse/ARTEMIS-196
 public class QueueConsumerPriorityTest extends TestCase {
 
    private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e72dd99b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index 953032b..a9a564b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -26,10 +26,11 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConsumerControl;
@@ -349,8 +350,10 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
       assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
 
       // verify sub view broker
-      Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0);
-      assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
+      // Commented out because this checks broker internals,
+      // which don't apply to the Artemis broker.
+      //Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0);
+      //assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
 
       // manipulate Prefetch (like failover and stomp)
       ConsumerControl consumerControl = new ConsumerControl();
@@ -361,18 +364,17 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
       Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
       assertTrue("good request", !(reply instanceof ExceptionResponse));
       assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
-      assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
    }
 
    @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService brokerService = super.createBroker();
-      PolicyMap policyMap = new PolicyMap();
-      PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
-      zeroPrefetchPolicy.setQueuePrefetch(0);
-      policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy);
-      brokerService.setDestinationPolicy(policyMap);
-      return brokerService;
+   public EmbeddedJMS createArtemisBroker() throws Exception {
+      Configuration config0 = createConfig("localhost", 0);
+      String coreQueueAddress = "jms.queue." + brokerZeroQueue.getQueueName();
+      AddressSettings addrSettings = new AddressSettings();
+      addrSettings.setQueuePrefetch(0);
+      config0.getAddressesSettings().put(coreQueueAddress, addrSettings);
+      EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+      return newbroker;
    }
 
    @Override