You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/15 21:22:30 UTC
[47/59] [abbrv] activemq-artemis git commit: producer refactor
producer refactor
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/27d6b97a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/27d6b97a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/27d6b97a
Branch: refs/heads/refactor-openwire
Commit: 27d6b97a4129bc505aa2e90e2b42133dcc5b954e
Parents: 45b8be6
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 1 22:31:39 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 15 16:21:23 2016 -0400
----------------------------------------------------------------------
.../remoting/impl/netty/NettyConnection.java | 4 +
.../artemis/spi/core/remoting/Connection.java | 6 +
.../protocol/openwire/OpenWireConnection.java | 163 ++++-------
.../openwire/OpenWireProtocolManager.java | 15 +-
.../core/protocol/openwire/SendingResult.java | 57 ----
.../core/protocol/openwire/amq/AMQSession.java | 281 +++++++++----------
.../openwire/impl/OpenWireServerCallback.java | 75 -----
.../artemis/core/paging/PagingStore.java | 2 +
.../core/remoting/impl/invm/InVMConnection.java | 6 +
.../FailoverConsumerOutstandingCommitTest.java | 25 +-
.../InvestigationOpenwireTest.java | 27 ++
.../storage/PersistMultiThreadTest.java | 4 +
12 files changed, 255 insertions(+), 410 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 9268699..3f10227 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -100,6 +100,10 @@ public class NettyConnection implements Connection {
}
// Connection implementation ----------------------------
+ @Override
+ public void setAutoRead(boolean autoRead) {
+ channel.config().setAutoRead(autoRead);
+ }
@Override
public synchronized boolean isWritable(ReadyListener callback) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index ed10113..4352d49 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -44,6 +44,12 @@ public interface Connection {
void fireReady(boolean ready);
/**
+ * Enables or disables automatic reading from the channel.
+ * Disabling it is basically the same as blocking the reading.
+ * */
+ void setAutoRead(boolean autoRead);
+
+ /**
* returns the unique id of this wire.
*
* @return the id
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 0fd8dc2..1e1e953 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
-import javax.jms.ResourceAllocationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -270,22 +269,26 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug(e);
- Response resp;
- if (e instanceof ActiveMQSecurityException) {
- resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
- }
- else if (e instanceof ActiveMQNonExistentQueueException) {
- resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
- }
- else {
- resp = new ExceptionResponse(e);
- }
- try {
- dispatch(resp);
- }
- catch (IOException e2) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
- }
+ sendException(e);
+ }
+ }
+
+ public void sendException(Exception e) {
+ Response resp;
+ if (e instanceof ActiveMQSecurityException) {
+ resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+ }
+ else if (e instanceof ActiveMQNonExistentQueueException) {
+ resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
+ }
+ else {
+ resp = new ExceptionResponse(e);
+ }
+ try {
+ dispatch(resp);
+ }
+ catch (IOException e2) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
}
}
@@ -371,75 +374,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
- public void dispatchAsync(Command message) {
- if (!stopping.get()) {
- dispatchSync(message);
- }
- else {
- if (message.isMessageDispatch()) {
- MessageDispatch md = (MessageDispatch) message;
- TransmitCallback sub = md.getTransmitCallback();
- protocolManager.postProcessDispatch(md);
- if (sub != null) {
- sub.onFailure();
- }
- }
- }
- }
-
- public void dispatchSync(Command message) {
- try {
- processDispatch(message);
- }
- catch (IOException e) {
- serviceExceptionAsync(e);
- }
- }
-
- public void serviceExceptionAsync(final IOException e) {
- if (asyncException.compareAndSet(false, true)) {
- // TODO: Why this is not through an executor?
- new Thread("Async Exception Handler") {
- @Override
- public void run() {
- serviceException(e);
- }
- }.start();
- }
+ public void dispatchAsync(Command message) throws Exception {
+ dispatchSync(message);
}
- public void serviceException(Throwable e) {
- // are we a transport exception such as not being able to dispatch
- // synchronously to a transport
- if (e instanceof IOException) {
- serviceTransportException((IOException) e);
- }
- else if (!stopping.get() && !inServiceException) {
- inServiceException = true;
- try {
- ConnectionError ce = new ConnectionError();
- ce.setException(e);
- dispatchAsync(ce);
- }
- finally {
- inServiceException = false;
- }
- }
+ public void dispatchSync(Command message) throws Exception {
+ processDispatch(message);
}
- public void serviceTransportException(IOException e) {
- /*
- * deal with it later BrokerService bService =
- * connector.getBrokerService(); if (bService.isShutdownOnSlaveFailure())
- * { if (brokerInfo != null) { if (brokerInfo.isSlaveBroker()) {
- * LOG.error("Slave has exception: {} shutting down master now.",
- * e.getMessage(), e); try { doStop(); bService.stop(); } catch (Exception
- * ex) { LOG.warn("Failed to stop the master", ex); } } } } if
- * (!stopping.get() && !pendingStop) { transportException.set(e); if
- * (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: "
- * + e, e); } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
- * TRANSPORTLOG.warn(this + " failed: " + e); } stopAsync(); }
- */
+ public void serviceException(Throwable e) throws Exception {
+ ConnectionError ce = new ConnectionError();
+ ce.setException(e);
+ dispatchAsync(ce);
}
protected void dispatch(Command command) throws IOException {
@@ -570,7 +516,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- private void disconnect(ActiveMQException me, String reason, boolean fail) {
+ private void disconnect(ActiveMQException me, String reason, boolean fail) {
if (context == null || destroyed) {
return;
@@ -596,8 +542,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
if (command != null && command.isResponseRequired()) {
Response lastResponse = new Response();
lastResponse.setCorrelationId(command.getCommandId());
- dispatchSync(lastResponse);
- context.setDontSendReponse(true);
+ try {
+ dispatchSync(lastResponse);
+ }
+ catch (Throwable e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ }
}
}
@@ -632,12 +582,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return this.context;
}
- public void updateClient(ConnectionControl control) {
- // if (!destroyed && context.isFaultTolerant()) {
+ public void updateClient(ConnectionControl control) throws Exception {
if (protocolManager.isUpdateClusterClients()) {
dispatchAsync(control);
}
- // }
}
public AMQConnectionContext initContext(ConnectionInfo info) {
@@ -1063,9 +1011,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- protocolManager.commitTransactionOnePhase(info);
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
+ new Exception("commit").printStackTrace();
+ try {
+ protocolManager.commitTransactionOnePhase(info);
+ TransactionId txId = info.getTransactionId();
+ txMap.remove(txId);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
return null;
}
@@ -1150,33 +1105,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
AMQSession session = getSession(producerId.getParentId());
- SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
- if (result.isBlockNextSend()) {
- if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
- // TODO see logging
- throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
- }
-
- if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
- //in that case don't send the response
- //this will force the client to wait until
- //the response is got.
- context.setDontSendReponse(true);
- }
- else {
- //hang the connection until the space is available
- session.blockingWaitForSpace(producerExchange, result);
- }
- }
- else if (sendProducerAck) {
- // TODO-now: send through OperationContext
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- OpenWireConnection.this.dispatchAsync(ack);
- }
-
+ session.send(producerInfo, messageSend, sendProducerAck);
return null;
}
+
@Override
public Response processMessageAck(MessageAck ack) throws Exception {
AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 51c4bec..7445960 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerE
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
@@ -192,7 +193,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
for (OpenWireConnection c : this.connections) {
ConnectionControl control = newConnectionControl();
- c.updateClient(control);
+ try {
+ c.updateClient(control);
+ }
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ c.sendException(e);
+ }
}
}
@@ -365,7 +372,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
context.setProducerFlowControl(false);
AMQSession sess = context.getConnection().getAdvisorySession();
if (sess != null) {
- sess.send(producerExchange, advisoryMessage, false);
+ sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false);
}
}
finally {
@@ -515,7 +522,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
server.destroyQueue(subQueueName);
}
- public void sendBrokerInfo(OpenWireConnection connection) {
+ public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerName(server.getIdentity());
brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID()));
@@ -525,7 +532,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
//cluster support yet to support
brokerInfo.setPeerBrokerInfos(null);
- connection.dispatchAsync(brokerInfo);
+ connection.dispatch(brokerInfo);
}
public void setRebalanceClusterClients(boolean rebalance) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
deleted file mode 100644
index 0e21ca4..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-
-public class SendingResult {
-
- private boolean blockNextSend;
- private PagingStoreImpl blockPagingStore;
- private SimpleString blockingAddress;
-
- public void setBlockNextSend(boolean block) {
- this.blockNextSend = block;
- }
-
- public boolean isBlockNextSend() {
- return this.blockNextSend;
- }
-
- public void setBlockPagingStore(PagingStoreImpl store) {
- this.blockPagingStore = store;
- }
-
- public PagingStoreImpl getBlockPagingStore() {
- return this.blockPagingStore;
- }
-
- public void setBlockingAddress(SimpleString address) {
- this.blockingAddress = address;
- }
-
- public SimpleString getBlockingAddress() {
- return this.blockingAddress;
- }
-
- public boolean isSendFailIfNoSpace() {
- AddressFullMessagePolicy policy = this.blockPagingStore.getAddressFullMessagePolicy();
- return policy == AddressFullMessagePolicy.FAIL;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index c787eda..6614f55 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
+import javax.jms.ResourceAllocationException;
import javax.transaction.xa.Xid;
-import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -26,41 +26,38 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
@@ -106,6 +103,10 @@ public class AMQSession implements SessionCallback {
this.converter = new OpenWireMessageConverter(marshaller.copy());
}
+ public OpenWireMessageConverter getConverter() {
+ return converter;
+ }
+
public void initialize() {
String name = sessInfo.getSessionId().toString();
String username = connInfo.getUserName();
@@ -221,25 +222,10 @@ public class AMQSession implements SessionCallback {
}
- public AMQServerSession getCoreSession() {
- return this.coreSession;
- }
-
- public ActiveMQServer getCoreServer() {
- return this.server;
- }
-
- public void removeConsumer(long consumerId) throws Exception {
- boolean failed = !(this.txId != null || this.isTx);
-
- coreSession.amqCloseConsumer(consumerId, failed);
- consumers.remove(consumerId);
- }
- public SendingResult send(AMQProducerBrokerExchange producerExchange,
- Message messageSend,
- boolean sendProducerAck) throws Exception {
- SendingResult result = new SendingResult();
+ public void send(final ProducerInfo producerInfo,
+ final Message messageSend,
+ boolean sendProducerAck) throws Exception {
TransactionId tid = messageSend.getTransactionId();
if (tid != null) {
resetSessionTx(tid);
@@ -257,39 +243,128 @@ public class AMQSession implements SessionCallback {
actualDestinations = new ActiveMQDestination[]{destination};
}
- for (ActiveMQDestination dest : actualDestinations) {
+ ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
+
+ /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
+ * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
+ * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
+ * message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
+ if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
+ originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+ }
+
+ Runnable runnable;
+
+ if (sendProducerAck) {
+ runnable = new Runnable() {
+ public void run() {
+ try {
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ connection.dispatchSync(ack);
+ }
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ connection.sendException(e);
+ }
+
+ }
+ };
+ }
+ else {
+ final Connection transportConnection = connection.getTransportConnection();
+
+// new Exception("Setting to false").printStackTrace();
+
+ if (transportConnection == null) {
+ // I don't think this could happen, but just in case, avoiding races
+ runnable = null;
+ }
+ else {
+ runnable = new Runnable() {
+ public void run() {
+ transportConnection.setAutoRead(true);
+ }
+ };
+ }
+ }
+
- ServerMessageImpl coreMsg = (ServerMessageImpl)converter.inbound(messageSend);
+ internalSend(actualDestinations, originalCoreMsg, runnable);
+ }
+
+ private void internalSend(ActiveMQDestination[] actualDestinations,
+ ServerMessage originalCoreMsg,
+ final Runnable onComplete) throws Exception {
+
+ Runnable runToUse;
+
+ if (actualDestinations.length <= 1 || onComplete == null) {
+ // if onComplete is null, this will be null ;)
+ runToUse = onComplete;
+ }
+ else {
+ final AtomicInteger count = new AtomicInteger(actualDestinations.length);
+ runToUse = new Runnable() {
+ @Override
+ public void run() {
+ if (count.decrementAndGet() == 0) {
+ onComplete.run();
+ }
+ }
+ };
+ }
- /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
- * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
- * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
- * message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
- if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
- coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+ SimpleString[] addresses = new SimpleString[actualDestinations.length];
+ PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
+
+ // We fill up addresses and pagingStores, and throw a failure if a store is full under the FAIL policy
+ for (int i = 0; i < actualDestinations.length; i++) {
+ ActiveMQDestination dest = actualDestinations[i];
+ addresses[i] = OpenWireUtil.toCoreAddress(dest);
+ pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
+ if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
+ throw new ResourceAllocationException("Queue is full");
}
- SimpleString address = OpenWireUtil.toCoreAddress(dest);
- coreMsg.setAddress(address);
+ }
+
+
+ for (int i = 0; i < actualDestinations.length; i++) {
+
+ ServerMessage coreMsg = originalCoreMsg.copy();
- PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
+ coreMsg.setAddress(addresses[i]);
+ PagingStore store = pagingStores[i];
- // TODO: Improve this, tested with ProducerFlowControlSendFailTest
if (store.isFull()) {
- result.setBlockNextSend(true);
- result.setBlockPagingStore(store);
- result.setBlockingAddress(address);
- //now we hold this message send until the store has space.
- //we do this by put it in a scheduled task
- ScheduledExecutorService scheduler = server.getScheduledPool();
- Runnable sendRetryTask = new SendRetryTask(coreMsg, producerExchange, sendProducerAck, messageSend.getSize(), messageSend.getCommandId());
- scheduler.schedule(sendRetryTask, 10, TimeUnit.MILLISECONDS);
+ connection.getTransportConnection().setAutoRead(false);
}
- else {
- coreSession.send(coreMsg, false);
+
+ getCoreSession().send(coreMsg, false);
+
+ if (runToUse != null) {
+ // if the timeout is >0, it will wait that many milliseconds
+ // before running the runToUse
+ // this will eventually unblock blocked destinations
+ // playing flow control
+ store.checkMemory(runToUse);
}
}
- return result;
+ }
+
+ public AMQServerSession getCoreSession() {
+ return this.coreSession;
+ }
+
+ public ActiveMQServer getCoreServer() {
+ return this.server;
+ }
+
+ public void removeConsumer(long consumerId) throws Exception {
+ boolean failed = !(this.txId != null || this.isTx);
+
+ coreSession.amqCloseConsumer(consumerId, failed);
+ consumers.remove(consumerId);
}
public WireFormat getMarshaller() {
@@ -462,92 +537,4 @@ public class AMQSession implements SessionCallback {
}
}
}
-
- private class SendRetryTask implements Runnable {
-
- private ServerMessage coreMsg;
- private AMQProducerBrokerExchange producerExchange;
- private boolean sendProducerAck;
- private int msgSize;
- private int commandId;
-
- public SendRetryTask(ServerMessage coreMsg,
- AMQProducerBrokerExchange producerExchange,
- boolean sendProducerAck,
- int msgSize,
- int commandId) {
- this.coreMsg = coreMsg;
- this.producerExchange = producerExchange;
- this.sendProducerAck = sendProducerAck;
- this.msgSize = msgSize;
- this.commandId = commandId;
- }
-
- @Override
- public void run() {
- synchronized (AMQSession.this) {
- try {
- // check pageStore
- SimpleString address = coreMsg.getAddress();
- PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
- if (store.isFull()) {
- // if store is still full, schedule another
- server.getScheduledPool().schedule(this, 10, TimeUnit.MILLISECONDS);
- }
- else {
- // now send the message again.
- coreSession.send(coreMsg, false);
-
- if (sendProducerAck) {
- ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), msgSize);
- connection.dispatchAsync(ack);
- }
- else {
- Response response = new Response();
- response.setCorrelationId(commandId);
- connection.dispatchAsync(response);
- }
- }
- }
- catch (Exception e) {
- ExceptionResponse response = new ExceptionResponse(e);
- response.setCorrelationId(commandId);
- connection.dispatchAsync(response);
- }
- }
-
- }
- }
-
- // TODO: remove this, we should do the same as we do on core for blocking
- public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange,
- SendingResult result) throws IOException {
-
-
- new Exception("blocking").printStackTrace();
- long start = System.currentTimeMillis();
- long nextWarn = start;
- producerExchange.blockingOnFlowControl(true);
-
- AMQConnectionContext context = producerExchange.getConnectionContext();
- PagingStoreImpl store = result.getBlockPagingStore();
-
- //Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL
- long blockedProducerWarningInterval = 30000;
- ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId();
-
- while (store.isFull()) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
- }
-
- long now = System.currentTimeMillis();
- if (now >= nextWarn) {
- ActiveMQServerLogger.LOGGER.memoryLimitReached(producerId.toString(), result.getBlockingAddress().toString(), ((now - start) / 1000));
- nextWarn = now + blockedProducerWarningInterval;
- }
- }
- producerExchange.blockingOnFlowControl(false);
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
deleted file mode 100644
index 8ab3815..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.protocol.openwire.impl;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-
-public class OpenWireServerCallback implements SessionCallback {
-
- @Override
- public boolean hasCredits(ServerConsumer consumerID) {
- return false;
- }
-
- @Override
- public void sendProducerCreditsMessage(int credits, SimpleString address) {
-
- }
-
- @Override
- public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
-
- }
-
- @Override
- public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) {
- return 0;
- }
-
- @Override
- public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) {
- return 0;
- }
-
- @Override
- public int sendLargeMessageContinuation(ServerConsumer consumerID,
- byte[] body,
- boolean continues,
- boolean requiresResponse) {
- return 0;
- }
-
- @Override
- public void closed() {
-
- }
-
- @Override
- public void disconnect(ServerConsumer consumerId, String queueName) {
-
- }
-
- @Override
- public boolean isWritable(ReadyListener callback) {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index e831966..566b91a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -126,6 +126,8 @@ public interface PagingStore extends ActiveMQComponent {
boolean checkMemory(Runnable runnable);
+ boolean isFull();
+
/**
* Write lock the PagingStore.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 70d6289..db61f89 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -138,6 +138,12 @@ public class InVMConnection implements Connection {
}
@Override
+ public void setAutoRead(boolean autoRead) {
+ // nothing to be done on the INVM.
+ // maybe we could eventually implement something, but not needed now
+ }
+
+ @Override
public ActiveMQBuffer createTransportBuffer(final int size) {
return ActiveMQBuffers.dynamicBuffer(size);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
index 5a160ab..e44a490 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
@@ -170,20 +170,21 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
@Test
@BMRules(
- rules = {@BMRule(
- name = "set no return response",
- targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
- targetMethod = "processCommitTransactionOnePhase",
- targetLocation = "ENTRY",
- binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
- action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"),
+ rules = {
+ @BMRule(
+ name = "set no return response",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "ENTRY",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"),
@BMRule(
- name = "stop broker before commit",
- targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
- targetMethod = "commit",
- targetLocation = "ENTRY",
- action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")})
+ name = "stop broker before commit",
+ targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+ targetMethod = "commit",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")})
public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(false);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
index 914a8e1..1599a2c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
@@ -29,6 +29,7 @@ import javax.transaction.xa.XAResource;
import java.util.Collection;
import java.util.LinkedList;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.junit.Assert;
import org.junit.Test;
@@ -59,6 +60,30 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest {
}
@Test
+ public void testProducerFlowControl() throws Exception {
+ try {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+
+ factory.setProducerWindowSize(1024 * 64);
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("test"));
+
+
+ connection.close();
+
+ System.err.println("Done!!!");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
public void testAutoAck() throws Exception {
try {
@@ -108,6 +133,7 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest {
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
producer.send(session.createTextMessage("test"));
+ producer.send(session.createTextMessage("test2"));
connection.start();
Assert.assertNull(consumer.receive(1000));
session.rollback();
@@ -130,6 +156,7 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest {
}
+
@Test
public void testClientACK() throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27d6b97a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 6351357..cd08b9e 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -301,6 +301,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
return 0;
}
+ public boolean isFull() {
+ return false;
+ }
+
@Override
public void applySetting(AddressSettings addressSettings) {