You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/18 02:42:46 UTC
[57/65] [abbrv] activemq-artemis git commit: more openwire test
fixing added a queuePrefetch config param to AddressSettings
more openwire test fixing
added a queuePrefetch config param to AddressSettings
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3189d659
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3189d659
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3189d659
Branch: refs/heads/refactor-openwire
Commit: 3189d6592c67d1b2c589f15766beb5b1a954a293
Parents: 95f76e2
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Mar 2 20:55:37 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 17 14:10:46 2016 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../openwire/OpenWireProtocolManager.java | 2 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 13 +++++++++
.../core/protocol/openwire/amq/AMQSession.java | 4 +++
.../core/settings/impl/AddressSettings.java | 26 +++++++++++++++++
.../artemiswrapper/ArtemisBrokerWrapper.java | 2 ++
.../transport/tcp/TcpTransportFactory.java | 9 +++++-
.../activemq/ActiveMQConnectionFactoryTest.java | 5 +++-
.../activemq/QueueConsumerPriorityTest.java | 1 +
.../activemq/ZeroPrefetchConsumerTest.java | 30 +++++++++++---------
10 files changed, 76 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 1e1e953..03871ab 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -388,7 +388,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
dispatchAsync(ce);
}
- protected void dispatch(Command command) throws IOException {
+ public void dispatch(Command command) throws IOException {
this.physicalSend(command);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 7445960..122788e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -524,7 +524,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
BrokerInfo brokerInfo = new BrokerInfo();
- brokerInfo.setBrokerName(server.getIdentity());
+ brokerInfo.setBrokerName(getBrokerName());
brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID()));
brokerInfo.setPeerBrokerInfos(null);
brokerInfo.setFaultTolerantConfiguration(false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 221679f..7c2a9bd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -34,7 +34,9 @@ import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
@@ -136,6 +138,17 @@ public class AMQConsumer implements BrowserListener {
else {
SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+ AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
+ if (addrSettings != null) {
+ //see PolicyEntry
+ if (prefetchSize != 0 && addrSettings.getQueuePrefetch() == 0) {
+ //sends back a ConsumerControl
+ ConsumerControl cc = new ConsumerControl();
+ cc.setConsumerId(info.getConsumerId());
+ cc.setPrefetch(0);
+ session.getConnection().dispatch(cc);
+ }
+ }
}
if (info.isBrowser()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index b68861e..e3d2266 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -542,4 +542,8 @@ public class AMQSession implements SessionCallback {
}
}
}
+
+ public OpenWireConnection getConnection() {
+ return connection;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 3309fab..4b53ec6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -70,6 +70,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY;
+ public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+
private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null;
@@ -114,6 +116,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
+ //from amq5
+ //make it transient
+ private transient Integer queuePrefetch = null;
+
public AddressSettings(AddressSettings other) {
this.addressFullMessagePolicy = other.addressFullMessagePolicy;
this.maxSizeBytes = other.maxSizeBytes;
@@ -137,6 +143,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.autoCreateJmsQueues = other.autoCreateJmsQueues;
this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
this.managementBrowsePageSize = other.managementBrowsePageSize;
+ this.queuePrefetch = other.queuePrefetch;
}
public AddressSettings() {
@@ -333,6 +340,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
+ public int getQueuePrefetch() {
+ return queuePrefetch != null ? queuePrefetch : AddressSettings.DEFAULT_QUEUE_PREFETCH;
+ }
+
+ public AddressSettings setQueuePrefetch(int queuePrefetch) {
+ this.queuePrefetch = queuePrefetch;
+ return this;
+ }
+
/**
* merge 2 objects in to 1
*
@@ -403,6 +419,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (managementBrowsePageSize == null) {
managementBrowsePageSize = merged.managementBrowsePageSize;
}
+ if (queuePrefetch == null) {
+ queuePrefetch = merged.queuePrefetch;
+ }
}
@Override
@@ -569,6 +588,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
+ result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
return result;
}
@@ -718,6 +738,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
return false;
+ if (queuePrefetch == null) {
+ if (other.queuePrefetch != null)
+ return false;
+ }
+ else if (!queuePrefetch.equals(other.queuePrefetch))
+ return false;
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 112d425..94faf26 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -210,6 +210,8 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
}
}
+ int queuePrefetch = entry.getQueuePrefetch();
+ settings.setQueuePrefetch(queuePrefetch);
}
PolicyEntry defaultEntry = policyMap.getDefaultEntry();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
index c44dd72..c0ed126 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
@@ -52,10 +52,17 @@ public class TcpTransportFactory extends TransportFactory {
//here check broker, if no broker, we start one
Map<String, String> params = URISupport.parseParameters(location);
String brokerId = params.remove("invmBrokerId");
+ boolean autoCreate = true;
+ String create = params.remove("create");
+ if (create != null)
+ {
+ autoCreate = "true".equals(create);
+ }
+
URI location1 = URISupport.createRemainingURI(location, Collections.EMPTY_MAP);
LOG.info("deciding whether starting an internal broker: " + brokerService + " flag: " + BrokerService.disableWrapper);
- if (brokerService == null && !BrokerService.disableWrapper && BrokerService.checkPort(location1.getPort())) {
+ if (autoCreate && brokerService == null && !BrokerService.disableWrapper && BrokerService.checkPort(location1.getPort())) {
LOG.info("starting internal broker: " + location1);
ArtemisBrokerHelper.startArtemisBroker(location1);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
index e1ea7e6..8769324 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
@@ -110,6 +110,7 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
connection.close();
}
+ //we don't support in-vm connector (will we?)
public void testCreateVMConnectionWithEmbdeddBroker() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://myBroker2?broker.persistent=false");
// Make sure the broker is not created until the connection is
@@ -124,7 +125,9 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
connection.close();
// Verify the broker was destroyed.
- assertNull(BrokerRegistry.getInstance().lookup("myBroker2"));
+ //I comment out this because this is pure client behavior in
+ //amq5. there shouldn't be any use-case like that with Artemis.
+ //assertNull(BrokerRegistry.getInstance().lookup("myBroker2"));
}
public void testGetBrokerName() throws URISyntaxException, JMSException {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
index 296f52b..4ae2feb 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
+//https://issues.apache.org/jira/browse/ARTEMIS-196
public class QueueConsumerPriorityTest extends TestCase {
private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index 953032b..a9a564b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -26,10 +26,11 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerControl;
@@ -349,8 +350,10 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
// verify sub view broker
- Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0);
- assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
+ // I comment out this because it checks broker internal
+ // which doesn't apply to artemis broker.
+ //Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0);
+ //assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
// manipulate Prefetch (like failover and stomp)
ConsumerControl consumerControl = new ConsumerControl();
@@ -361,18 +364,17 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
assertTrue("good request", !(reply instanceof ExceptionResponse));
assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
- assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
}
@Override
- protected BrokerService createBroker() throws Exception {
- BrokerService brokerService = super.createBroker();
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
- zeroPrefetchPolicy.setQueuePrefetch(0);
- policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy);
- brokerService.setDestinationPolicy(policyMap);
- return brokerService;
+ public EmbeddedJMS createArtemisBroker() throws Exception {
+ Configuration config0 = createConfig("localhost", 0);
+ String coreQueueAddress = "jms.queue." + brokerZeroQueue.getQueueName();
+ AddressSettings addrSettings = new AddressSettings();
+ addrSettings.setQueuePrefetch(0);
+ config0.getAddressesSettings().put(coreQueueAddress, addrSettings);
+ EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ return newbroker;
}
@Override