You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:45 UTC
[36/42] activemq-artemis git commit: ARTEMIS-463 Refactoring on
Openwire https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 926aebd..4675dca 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -16,9 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
+import javax.jms.ResourceAllocationException;
import javax.transaction.xa.Xid;
-import java.io.IOException;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -26,52 +25,56 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
- private AMQServerSession coreSession;
+
+ // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
+ protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
+
private ConnectionInfo connInfo;
+ private AMQServerSession coreSession;
private SessionInfo sessInfo;
private ActiveMQServer server;
private OpenWireConnection connection;
private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
- private Map<Long, AMQProducer> producers = new HashMap<>();
-
private AtomicBoolean started = new AtomicBoolean(false);
private TransactionId txId = null;
@@ -82,6 +85,11 @@ public class AMQSession implements SessionCallback {
private OpenWireProtocolManager manager;
+ // The sessionWireformat used by the session
+ // this object is meant to be used per thread / session
+ // so we make a new one per AMQSession
+ private final OpenWireMessageConverter converter;
+
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@@ -90,10 +98,18 @@ public class AMQSession implements SessionCallback {
OpenWireProtocolManager manager) {
this.connInfo = connInfo;
this.sessInfo = sessInfo;
+
this.server = server;
this.connection = connection;
this.scheduledPool = scheduledPool;
this.manager = manager;
+ OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
+
+ this.converter = new OpenWireMessageConverter(marshaller.copy());
+ }
+
+ public OpenWireMessageConverter getConverter() {
+ return converter;
}
public void initialize() {
@@ -106,7 +122,7 @@ public class AMQSession implements SessionCallback {
// now
try {
- coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, new AMQServerSessionFactory(), true);
+ coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true);
long sessionId = sessInfo.getSessionId().getValue();
if (sessionId == -1) {
@@ -119,7 +135,9 @@ public class AMQSession implements SessionCallback {
}
- public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception {
+ public List<AMQConsumer> createConsumer(ConsumerInfo info,
+ AMQSession amqSession,
+ SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
//check destination
ActiveMQDestination dest = info.getDestination();
ActiveMQDestination[] dests = null;
@@ -129,29 +147,46 @@ public class AMQSession implements SessionCallback {
else {
dests = new ActiveMQDestination[]{dest};
}
- Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
- for (ActiveMQDestination d : dests) {
- if (d.isQueue()) {
- SimpleString queueName = OpenWireUtil.toCoreAddress(d);
+// Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
+ List<AMQConsumer> consumersList = new java.util.LinkedList<>();
+
+ for (ActiveMQDestination openWireDest : dests) {
+ if (openWireDest.isQueue()) {
+ SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
getCoreServer().getJMSQueueCreator().create(queueName);
}
- AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
- consumer.init();
- consumerMap.put(d, consumer);
+ AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
+
+ consumer.init(slowConsumerDetectionListener, idGenerator.generateID());
+ consumersList.add(consumer);
consumers.put(consumer.getNativeId(), consumer);
}
- connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
+
+ return consumersList;
+ }
+
+ public void start() {
coreSession.start();
started.set(true);
+
}
+ // rename actualDest to destination
@Override
public void afterDelivery() throws Exception {
}
@Override
+ public void browserFinished(ServerConsumer consumer) {
+ AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer();
+ if (theConsumer != null) {
+ theConsumer.browseFinished();
+ }
+ }
+
+ @Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
}
@@ -197,49 +232,26 @@ public class AMQSession implements SessionCallback {
@Override
public boolean hasCredits(ServerConsumer consumerID) {
- AMQConsumer amqConsumer = consumers.get(consumerID.getID());
- return amqConsumer.hasCredits();
- }
-
- @Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
- // TODO Auto-generated method stub
-
- }
-
- public AMQServerSession getCoreSession() {
- return this.coreSession;
- }
-
- public ActiveMQServer getCoreServer() {
- return this.server;
- }
- public void removeConsumer(long consumerId) throws Exception {
- boolean failed = !(this.txId != null || this.isTx);
+ AMQConsumer amqConsumer;
- coreSession.amqCloseConsumer(consumerId, failed);
- consumers.remove(consumerId);
- }
+ amqConsumer = consumers.get(consumerID.getID());
- public void createProducer(ProducerInfo info) throws Exception {
- AMQProducer producer = new AMQProducer(this, info);
- producer.init();
- producers.put(info.getProducerId().getValue(), producer);
+ if (amqConsumer != null) {
+ return amqConsumer.hasCredits();
+ }
+ return false;
}
- public void removeProducer(ProducerInfo info) {
- removeProducer(info.getProducerId());
- }
+ @Override
+ public void disconnect(ServerConsumer consumerId, String queueName) {
+ // TODO Auto-generated method stub
- public void removeProducer(ProducerId id) {
- producers.remove(id.getValue());
}
- public SendingResult send(AMQProducerBrokerExchange producerExchange,
- Message messageSend,
- boolean sendProducerAck) throws Exception {
- SendingResult result = new SendingResult();
+ public void send(final ProducerInfo producerInfo,
+ final Message messageSend,
+ boolean sendProducerAck) throws Exception {
TransactionId tid = messageSend.getTransactionId();
if (tid != null) {
resetSessionTx(tid);
@@ -251,41 +263,132 @@ public class AMQSession implements SessionCallback {
ActiveMQDestination[] actualDestinations = null;
if (destination.isComposite()) {
actualDestinations = destination.getCompositeDestinations();
+ messageSend.setOriginalDestination(destination);
}
else {
actualDestinations = new ActiveMQDestination[]{destination};
}
- for (ActiveMQDestination dest : actualDestinations) {
- ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024);
+ ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
- /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
- * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
- * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
- * message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
- if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
- coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+ /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
+ * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
+ * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
+ * message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
+ if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
+ originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+ }
+
+ Runnable runnable;
+
+ if (sendProducerAck) {
+ runnable = new Runnable() {
+ public void run() {
+ try {
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ connection.dispatchSync(ack);
+ }
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ connection.sendException(e);
+ }
+
+ }
+ };
+ }
+ else {
+ final Connection transportConnection = connection.getTransportConnection();
+
+ // new Exception("Setting to false").printStackTrace();
+
+ if (transportConnection == null) {
+ // I don't think this could happen, but just in case, avoiding races
+ runnable = null;
+ }
+ else {
+ runnable = new Runnable() {
+ public void run() {
+ transportConnection.setAutoRead(true);
+ }
+ };
}
- OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller());
- SimpleString address = OpenWireUtil.toCoreAddress(dest);
- coreMsg.setAddress(address);
+ }
+
+ internalSend(actualDestinations, originalCoreMsg, runnable);
+ }
+
+ private void internalSend(ActiveMQDestination[] actualDestinations,
+ ServerMessage originalCoreMsg,
+ final Runnable onComplete) throws Exception {
+
+ Runnable runToUse;
+
+ if (actualDestinations.length <= 1 || onComplete == null) {
+ // if onComplete is null, this will be null ;)
+ runToUse = onComplete;
+ }
+ else {
+ final AtomicInteger count = new AtomicInteger(actualDestinations.length);
+ runToUse = new Runnable() {
+ @Override
+ public void run() {
+ if (count.decrementAndGet() == 0) {
+ onComplete.run();
+ }
+ }
+ };
+ }
+
+ SimpleString[] addresses = new SimpleString[actualDestinations.length];
+ PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
+
+ // We fill up addresses and pagingStores, and throw a failure if a store using the FAIL policy is already full
+ for (int i = 0; i < actualDestinations.length; i++) {
+ ActiveMQDestination dest = actualDestinations[i];
+ addresses[i] = OpenWireUtil.toCoreAddress(dest);
+ pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
+ if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
+ throw new ResourceAllocationException("Queue is full");
+ }
+ }
+
+ for (int i = 0; i < actualDestinations.length; i++) {
+
+ ServerMessage coreMsg = originalCoreMsg.copy();
+
+ coreMsg.setAddress(addresses[i]);
+
+ PagingStore store = pagingStores[i];
- PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
if (store.isFull()) {
- result.setBlockNextSend(true);
- result.setBlockPagingStore(store);
- result.setBlockingAddress(address);
- //now we hold this message send until the store has space.
- //we do this by put it in a scheduled task
- ScheduledExecutorService scheduler = server.getScheduledPool();
- Runnable sendRetryTask = new SendRetryTask(coreMsg, producerExchange, sendProducerAck, messageSend.getSize(), messageSend.getCommandId());
- scheduler.schedule(sendRetryTask, 10, TimeUnit.MILLISECONDS);
+ connection.getTransportConnection().setAutoRead(false);
}
- else {
- coreSession.send(coreMsg, false);
+
+ getCoreSession().send(coreMsg, false);
+
+ if (runToUse != null) {
+ // if the timeout is >0, it will wait this many milliseconds
+ // before running the runToUse
+ // this will eventually unblock blocked destinations
+ // playing flow control
+ store.checkMemory(runToUse);
}
}
- return result;
+ }
+
+ public AMQServerSession getCoreSession() {
+ return this.coreSession;
+ }
+
+ public ActiveMQServer getCoreServer() {
+ return this.server;
+ }
+
+ public void removeConsumer(long consumerId) throws Exception {
+ boolean failed = !(this.txId != null || this.isTx);
+
+ coreSession.amqCloseConsumer(consumerId, failed);
+ consumers.remove(consumerId);
}
public WireFormat getMarshaller() {
@@ -449,87 +552,17 @@ public class AMQSession implements SessionCallback {
return consumers.get(coreConsumerId);
}
- private class SendRetryTask implements Runnable {
-
- private ServerMessage coreMsg;
- private AMQProducerBrokerExchange producerExchange;
- private boolean sendProducerAck;
- private int msgSize;
- private int commandId;
-
- public SendRetryTask(ServerMessage coreMsg,
- AMQProducerBrokerExchange producerExchange,
- boolean sendProducerAck,
- int msgSize,
- int commandId) {
- this.coreMsg = coreMsg;
- this.producerExchange = producerExchange;
- this.sendProducerAck = sendProducerAck;
- this.msgSize = msgSize;
- this.commandId = commandId;
- }
-
- @Override
- public void run() {
- synchronized (AMQSession.this) {
- try {
- // check pageStore
- SimpleString address = coreMsg.getAddress();
- PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
- if (store.isFull()) {
- // if store is still full, schedule another
- server.getScheduledPool().schedule(this, 10, TimeUnit.MILLISECONDS);
- }
- else {
- // now send the message again.
- coreSession.send(coreMsg, false);
-
- if (sendProducerAck) {
- ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), msgSize);
- connection.dispatchAsync(ack);
- }
- else {
- Response response = new Response();
- response.setCorrelationId(commandId);
- connection.dispatchAsync(response);
- }
- }
- }
- catch (Exception e) {
- ExceptionResponse response = new ExceptionResponse(e);
- response.setCorrelationId(commandId);
- connection.dispatchAsync(response);
- }
+ public void updateConsumerPrefetchSize(ConsumerId consumerId, int prefetch) {
+ Iterator<AMQConsumer> iterator = consumers.values().iterator();
+ while (iterator.hasNext()) {
+ AMQConsumer consumer = iterator.next();
+ if (consumer.getId().equals(consumerId)) {
+ consumer.setPrefetchSize(prefetch);
}
-
}
}
- public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange,
- SendingResult result) throws IOException {
- long start = System.currentTimeMillis();
- long nextWarn = start;
- producerExchange.blockingOnFlowControl(true);
-
- AMQConnectionContext context = producerExchange.getConnectionContext();
- PagingStoreImpl store = result.getBlockPagingStore();
-
- //Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL
- long blockedProducerWarningInterval = 30000;
- ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId();
-
- while (store.isFull()) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
- }
-
- long now = System.currentTimeMillis();
- if (now >= nextWarn) {
- ActiveMQServerLogger.LOGGER.memoryLimitReached(producerId.toString(), result.getBlockingAddress().toString(), ((now - start) / 1000));
- nextWarn = now + blockedProducerWarningInterval;
- }
- }
- producerExchange.blockingOnFlowControl(false);
+ public OpenWireConnection getConnection() {
+ return connection;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
deleted file mode 100644
index 0e192db..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-interface BrowserListener {
-
- void browseFinished();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
new file mode 100644
index 0000000..1c64676
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.openwire.util;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.util.ByteSequence;
+
+public class OpenWireUtil {
+
+ /** Copies the {@code length} bytes of {@code bytes}, starting at its offset, into a new fixed-size buffer. */
+ public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
+ ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length);
+ buffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+ return buffer;
+ }
+
+ /** Builds the core address for a destination: "jms.queue." or "jms.topic." followed by its physical name. */
+ public static SimpleString toCoreAddress(ActiveMQDestination dest) {
+ String prefix = dest.isQueue() ? "jms.queue." : "jms.topic.";
+ return new SimpleString(prefix + dest.getPhysicalName());
+ }
+
+ /**
+ * Converts the core address of a message back to an ActiveMQ destination. We use the actual address on the message
+ * rather than the destination set on the consumer because it may be different, and the JMS spec says that it should
+ * be whatever was set on publish/send; a divert or wildcard may mean that it differs from the destination subscribed
+ * to by the consumer. Only a leading "jms.queue." / "jms.topic." prefix is removed, mirroring
+ * {@link #toCoreAddress(ActiveMQDestination)}, so a physical name that itself contains such text is preserved.
+ */
+ public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
+ String address = message.getAddress().toString();
+ String strippedAddress = address.replaceFirst("^jms\\.(queue|topic)\\.", "");
+ if (actualDestination.isQueue()) {
+ return new ActiveMQQueue(strippedAddress);
+ }
+ else {
+ return new ActiveMQTopic(strippedAddress);
+ }
+ }
+
+ /*
+ * Converts AMQ wildcards to compatible core wildcards:
+ * AMQ * wildcard --> Core * wildcard (no conversion)
+ * AMQ > wildcard --> Core # wildcard
+ * Each run of one or more consecutive ".>" sequences is replaced by a single ".#".
+ */
+ public static String convertWildcard(String physicalName) {
+ return physicalName.replaceAll("(\\.>)+", ".#");
+ }
+
+ /** Converts a transaction id to a core Xid; the id must be an XATransactionId, otherwise the cast fails. */
+ public static XidImpl toXID(TransactionId xaXid) {
+ return toXID((XATransactionId)xaXid);
+ }
+
+ /** Builds a core Xid from the branch qualifier, format id and global transaction id of an XA transaction id. */
+ public static XidImpl toXID(XATransactionId xaXid) {
+ return new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index e94e0bc..a6cbe71 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -113,6 +113,11 @@ public class StompSession implements SessionCallback {
}
@Override
+ public void browserFinished(ServerConsumer consumer) {
+ // no action is taken here when a browser finishes
+ }
+
+ @Override
public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
LargeServerMessageImpl largeMessage = null;
ServerMessage newServerMessage = serverMessage;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index d52f53f..4a24b57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -947,4 +947,8 @@ public interface Configuration {
StoreConfiguration getStoreConfiguration();
Configuration setStoreConfiguration(StoreConfiguration storeConfiguration);
+
+ /** Returns a string representation of all the connectors, for debugging purposes. */
+ String debugConnectors();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 7784a01..1a9690f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -21,7 +21,9 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
import java.io.Serializable;
+import java.io.StringWriter;
import java.lang.reflect.Array;
import java.net.URI;
import java.security.AccessController;
@@ -1299,6 +1301,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
public TransportConfiguration[] getTransportConfigurations(final List<String> connectorNames) {
TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
int count = 0;
+ System.out.println(debugConnectors());
+
for (String connectorName : connectorNames) {
TransportConfiguration connector = getConnectorConfigurations().get(connectorName);
@@ -1314,6 +1318,21 @@ public class ConfigurationImpl implements Configuration, Serializable {
return tcConfigs;
}
+ /** Lists every configured connector, one "Connector::name value = config" line each, for debugging. */
+ @Override
+ public String debugConnectors() {
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter writer = new PrintWriter(stringWriter);
+
+ for (Map.Entry<String, TransportConfiguration> connector : getConnectorConfigurations().entrySet()) {
+ writer.println("Connector::" + connector.getKey() + " value = " + connector.getValue());
+ }
+
+ // close the PrintWriter so all output has reached the StringWriter before it is read
+ writer.close();
+ return stringWriter.toString();
+ }
+
@Override
public boolean isResolveProtocols() {
return resolveProtocols;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index e831966..566b91a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -126,6 +126,8 @@ public interface PagingStore extends ActiveMQComponent {
boolean checkMemory(Runnable runnable);
+ boolean isFull();
+
/**
* Write lock the PagingStore.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 0b74fd7..c05a288 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -98,6 +98,10 @@ public final class CoreSessionCallback implements SessionCallback {
channel.send(packet);
}
+ @Override
+ public void browserFinished(ServerConsumer consumer) {
+ // no action is taken here when a browser finishes
+ }
@Override
public void afterDelivery() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 70d6289..db61f89 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -138,6 +138,12 @@ public class InVMConnection implements Connection {
}
@Override
+ public void setAutoRead(boolean autoRead) {
+ // Intentionally a no-op: nothing is done with the autoRead flag on the in-VM connection.
+ // Something could eventually be implemented here, but it is not needed for now.
+ }
+
+ @Override
public ActiveMQBuffer createTransportBuffer(final int size) {
return ActiveMQBuffers.dynamicBuffer(size);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index efbc1ea..34cc8cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -169,6 +169,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private final long connectionsAllowed;
+ private Map<String, Object> extraConfigs;
+
public NettyAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index d2cde4b..795bbb5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -146,7 +146,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
this.flushExecutor = flushExecutor;
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
-// this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
+ // this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory);
if (config.isResolveProtocols()) {
@@ -206,8 +206,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() +
- "-" +
- System.identityHashCode(this), false, Thread.currentThread().getContextClassLoader());
+ "-" +
+ System.identityHashCode(this), false, Thread.currentThread().getContextClassLoader());
}
});
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index e3c1b2a..64633bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -243,6 +243,10 @@ public interface ActiveMQServer extends ActiveMQComponent {
Queue locateQueue(SimpleString queueName);
+ BindingQueryResult bindingQuery(SimpleString address) throws Exception; // Queries the queue bindings for the given address.
+
+ QueueQueryResult queueQuery(SimpleString name) throws Exception; // Queries the queue bound under the given name.
+
void destroyQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 6045e2c..d75efdd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -25,6 +25,12 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*/
public interface ServerConsumer extends Consumer {
+ void setlowConsumerDetection(SlowConsumerDetectionListener listener); // Sets the slow-consumer listener. NOTE(review): name looks like a typo of "setSlowConsumerDetection"; it is public interface, so it is left as is.
+
+ SlowConsumerDetectionListener getSlowConsumerDetecion(); // Returns the slow-consumer listener. NOTE(review): "Detecion" looks like a typo of "Detection".
+
+ void fireSlowConsumer(); // Signals that this consumer was detected as slow; QueueImpl calls it from its slow-consumer check.
+
/**
* @param protocolContext
* @see #getProtocolContext()
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
new file mode 100644
index 0000000..0c60f25
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+public interface SlowConsumerDetectionListener { // Callback notified when a ServerConsumer is reported as slow.
+ void onSlowConsumer(ServerConsumer consumer); // consumer: the consumer that was reported as slow
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java
index ef384e0..e3a583f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java
@@ -69,6 +69,11 @@ public class EmbeddedActiveMQ {
* @return
*/
public boolean waitClusterForming(long timeWait, TimeUnit unit, int iterations, int servers) throws Exception {
+ if (activeMQServer.getClusterManager().getClusterConnections() == null ||
+ activeMQServer.getClusterManager().getClusterConnections().size() == 0) {
+ return servers == 0;
+ }
+
for (int i = 0; i < iterations; i++) {
for (ClusterConnection connection : activeMQServer.getClusterManager().getClusterConnections()) {
if (connection.getTopology().getMembers().size() == servers) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 7554127..13a1283 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -76,6 +77,8 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@@ -97,6 +100,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Bindable;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -105,6 +109,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.ServerSessionFactory;
@@ -545,6 +550,72 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public BindingQueryResult bindingQuery(SimpleString address) throws Exception { // Collects the unique names of the queue bindings matching the address.
+ if (address == null) {
+ throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
+ }
+ // Auto-create is reported only for names starting with the JMS queue prefix whose matching address settings enable it.
+ boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
+
+ List<SimpleString> names = new ArrayList<>();
+
+ // Special case for the management address (see HORNETQ-29): it yields a positive result
+ // with an empty name list, without consulting the post office.
+ ManagementService managementService = getManagementService();
+ if (managementService != null) {
+ if (address.equals(managementService.getManagementAddress())) {
+ return new BindingQueryResult(true, names, autoCreateJmsQueues);
+ }
+ }
+
+ Bindings bindings = getPostOffice().getMatchingBindings(address);
+ // Only local and remote queue bindings contribute names; every other binding type is skipped.
+ for (Binding binding : bindings.getBindings()) {
+ if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
+ names.add(binding.getUniqueName());
+ }
+ }
+ // The first result flag is true only when at least one queue binding was found.
+ return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
+ }
+
+ @Override
+ public QueueQueryResult queueQuery(SimpleString name) { // Builds a description of the queue bound under the given name.
+ if (name == null) {
+ throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
+ }
+ // Auto-create is reported only for names starting with the JMS queue prefix whose matching address settings enable it.
+ boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
+
+ QueueQueryResult response;
+
+ Binding binding = getPostOffice().getBinding(name);
+ // The management address stays null when no management service is available.
+ SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
+ // Only a LOCAL_QUEUE binding is answered from the bound queue's own state.
+ if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
+ Queue queue = (Queue) binding.getBindable();
+
+ Filter filter = queue.getFilter();
+
+ SimpleString filterString = filter == null ? null : filter.getFilterString();
+
+ response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
+ }
+ // Special case for the management address (see HORNETQ-29): reported as durable, non-temporary, unfiltered, with -1 consumer and message counts.
+ else if (name.equals(managementAddress)) {
+ response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
+ }
+ else if (autoCreateJmsQueues) { // No local queue binding, but auto-create applies to this name.
+ response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false); // NOTE(review): the trailing false is presumably an "exists" flag — confirm against QueueQueryResult.
+ }
+ else { // Nothing known under this name: answer with an empty result.
+ response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
+ }
+
+ return response;
+ }
+
+ @Override
public void threadDump() {
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8bf5d08..86ca36c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2930,6 +2930,8 @@ public class QueueImpl implements Queue {
}
}
+ serverConsumer.fireSlowConsumer();
+
if (connection != null) {
ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
if (policy.equals(SlowConsumerPolicy.KILL)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 422d324..14f22ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -90,6 +91,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final ActiveMQServer server;
+ private SlowConsumerDetectionListener slowConsumerListener;
+
/**
* We get a readLock when a message is handled, and return the readLock when the message is finally delivered
* When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
@@ -228,6 +231,23 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// ----------------------------------------------------------------------
@Override
+ public void setlowConsumerDetection(SlowConsumerDetectionListener listener) { // Stores the listener that fireSlowConsumer() will notify.
+ this.slowConsumerListener = listener;
+ }
+
+ @Override
+ public SlowConsumerDetectionListener getSlowConsumerDetecion() { // Returns the stored slow-consumer listener; may be null when none has been set.
+ return slowConsumerListener;
+ }
+
+ @Override
+ public void fireSlowConsumer() { // Notifies the stored listener, passing this consumer as the slow one.
+ if (slowConsumerListener != null) { // Without a listener there is nobody to notify.
+ slowConsumerListener.onSlowConsumer(this);
+ }
+ }
+
+ @Override
public Object getProtocolContext() {
return protocolContext;
}
@@ -546,12 +566,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
else {
refs.add(ref);
- if (!failed) {
- // We don't decrement delivery count if the client failed, since there's a possibility that refs
- // were actually delivered but we just didn't get any acks for them
- // before failure
- ref.decrementDeliveryCount();
- }
+ updateDeliveryCountForCanceledRef(ref, failed);
}
if (isTrace) {
@@ -566,6 +581,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return refs;
}
+ protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { // NOTE(review): protected, presumably so subclasses can override the policy — confirm.
+ if (!failed) {
+ // The delivery count is decremented only when the client did not fail. After a failure the
+ // refs may actually have been delivered without their acks arriving before the failure,
+ // so the count is left untouched in that case.
+ ref.decrementDeliveryCount();
+ }
+ }
+
@Override
public void setStarted(final boolean started) {
synchronized (lock) {
@@ -1191,6 +1215,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref = null;
synchronized (messageQueue) {
if (!iterator.hasNext()) {
+ callback.browserFinished(ServerConsumerImpl.this);
break;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d628bde..77705fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CloseListener;
@@ -623,63 +622,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
- if (name == null) {
- throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
- }
-
- boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
-
- QueueQueryResult response;
-
- Binding binding = postOffice.getBinding(name);
-
- if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
- Queue queue = (Queue) binding.getBindable();
-
- Filter filter = queue.getFilter();
-
- SimpleString filterString = filter == null ? null : filter.getFilterString();
-
- response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
- }
- // make an exception for the management address (see HORNETQ-29)
- else if (name.equals(managementAddress)) {
- response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
- }
- else if (autoCreateJmsQueues) {
- response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
- }
- else {
- response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
- }
-
- return response;
+ return server.queueQuery(name); // The query logic lives on the server; the session only delegates.
}
@Override
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
- if (address == null) {
- throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
- }
-
- boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
-
- List<SimpleString> names = new ArrayList<>();
-
- // make an exception for the management address (see HORNETQ-29)
- if (address.equals(managementAddress)) {
- return new BindingQueryResult(true, names, autoCreateJmsQueues);
- }
-
- Bindings bindings = postOffice.getMatchingBindings(address);
-
- for (Binding binding : bindings.getBindings()) {
- if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
- names.add(binding.getUniqueName());
- }
- }
-
- return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
+ return server.bindingQuery(address); // The query logic lives on the server; the session only delegates.
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 3309fab..4b53ec6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -70,6 +70,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY;
+ public static final int DEFAULT_QUEUE_PREFETCH = 1000; // Value returned by getQueuePrefetch() when no prefetch has been set.
+
private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null;
@@ -114,6 +116,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
+ // Queue prefetch setting taken from amq5 (presumably ActiveMQ 5.x — confirm); null means "not set".
+ // Declared transient, so Java serialization of this Serializable class does not write it.
+ private transient Integer queuePrefetch = null;
+
public AddressSettings(AddressSettings other) {
this.addressFullMessagePolicy = other.addressFullMessagePolicy;
this.maxSizeBytes = other.maxSizeBytes;
@@ -137,6 +143,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.autoCreateJmsQueues = other.autoCreateJmsQueues;
this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
this.managementBrowsePageSize = other.managementBrowsePageSize;
+ this.queuePrefetch = other.queuePrefetch;
}
public AddressSettings() {
@@ -333,6 +340,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
+ public int getQueuePrefetch() { // Returns the configured prefetch, or DEFAULT_QUEUE_PREFETCH when none has been set.
+ return queuePrefetch != null ? queuePrefetch : AddressSettings.DEFAULT_QUEUE_PREFETCH;
+ }
+
+ public AddressSettings setQueuePrefetch(int queuePrefetch) { // Stores the prefetch and returns this instance so calls can be chained.
+ this.queuePrefetch = queuePrefetch;
+ return this;
+ }
+
/**
* merge 2 objects in to 1
*
@@ -403,6 +419,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (managementBrowsePageSize == null) {
managementBrowsePageSize = merged.managementBrowsePageSize;
}
+ if (queuePrefetch == null) {
+ queuePrefetch = merged.queuePrefetch;
+ }
}
@Override
@@ -569,6 +588,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
+ result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
return result;
}
@@ -718,6 +738,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
return false;
+ if (queuePrefetch == null) {
+ if (other.queuePrefetch != null)
+ return false;
+ }
+ else if (!queuePrefetch.equals(other.queuePrefetch))
+ return false;
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 4b27bc4..a9eb0f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -50,4 +50,7 @@ public interface SessionCallback {
void disconnect(ServerConsumer consumerId, String queueName);
boolean isWritable(ReadyListener callback);
+
+ /** Some protocols (OpenWire) need a special message when the browser is finished. */
+ void browserFinished(ServerConsumer consumer);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
index 846d31b..232d3ae 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
@@ -17,7 +17,9 @@
package org.apache.activemq.artemis.tests.util;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
@@ -26,6 +28,7 @@ import org.junit.rules.ExternalResource;
* This is useful to make sure you won't have leaking threads between tests
*/
public class ThreadLeakCheckRule extends ExternalResource {
+ private static Set<String> extraThreads = new HashSet<String>(); // Thread names added through addExtraThreads(); static, so shared by every instance of this rule.
boolean enabled = true;
@@ -94,6 +97,11 @@ public class ThreadLeakCheckRule extends ExternalResource {
}
+ public static void addExtraThreads(String... threads) { // Registers thread names that the leak check looks up by exact name (extraThreads.contains).
+ for (String th : threads) {
+ extraThreads.add(th);
+ }
+ }
private boolean checkThread() {
boolean failedThread = false;
@@ -183,6 +191,9 @@ public class ThreadLeakCheckRule extends ExternalResource {
// Static workers used by MQTT client.
return true;
}
+ else if (extraThreads.contains(threadName)) {
+ return true;
+ }
else {
for (StackTraceElement element : thread.getStackTrace()) {
if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
@@ -194,4 +205,7 @@ public class ThreadLeakCheckRule extends ExternalResource {
}
+ public static void clearExtraThreads() { // Removes every name previously registered through addExtraThreads().
+ extraThreads.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
index c57845d..8a64a85 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
@@ -116,7 +116,7 @@ public class JmsRollbackRedeliveryTest {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
- TextMessage msg = (TextMessage) consumer.receive(5000);
+ TextMessage msg = (TextMessage) consumer.receive(6000000);
if (msg != null) {
if (rolledback.put(msg.getText(), Boolean.TRUE) != null) {
LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
index 0b62b31..48c36cf 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
@@ -54,12 +54,12 @@ public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest {
@BeforeClass
public static void beforeTest() throws Exception {
// This thread is kept alive in the original test too, so exclude it from the thread leak check.
- ThreadLeakCheckRule.addKownThread("WriteTimeoutFilter-Timeout");
+ ThreadLeakCheckRule.addExtraThreads("WriteTimeoutFilter-Timeout-1");
}
@AfterClass
public static void afterTest() throws Exception {
- ThreadLeakCheckRule.removeKownThread("WriteTimeoutFilter-Timeout");
+ ThreadLeakCheckRule.clearExtraThreads(); // Clears all registered extra thread names, not only the one added in beforeTest().
}
@Before
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
index 40cbccb..e44a490 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +39,10 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.jboss.byteman.contrib.bmunit.BMRule;
@@ -83,8 +87,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
targetLocation = "ENTRY",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule(
name = "stop broker before commit",
- targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
- targetMethod = "processCommitTransactionOnePhase",
+ targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+ targetMethod = "commit",
targetLocation = "ENTRY",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()"),})
public void testFailoverConsumerDups() throws Exception {
@@ -177,10 +181,10 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
@BMRule(
name = "stop broker before commit",
- targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
- targetMethod = "processCommitTransactionOnePhase",
+ targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+ targetMethod = "commit",
targetLocation = "ENTRY",
- action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return null")})
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")})
public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(false);
}
@@ -194,8 +198,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
targetLocation = "ENTRY",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule(
name = "stop broker after commit",
- targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
- targetMethod = "processCommitTransactionOnePhase",
+ targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+ targetMethod = "commit",
targetLocation = "AT EXIT",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()")})
public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
@@ -232,13 +236,11 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
testConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
- LOG.info("consume one: " + message);
+ LOG.info("consume one and commit: " + message);
assertNotNull("got message", message);
receivedMessages.add((TextMessage) message);
try {
- LOG.info("send one");
produceMessage(consumerSession, signalDestination, 1);
- LOG.info("commit session");
consumerSession.commit();
}
catch (JMSException e) {
@@ -270,8 +272,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
// will be stopped by the plugin
brokerStopLatch.await();
- doByteman.set(false);
server.stop();
+ doByteman.set(false);
server = createBroker();
server.start();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index e704274..8403ee3 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -519,31 +519,31 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
}
-// @Test
-// @BMRules(
-// rules = {
-// @BMRule(
-// name = "set no return response and stop the broker",
-// targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
-// targetMethod = "processMessageAck",
-// targetLocation = "ENTRY",
-// action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
-// }
-// )
-// public void testFailoverConsumerAckLost() throws Exception {
-// LOG.info(this + " running test testFailoverConsumerAckLost");
-// // as failure depends on hash order of state tracker recovery, do a few times
-// for (int i = 0; i < 3; i++) {
-// try {
-// LOG.info("Iteration: " + i);
-// doTestFailoverConsumerAckLost(i);
-// }
-// finally {
-// stopBroker();
-// }
-// }
-// }
-//
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processMessageAck",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
+ }
+ )
+ public void testFailoverConsumerAckLost() throws Exception {
+ LOG.info(this + " running test testFailoverConsumerAckLost");
+ // as failure depends on hash order of state tracker recovery, do a few times
+ for (int i = 0; i < 3; i++) {
+ try {
+ LOG.info("Iteration: " + i);
+ doTestFailoverConsumerAckLost(i);
+ }
+ finally {
+ stopBroker();
+ }
+ }
+ }
+
@SuppressWarnings("unchecked")
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
broker = createBroker();
@@ -567,12 +567,12 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
connection = cf.createConnection();
connection.start();
connections.add(connection);
- final Session consumerSession1 = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
connection = cf.createConnection();
connection.start();
connections.add(connection);
- final Session consumerSession2 = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
@@ -583,7 +583,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
final Vector<Message> receivedMessages = new Vector<>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
- Thread t = new Thread("doTestFailoverConsumerAckLost(" + pauseSeconds + ")") {
+ new Thread() {
public void run() {
LOG.info("doing async commit after consume...");
try {
@@ -630,16 +630,10 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
e.printStackTrace();
}
}
- };
- t.start();
+ }.start();
// will be stopped by the plugin
brokerStopLatch.await(60, TimeUnit.SECONDS);
- t.join(30000);
- if (t.isAlive()) {
- t.interrupt();
- Assert.fail("Thread " + t.getName() + " is still alive");
- }
broker = createBroker();
broker.start();
doByteman.set(false);
@@ -1062,10 +1056,8 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
new Thread() {
public void run() {
try {
- if (broker != null) {
- broker.stop();
- broker = null;
- }
+ broker.stop();
+ broker = null;
LOG.info("broker stopped.");
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index a3bae65..54ae6c8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -50,10 +50,10 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSessionFactory;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -507,7 +507,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
*/
@Override
- public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
inCall.countDown();
try {
callbackSemaphore.acquire();
@@ -518,7 +518,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
try {
- return targetCallback.sendMessage(ref, message, consumer, deliveryCount);
+ return targetCallback.sendMessage(message, consumer, deliveryCount);
}
finally {
callbackSemaphore.release();
@@ -530,8 +530,8 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/
@Override
- public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
- return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
+ public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ return targetCallback.sendLargeMessage(message, consumer, bodySize, deliveryCount);
}
/* (non-Javadoc)
@@ -581,6 +581,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
String defaultAddress,
SessionCallback callback,
OperationContext context,
+ ServerSessionFactory sessionFactory,
boolean autoCreateQueue) throws Exception {
return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
index 14cfee0..a1a5e38 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
@@ -26,7 +26,6 @@ import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -119,7 +118,7 @@ public class BasicSecurityTest extends BasicOpenWireTest {
}
@Test
- public void testSendnReceiveAuthorization() throws Exception {
+ public void testSendnReceiveAuthorization() throws Exception {
Connection sendingConn = null;
Connection receivingConn = null;
@@ -153,18 +152,16 @@ public class BasicSecurityTest extends BasicOpenWireTest {
producer = sendingSession.createProducer(dest);
producer.send(message);
- MessageConsumer consumer;
+ MessageConsumer consumer = null;
try {
consumer = sendingSession.createConsumer(dest);
- Assert.fail("exception expected");
}
catch (JMSSecurityException e) {
- e.printStackTrace();
//expected
}
consumer = receivingSession.createConsumer(dest);
- TextMessage received = (TextMessage) consumer.receive(5000);
+ TextMessage received = (TextMessage) consumer.receive();
assertNotNull(received);
assertEquals("Hello World", received.getText());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
index 69d9784..825b8b5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import static org.junit.Assert.assertEquals;
-import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
import org.junit.Test;
public class OpenWireUtilTest {