You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/31 04:31:47 UTC
[65/69] [abbrv] activemq-artemis git commit: Some tweaks to the code
Some tweaks to the code
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/44a6622b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/44a6622b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/44a6622b
Branch: refs/heads/refactor-openwire
Commit: 44a6622b8a83057fb84fd4123147855751783387
Parents: 8f3db59
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 18 14:30:52 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 30 22:29:44 2016 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 17 +--
.../openwire/OpenWireMessageConverter.java | 2 +-
.../amq/AMQCompositeConsumerBrokerExchange.java | 9 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 144 ++++++++++---------
.../core/protocol/openwire/amq/AMQSession.java | 45 +++---
5 files changed, 125 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 17f26b0..e8259c3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -426,16 +426,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- public void addConsumerBrokerExchange(ConsumerId id,
+ private void addConsumerBrokerExchange(ConsumerId id,
AMQSession amqSession,
- Map<ActiveMQDestination, AMQConsumer> consumerMap) {
+ List<AMQConsumer> consumerList) {
AMQConsumerBrokerExchange result = consumerExchanges.get(id);
if (result == null) {
- if (consumerMap.size() == 1) {
- result = new AMQSingleConsumerBrokerExchange(amqSession, consumerMap.values().iterator().next());
+ if (consumerList.size() == 1) {
+ result = new AMQSingleConsumerBrokerExchange(amqSession, consumerList.get(0));
}
else {
- result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap);
+ result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerList);
}
synchronized (consumerExchanges) {
consumerExchanges.put(id, result);
@@ -717,9 +717,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
throw new IllegalStateException("Session not exist! : " + sessionId);
}
- amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+ List<AMQConsumer> consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+ this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
ss.addConsumer(info);
+ amqSession.start();
}
}
@@ -729,7 +731,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void onSlowConsumer(ServerConsumer consumer) {
if (consumer instanceof AMQServerConsumer) {
AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer;
- ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getDestination());
+ ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getOpenwireDestination());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
try {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString());
@@ -1002,7 +1004,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- new Exception("commit").printStackTrace();
try {
protocolManager.commitTransactionOnePhase(info);
TransactionId txId = info.getTransactionId();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 6176490..89f71ed 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -443,7 +443,7 @@ public class OpenWireMessageConverter implements MessageConverter {
public static MessageDispatch createMessageDispatch(ServerMessage message,
int deliveryCount,
AMQConsumer consumer) throws IOException, JMSException {
- ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
+ ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
index 7e83767..56b4b6d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
@@ -20,15 +20,20 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessagePull;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange {
private final Map<ActiveMQDestination, AMQConsumer> consumerMap;
- public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap) {
+ public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, List<AMQConsumer> consumerList) {
super(amqSession);
- this.consumerMap = consumerMap;
+ this.consumerMap = new HashMap<>();
+ for (AMQConsumer consumer : consumerList) {
+ consumerMap.put(consumer.getOpenwireDestination(), consumer);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index d296213..b4056fb 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -35,7 +35,6 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@@ -49,11 +48,10 @@ import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer {
private AMQSession session;
- private org.apache.activemq.command.ActiveMQDestination actualDest;
+ private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private long nativeId = -1;
- private SimpleString subQueueName = null;
private int prefetchSize;
private AtomicInteger windowAvailable;
@@ -66,7 +64,7 @@ public class AMQConsumer {
ConsumerInfo info,
ScheduledExecutorService scheduledPool) {
this.session = amqSession;
- this.actualDest = d;
+ this.openwireDestination = d;
this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
@@ -76,73 +74,38 @@ public class AMQConsumer {
}
}
- public void init(SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
- AMQServerSession coreSession = session.getCoreSession();
-
- SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
-
- nativeId = session.getCoreServer().getStorageManager().generateID();
-
- SimpleString address = new SimpleString(this.actualDest.getPhysicalName());
-
- if (this.actualDest.isTopic()) {
- String physicalName = this.actualDest.getPhysicalName();
- if (physicalName.contains(".>")) {
- //wildcard
- physicalName = OpenWireUtil.convertWildcard(physicalName);
- }
-
- // on recreate we don't need to create queues
- address = new SimpleString("jms.topic." + physicalName);
- if (info.isDurable()) {
- subQueueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, info.getClientId(), info.getSubscriptionName()));
-
- QueueQueryResult result = coreSession.executeQueueQuery(subQueueName);
- if (result.isExists()) {
- // Already exists
- if (result.getConsumerCount() > 0) {
- throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
- }
+ public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
+ this.nativeId = nativeId;
+ AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener);
+ serverConsumer.setAmqConsumer(this);
+ }
- SimpleString oldFilterString = result.getFilterString();
- boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
+ private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
- SimpleString oldTopicName = result.getAddress();
+ SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
- boolean topicChanged = !oldTopicName.equals(address);
+ String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName());
- if (selectorChanged || topicChanged) {
- // Delete the old durable sub
- coreSession.deleteQueue(subQueueName);
+ SimpleString address;
- // Create the new one
- coreSession.createQueue(address, subQueueName, selector, false, true);
- }
+ if (openwireDestination.isTopic()) {
+ address = new SimpleString("jms.topic." + physicalName);
- }
- else {
- coreSession.createQueue(address, subQueueName, selector, false, true);
- }
- }
- else {
- subQueueName = new SimpleString(UUID.randomUUID().toString());
+ SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
- coreSession.createQueue(address, subQueueName, selector, true, false);
- }
-
- AMQServerConsumer serverConsumer = (AMQServerConsumer) coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
+ AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
- serverConsumer.setAmqConsumer(this);
+ return serverConsumer;
}
else {
- SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
- AMQServerConsumer serverConsumer = (AMQServerConsumer)coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
- serverConsumer.setAmqConsumer(this);
+ SimpleString queueName = new SimpleString("jms.queue." + physicalName);
+ AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+ serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
if (addrSettings != null) {
//see PolicyEntry
- if (prefetchSize != 0 && addrSettings.getQueuePrefetch() == 0) {
+ if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
//sends back a ConsumerControl
ConsumerControl cc = new ConsumerControl();
cc.setConsumerId(info.getConsumerId());
@@ -150,9 +113,63 @@ public class AMQConsumer {
session.getConnection().dispatch(cc);
}
}
+
+ return serverConsumer;
+
+ }
+
+ }
+
+ private SimpleString createTopicSubscription(boolean isDurable,
+ String clientID,
+ String physicalName,
+ String subscriptionName,
+ SimpleString selector,
+ SimpleString address) throws Exception {
+
+ SimpleString queueName;
+
+ if (isDurable) {
+ queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName));
+ QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
+ if (result.isExists()) {
+ // Already exists
+ if (result.getConsumerCount() > 0) {
+ throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+ }
+
+ SimpleString oldFilterString = result.getFilterString();
+
+ boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
+
+ SimpleString oldTopicName = result.getAddress();
+
+ boolean topicChanged = !oldTopicName.equals(address);
+
+ if (selectorChanged || topicChanged) {
+ // Delete the old durable sub
+ session.getCoreSession().deleteQueue(queueName);
+
+ // Create the new one
+ session.getCoreSession().createQueue(address, queueName, selector, false, true);
+ }
+ }
+ else {
+ session.getCoreSession().createQueue(address, queueName, selector, false, true);
+ }
+ }
+ else {
+ queueName = new SimpleString(UUID.randomUUID().toString());
+
+ session.getCoreSession().createQueue(address, queueName, selector, true, false);
+
}
+
+ return queueName;
}
+
+
public long getNativeId() {
return this.nativeId;
}
@@ -200,7 +217,7 @@ public class AMQConsumer {
public void handleDeliverNullDispatch() {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(getId());
- md.setDestination(actualDest);
+ md.setDestination(openwireDestination);
session.deliverMessage(md);
windowAvailable.decrementAndGet();
}
@@ -351,10 +368,6 @@ public class AMQConsumer {
}
}
- public org.apache.activemq.command.ActiveMQDestination getDestination() {
- return actualDest;
- }
-
public ConsumerInfo getInfo() {
return info;
}
@@ -375,8 +388,8 @@ public class AMQConsumer {
session.removeConsumer(nativeId);
}
- public org.apache.activemq.command.ActiveMQDestination getActualDestination() {
- return actualDest;
+ public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
+ return openwireDestination;
}
public void setPrefetchSize(int prefetchSize) {
@@ -388,6 +401,9 @@ public class AMQConsumer {
}
}
+ /**
+ * The MessagePullHandler is used with slow consumer policies.
+ * */
private class MessagePullHandler {
private long next = -1;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 86ea582..4675dca 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
import javax.jms.ResourceAllocationException;
import javax.transaction.xa.Xid;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -45,6 +44,8 @@ import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
@@ -62,6 +63,10 @@ import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
+
+ // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
+ protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
+
private ConnectionInfo connInfo;
private AMQServerSession coreSession;
private SessionInfo sessInfo;
@@ -98,7 +103,7 @@ public class AMQSession implements SessionCallback {
this.connection = connection;
this.scheduledPool = scheduledPool;
this.manager = manager;
- OpenWireFormat marshaller = (OpenWireFormat)connection.getMarshaller();
+ OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
this.converter = new OpenWireMessageConverter(marshaller.copy());
}
@@ -130,7 +135,9 @@ public class AMQSession implements SessionCallback {
}
- public void createConsumer(ConsumerInfo info, AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
+ public List<AMQConsumer> createConsumer(ConsumerInfo info,
+ AMQSession amqSession,
+ SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
//check destination
ActiveMQDestination dest = info.getDestination();
ActiveMQDestination[] dests = null;
@@ -140,25 +147,32 @@ public class AMQSession implements SessionCallback {
else {
dests = new ActiveMQDestination[]{dest};
}
- Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
- for (ActiveMQDestination d : dests) {
- if (d.isQueue()) {
- SimpleString queueName = OpenWireUtil.toCoreAddress(d);
+// Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
+ List<AMQConsumer> consumersList = new java.util.LinkedList<>();
+
+ for (ActiveMQDestination openWireDest : dests) {
+ if (openWireDest.isQueue()) {
+ SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
getCoreServer().getJMSQueueCreator().create(queueName);
}
- AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
+ AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
- consumer.init(slowConsumerDetectionListener);
- consumerMap.put(d, consumer);
+ consumer.init(slowConsumerDetectionListener, idGenerator.generateID());
+ consumersList.add(consumer);
consumers.put(consumer.getNativeId(), consumer);
}
- connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
- // TODO: This is wrong. We should only start when the client starts
+ return consumersList;
+ }
+
+ public void start() {
+
coreSession.start();
started.set(true);
+
}
+ // rename actualDest to destination
@Override
public void afterDelivery() throws Exception {
@@ -166,7 +180,7 @@ public class AMQSession implements SessionCallback {
@Override
public void browserFinished(ServerConsumer consumer) {
- AMQConsumer theConsumer = ((AMQServerConsumer)consumer).getAmqConsumer();
+ AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer();
if (theConsumer != null) {
theConsumer.browseFinished();
}
@@ -235,7 +249,6 @@ public class AMQSession implements SessionCallback {
}
-
public void send(final ProducerInfo producerInfo,
final Message messageSend,
boolean sendProducerAck) throws Exception {
@@ -286,7 +299,7 @@ public class AMQSession implements SessionCallback {
else {
final Connection transportConnection = connection.getTransportConnection();
-// new Exception("Setting to false").printStackTrace();
+ // new Exception("Setting to false").printStackTrace();
if (transportConnection == null) {
// I don't think this could happen, but just in case, avoiding races
@@ -301,7 +314,6 @@ public class AMQSession implements SessionCallback {
}
}
-
internalSend(actualDestinations, originalCoreMsg, runnable);
}
@@ -340,7 +352,6 @@ public class AMQSession implements SessionCallback {
}
}
-
for (int i = 0; i < actualDestinations.length; i++) {
ServerMessage coreMsg = originalCoreMsg.copy();