You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/31 04:31:24 UTC
[42/69] [abbrv] activemq-artemis git commit: Progress on refactoring
- first pass: using AbstractConnection on OpenWireConnection and adding more
TODOs
Progress on refactoring - first pass: using AbstractConnection on OpenWireConnection and adding more TODOs
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/17921535
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/17921535
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/17921535
Branch: refs/heads/refactor-openwire
Commit: 17921535378d5cd7be1488704a37d1fc72c1a4e6
Parents: 33edfff
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Feb 23 21:52:21 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 30 22:29:44 2016 -0400
----------------------------------------------------------------------
.../protocol/AbstractRemotingConnection.java | 5 +
.../protocol/openwire/AMQTransactionImpl.java | 14 +-
.../protocol/openwire/OpenWireConnection.java | 1100 +++++++-----------
.../openwire/OpenWireProtocolManager.java | 14 +-
.../openwire/amq/AMQProducerBrokerExchange.java | 51 +-
.../openwire/amq/AMQServerSessionFactory.java | 9 +
.../core/protocol/openwire/amq/AMQSession.java | 13 +-
.../openwire/impl/OpenWireServerCallback.java | 75 ++
8 files changed, 560 insertions(+), 721 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17921535/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index b759ccc..ee2449b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -104,6 +104,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return transportConnection.getID();
}
+
+ public String getLocalAddress() {
+ return transportConnection.getLocalAddress();
+ }
+
@Override
public String getRemoteAddress() {
return transportConnection.getRemoteAddress();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17921535/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
index e356522..bbd7e95 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
@@ -28,22 +28,10 @@ public class AMQTransactionImpl extends TransactionImpl {
private boolean rollbackForClose = false;
- public AMQTransactionImpl(StorageManager storageManager, int timeoutSeconds) {
- super(storageManager, timeoutSeconds);
- }
-
- public AMQTransactionImpl(StorageManager storageManager) {
- super(storageManager);
- }
-
public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) {
super(xid, storageManager, timeoutSeconds);
}
- public AMQTransactionImpl(long id, Xid xid, StorageManager storageManager) {
- super(id, xid, storageManager);
- }
-
@Override
public RefsOperation createRefsOperation(Queue queue) {
return new AMQrefsOperation(queue, storageManager);
@@ -55,6 +43,8 @@ public class AMQTransactionImpl extends TransactionImpl {
super(queue, storageManager);
}
+
+ // This is because the Rollbacks happen through the consumer, not through the server's
@Override
public void afterRollback(Transaction tx) {
if (rollbackForClose) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17921535/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index a6f0f34..dbbb59f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -39,6 +40,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@@ -50,6 +54,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -101,32 +106,22 @@ import org.apache.activemq.wireformat.WireFormat;
* Represents an activemq connection.
* ToDo: extends AbstractRemotingConnection
*/
-public class OpenWireConnection implements RemotingConnection, CommandVisitor, SecurityAuth {
+public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
private final OpenWireProtocolManager protocolManager;
- private final Connection transportConnection;
-
- private final long creationTime;
-
- private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
-
private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
private boolean destroyed = false;
private final Object sendLock = new Object();
- private volatile boolean dataReceived;
-
private final Acceptor acceptorUsed;
private final OpenWireFormat wireFormat;
private AMQConnectionContext context;
- private boolean pendingStop;
-
private Throwable stopError = null;
private final AtomicBoolean stopping = new AtomicBoolean(false);
@@ -154,21 +149,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
public OpenWireConnection(Acceptor acceptorUsed,
Connection connection,
+ Executor executor,
OpenWireProtocolManager openWireProtocolManager,
OpenWireFormat wf) {
+ super(connection, executor);
this.protocolManager = openWireProtocolManager;
- this.transportConnection = connection;
this.acceptorUsed = acceptorUsed;
this.wireFormat = wf;
- this.creationTime = System.currentTimeMillis();
this.defaultSocketURIString = connection.getLocalAddress();
}
- @Override
- public boolean isWritable(ReadyListener callback) {
- return transportConnection.isWritable(callback);
- }
-
// SecurityAuth implementation
@Override
public String getUsername() {
@@ -181,6 +171,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// SecurityAuth implementation
@Override
+ public RemotingConnection getRemotingConnection() {
+ return this;
+ }
+
+ // SecurityAuth implementation
+ @Override
public String getPassword() {
ConnectionInfo info = getConnectionInfo();
if (info == null) {
@@ -200,18 +196,15 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
return info;
}
- public String getLocalAddress() {
- return transportConnection.getLocalAddress();
- }
-
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+ super.bufferReceived(connectionID, buffer);
try {
- dataReceived = true;
+
+ // TODO-NOW: set OperationContext
Command command = (Command) wireFormat.unmarshal(buffer);
- //logger.log("got command: " + command);
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
// the connection handles pings, negotiations directly.
@@ -223,36 +216,27 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
// for some reason KeepAliveInfo.isResponseRequired() is always false
protocolManager.sendReply(this, info);
}
- else if (command.getClass() == WireFormatInfo.class) {
- // amq here starts a read/write monitor thread (detect ttl?)
- negotiate((WireFormatInfo) command);
- }
else {
Response response = null;
- if (pendingStop) {
- response = new ExceptionResponse(this.stopError);
+ try {
+ setLastCommand(command);
+ response = command.visit(new CommandProcessor());
}
- else {
- try {
- setLastCommand(command);
- response = command.visit(this);
- }
- catch (Exception e) {
- if (responseRequired) {
- response = new ExceptionResponse(e);
- }
- }
- finally {
- setLastCommand(null);
+ catch (Exception e) {
+ if (responseRequired) {
+ response = new ExceptionResponse(e);
}
+ }
+ finally {
+ setLastCommand(null);
+ }
- if (response instanceof ExceptionResponse) {
- if (!responseRequired) {
- Throwable cause = ((ExceptionResponse) response).getException();
- serviceException(cause);
- response = null;
- }
+ if (response instanceof ExceptionResponse) {
+ if (!responseRequired) {
+ Throwable cause = ((ExceptionResponse) response).getException();
+ serviceException(cause);
+ response = null;
}
}
@@ -272,6 +256,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
}
+ // TODO-NOW: response through operation-context
+
if (response != null && !protocolManager.isStopping()) {
response.setCorrelationId(commandId);
dispatchSync(response);
@@ -280,6 +266,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
}
catch (IOException e) {
+
+ // TODO-NOW: send errors
ActiveMQServerLogger.LOGGER.error("error decoding", e);
}
catch (Throwable t) {
@@ -293,135 +281,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
}
- private void negotiate(WireFormatInfo command) throws IOException {
- this.wireFormat.renegotiateWireFormat(command);
- //throw back a brokerInfo here
- protocolManager.sendBrokerInfo(this);
- }
-
- @Override
- public Object getID() {
- return transportConnection.getID();
- }
-
- @Override
- public long getCreationTime() {
- return creationTime;
- }
-
- @Override
- public String getRemoteAddress() {
- return transportConnection.getRemoteAddress();
- }
-
- @Override
- public void addFailureListener(FailureListener listener) {
- if (listener == null) {
- throw new IllegalStateException("FailureListener cannot be null");
- }
-
- failureListeners.add(listener);
- }
-
- @Override
- public boolean removeFailureListener(FailureListener listener) {
- if (listener == null) {
- throw new IllegalStateException("FailureListener cannot be null");
- }
-
- return failureListeners.remove(listener);
- }
-
- @Override
- public void addCloseListener(CloseListener listener) {
- if (listener == null) {
- throw new IllegalStateException("CloseListener cannot be null");
- }
-
- closeListeners.add(listener);
- }
-
- @Override
- public boolean removeCloseListener(CloseListener listener) {
- if (listener == null) {
- throw new IllegalStateException("CloseListener cannot be null");
- }
-
- return closeListeners.remove(listener);
- }
-
- @Override
- public List<CloseListener> removeCloseListeners() {
- List<CloseListener> ret = new ArrayList<>(closeListeners);
-
- closeListeners.clear();
-
- return ret;
- }
-
- @Override
- public void setCloseListeners(List<CloseListener> listeners) {
- closeListeners.clear();
-
- closeListeners.addAll(listeners);
- }
-
- @Override
- public List<FailureListener> getFailureListeners() {
- // we do not return the listeners otherwise the remoting service
- // would NOT destroy the connection.
- return Collections.emptyList();
- }
-
- @Override
- public List<FailureListener> removeFailureListeners() {
- List<FailureListener> ret = new ArrayList<>(failureListeners);
-
- failureListeners.clear();
-
- return ret;
- }
-
- @Override
- public void setFailureListeners(List<FailureListener> listeners) {
- failureListeners.clear();
-
- failureListeners.addAll(listeners);
- }
-
- @Override
- public ActiveMQBuffer createTransportBuffer(int size) {
- return ActiveMQBuffers.dynamicBuffer(size);
- }
-
- @Override
- public void fail(ActiveMQException me) {
- fail(me, null);
- }
-
@Override
public void destroy() {
fail(null, null);
}
- private void deleteTempQueues() throws Exception {
- Iterator<ActiveMQDestination> tmpQs = tempQueues.iterator();
- while (tmpQs.hasNext()) {
- ActiveMQDestination q = tmpQs.next();
- protocolManager.removeDestination(this, q);
- }
- }
-
- @Override
- public RemotingConnection getRemotingConnection() {
- return this;
- }
-
- @Override
- public Connection getTransportConnection() {
- return this.transportConnection;
- }
-
@Override
public boolean isClient() {
return false;
@@ -466,22 +330,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
}
- private void callClosingListeners() {
- final List<CloseListener> listenersClone = new ArrayList<>(closeListeners);
-
- for (final CloseListener listener : listenersClone) {
- try {
- listener.connectionClosed();
- }
- catch (final Throwable t) {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t);
- }
- }
- }
-
// throw a WireFormatInfo to the peer
public void init() {
WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
@@ -509,34 +357,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
- @Override
- public Response processAddConnection(ConnectionInfo info) throws Exception {
- //let protoclmanager handle connection add/remove
- try {
- protocolManager.addConnection(this, info);
- }
- catch (Exception e) {
- if (e instanceof SecurityException) {
- // close this down - in case the peer of this transport doesn't play
- // nice
- delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
- }
- Response resp = new ExceptionResponse(e);
- return resp;
- }
- if (info.isManageable() && protocolManager.isUpdateClusterClients()) {
- // send ConnectionCommand
- ConnectionControl command = protocolManager.newConnectionControl();
- command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
- if (info.isFailoverReconnect()) {
- command.setRebalanceConnection(false);
- }
- dispatchAsync(command);
- }
- return null;
-
- }
-
public void dispatchAsync(Command message) {
if (!stopping.get()) {
dispatchSync(message);
@@ -564,7 +384,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
public void serviceExceptionAsync(final IOException e) {
if (asyncException.compareAndSet(false, true)) {
- // Why this is not through an executor?
+ // TODO: Why is this not done through an executor?
new Thread("Async Exception Handler") {
@Override
public void run() {
@@ -585,12 +405,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
try {
ConnectionError ce = new ConnectionError();
ce.setException(e);
- if (pendingStop) {
- dispatchSync(ce);
- }
- else {
- dispatchAsync(ce);
- }
+ dispatchAsync(ce);
}
finally {
inServiceException = false;
@@ -649,89 +464,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
}
- public void delayedStop(final int waitTime, final String reason, Throwable cause) {
- if (waitTime > 0) {
- synchronized (this) {
- pendingStop = true;
- stopError = cause;
- }
- }
- }
-
- public void stopAsync() {
- // If we're in the middle of starting then go no further... for now.
- synchronized (this) {
- pendingStop = true;
- }
- if (stopping.compareAndSet(false, true)) {
- if (context != null) {
- context.getStopping().set(true);
- }
- }
- }
-
- protected void doStop() throws Exception {
- /*
- * What's a duplex bridge? try { synchronized (this) { if (duplexBridge !=
- * null) { duplexBridge.stop(); } } } catch (Exception ignore) {
- * LOG.trace("Exception caught stopping. This exception is ignored.",
- * ignore); }
- */
- try {
- getTransportConnection().close();
- }
- catch (Exception e) {
- // log
- }
-
- // Run the MessageDispatch callbacks so that message references get
- // cleaned up.
- synchronized (dispatchQueue) {
- for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
- Command command = iter.next();
- if (command.isMessageDispatch()) {
- MessageDispatch md = (MessageDispatch) command;
- TransmitCallback sub = md.getTransmitCallback();
- protocolManager.postProcessDispatch(md);
- if (sub != null) {
- sub.onFailure();
- }
- }
- }
- dispatchQueue.clear();
- }
- //
- // Remove all logical connection associated with this connection
- // from the broker.
- if (!protocolManager.isStopped()) {
- context.getStopping().set(true);
- try {
- processRemoveConnection(state.getInfo().getConnectionId(), 0L);
- }
- catch (Throwable ignore) {
- ignore.printStackTrace();
- }
- }
- }
-
- @Override
- public Response processAddConsumer(ConsumerInfo info) {
- Response resp = null;
- try {
- protocolManager.addConsumer(this, info);
- }
- catch (Exception e) {
- e.printStackTrace();
- if (e instanceof ActiveMQSecurityException) {
- resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
- }
- else {
- resp = new ExceptionResponse(e);
- }
- }
- return resp;
- }
-
public void addConsumerBrokerExchange(ConsumerId id,
AMQSession amqSession,
Map<ActiveMQDestination, AMQConsumer> consumerMap) {
@@ -762,222 +494,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
}
- public int getConsumerCount() {
- int result = 0;
- for (SessionId sessionId : state.getSessionIds()) {
- SessionState sessionState = state.getSessionState(sessionId);
- if (sessionState != null) {
- result += sessionState.getConsumerIds().size();
- }
- }
- return result;
- }
-
- public int getProducerCount() {
- int result = 0;
- for (SessionId sessionId : state.getSessionIds()) {
- SessionState sessionState = state.getSessionState(sessionId);
- if (sessionState != null) {
- result += sessionState.getProducerIds().size();
- }
- }
- return result;
- }
-
- @Override
- public Response processAddDestination(DestinationInfo dest) throws Exception {
- Response resp = null;
- try {
- protocolManager.addDestination(this, dest);
- }
- catch (Exception e) {
- if (e instanceof ActiveMQSecurityException) {
- resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
- }
- else {
- resp = new ExceptionResponse(e);
- }
- }
- return resp;
- }
-
- @Override
- public Response processAddProducer(ProducerInfo info) throws Exception {
- Response resp = null;
- try {
- protocolManager.addProducer(this, info);
- }
- catch (Exception e) {
- if (e instanceof ActiveMQSecurityException) {
- resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
- }
- else if (e instanceof ActiveMQNonExistentQueueException) {
- resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
- }
- else {
- resp = new ExceptionResponse(e);
- }
- }
- return resp;
- }
-
- @Override
- public Response processAddSession(SessionInfo info) throws Exception {
- // Avoid replaying dup commands
- if (!state.getSessionIds().contains(info.getSessionId())) {
- protocolManager.addSession(this, info);
- try {
- state.addSession(info);
- }
- catch (IllegalStateException e) {
- e.printStackTrace();
- protocolManager.removeSession(context, info);
- }
- }
- return null;
- }
-
- @Override
- public Response processBeginTransaction(TransactionInfo info) throws Exception {
- TransactionId txId = info.getTransactionId();
-
- if (!txMap.containsKey(txId)) {
- txMap.put(txId, info);
- }
- return null;
- }
-
- @Override
- public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- @Override
- public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- protocolManager.commitTransactionOnePhase(info);
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
-
- return null;
- }
-
- @Override
- public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
- protocolManager.commitTransactionTwoPhase(info);
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
-
- return null;
- }
-
- @Override
- public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
- //activemq5 keeps a var to remember only the faultTolerant flag
- //this can be sent over a reconnected transport as the first command
- //before restoring the connection.
- return null;
- }
-
- @Override
- public Response processConnectionError(ConnectionError arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- @Override
- public Response processConsumerControl(ConsumerControl consumerControl) throws Exception {
- //amq5 clients send this command to restore prefetchSize
- //after successful reconnect
- try {
- protocolManager.updateConsumer(this, consumerControl);
- }
- catch (Exception e) {
- //log error
- }
- return null;
- }
-
- @Override
- public Response processControlCommand(ControlCommand arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- @Override
- public Response processEndTransaction(TransactionInfo info) throws Exception {
- protocolManager.endTransaction(info);
- TransactionId txId = info.getTransactionId();
-
- if (!txMap.containsKey(txId)) {
- txMap.put(txId, info);
- }
- return null;
- }
-
- @Override
- public Response processFlush(FlushCommand arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- @Override
- public Response processForgetTransaction(TransactionInfo info) throws Exception {
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
-
- protocolManager.forgetTransaction(info.getTransactionId());
- return null;
- }
-
- @Override
- public Response processKeepAlive(KeepAliveInfo arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- @Override
- public Response processMessage(Message messageSend) {
- Response resp = null;
- try {
- ProducerId producerId = messageSend.getProducerId();
- AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
- final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
- final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
- boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
-
- AMQSession session = protocolManager.getSession(producerId.getParentId());
-
- if (producerExchange.canDispatch(messageSend)) {
- SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
- if (result.isBlockNextSend()) {
- if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
- throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
- }
-
- if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
- //in that case don't send the response
- //this will force the client to wait until
- //the response is got.
- context.setDontSendReponse(true);
- }
- else {
- //hang the connection until the space is available
- session.blockingWaitForSpace(producerExchange, result);
- }
- }
- else if (sendProducerAck) {
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- this.dispatchAsync(ack);
- }
- }
- }
- catch (Throwable e) {
- if (e instanceof ActiveMQSecurityException) {
- resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
- }
- else {
- resp = new ExceptionResponse(e);
- }
- }
- return resp;
- }
-
private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
AMQProducerBrokerExchange result = producerExchanges.get(id);
if (result == null) {
@@ -1007,163 +523,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
return result;
}
- @Override
- public Response processMessageAck(MessageAck ack) throws Exception {
- AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
- consumerBrokerExchange.acknowledge(ack);
- return null;
- }
-
- @Override
- public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- @Override
- public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- @Override
- public Response processMessagePull(MessagePull arg0) throws Exception {
- AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
- if (amqConsumerBrokerExchange == null) {
- throw new IllegalStateException("Consumer does not exist");
- }
- amqConsumerBrokerExchange.processMessagePull(arg0);
- return null;
- }
-
- @Override
- public Response processPrepareTransaction(TransactionInfo info) throws Exception {
- protocolManager.prepareTransaction(info);
- return null;
- }
-
- @Override
- public Response processProducerAck(ProducerAck arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- @Override
- public Response processRecoverTransactions(TransactionInfo info) throws Exception {
- Set<SessionId> sIds = state.getSessionIds();
- TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
- return new DataArrayResponse(recovered);
- }
-
- @Override
- public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
- //we let protocol manager to handle connection add/remove
- try {
- protocolManager.removeConnection(this, state.getInfo(), null);
- }
- catch (Throwable e) {
- // log
- }
- return null;
- }
-
- @Override
- public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
- SessionId sessionId = id.getParentId();
- SessionState ss = state.getSessionState(sessionId);
- if (ss == null) {
- throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
- }
- ConsumerState consumerState = ss.removeConsumer(id);
- if (consumerState == null) {
- throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
- }
- ConsumerInfo info = consumerState.getInfo();
- info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-
- AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
-
- consumerBrokerExchange.removeConsumer();
-
- removeConsumerBrokerExchange(id);
-
- return null;
- }
-
private void removeConsumerBrokerExchange(ConsumerId id) {
synchronized (consumerExchanges) {
consumerExchanges.remove(id);
}
}
- @Override
- public Response processRemoveDestination(DestinationInfo info) throws Exception {
- ActiveMQDestination dest = info.getDestination();
- protocolManager.removeDestination(this, dest);
- return null;
- }
-
- @Override
- public Response processRemoveProducer(ProducerId id) throws Exception {
- protocolManager.removeProducer(id);
- return null;
- }
-
- @Override
- public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
- SessionState session = state.getSessionState(id);
- if (session == null) {
- throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
- }
- // Don't let new consumers or producers get added while we are closing
- // this down.
- session.shutdown();
- // Cascade the connection stop producers.
- // we don't stop consumer because in core
- // closing the session will do the job
- for (ProducerId producerId : session.getProducerIds()) {
- try {
- processRemoveProducer(producerId);
- }
- catch (Throwable e) {
- // LOG.warn("Failed to remove producer: {}", producerId, e);
- }
- }
- state.removeSession(id);
- protocolManager.removeSession(context, session.getInfo());
- return null;
- }
-
- @Override
- public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
- protocolManager.removeSubscription(subInfo);
- return null;
- }
-
- @Override
- public Response processRollbackTransaction(TransactionInfo info) throws Exception {
- protocolManager.rollbackTransaction(info);
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
- return null;
- }
-
- @Override
- public Response processShutdown(ShutdownInfo info) throws Exception {
- this.shutdown(false);
- return null;
- }
-
- @Override
- public Response processWireFormat(WireFormatInfo arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
- }
-
- public int getMaximumConsumersAllowedPerConnection() {
- return 1000000;//this belongs to configuration, now hardcoded
- }
-
- public int getMaximumProducersAllowedPerConnection() {
- return 1000000;//this belongs to configuration, now hardcoded
- }
-
public void deliverMessage(MessageDispatch dispatch) {
Message m = dispatch.getMessage();
if (m != null) {
@@ -1323,4 +688,393 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
context.setReconnect(true);
context.incRefCount();
}
+
+ // This will listen for commands through the protocol manager
+ class CommandProcessor implements CommandVisitor {
+
+ @Override
+ public Response processAddConnection(ConnectionInfo info) throws Exception {
+ //let the protocol manager handle connection add/remove
+ try {
+ protocolManager.addConnection(OpenWireConnection.this, info);
+ }
+ catch (Exception e) {
+ Response resp = new ExceptionResponse(e);
+ return resp;
+ }
+ if (info.isManageable() && protocolManager.isUpdateClusterClients()) {
+ // send ConnectionCommand
+ ConnectionControl command = protocolManager.newConnectionControl();
+ command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
+ if (info.isFailoverReconnect()) {
+ command.setRebalanceConnection(false);
+ }
+ dispatchAsync(command);
+ }
+ return null;
+
+ }
+
+ @Override
+ public Response processAddProducer(ProducerInfo info) throws Exception {
+ Response resp = null;
+ try {
+ protocolManager.addProducer(OpenWireConnection.this, info);
+ }
+ catch (Exception e) {
+ if (e instanceof ActiveMQSecurityException) {
+ resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+ }
+ else if (e instanceof ActiveMQNonExistentQueueException) {
+ resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
+ }
+ else {
+ resp = new ExceptionResponse(e);
+ }
+ }
+ return resp;
+ }
+
+ @Override
+ public Response processAddConsumer(ConsumerInfo info) {
+ Response resp = null;
+ try {
+ protocolManager.addConsumer(OpenWireConnection.this, info);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ if (e instanceof ActiveMQSecurityException) {
+ resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+ }
+ else {
+ resp = new ExceptionResponse(e);
+ }
+ }
+ return resp;
+ }
+
+ @Override
+ public Response processRemoveDestination(DestinationInfo info) throws Exception {
+ ActiveMQDestination dest = info.getDestination();
+ protocolManager.removeDestination(OpenWireConnection.this, dest);
+ return null;
+ }
+
+ @Override
+ public Response processRemoveProducer(ProducerId id) throws Exception {
+ protocolManager.removeProducer(id);
+ return null;
+ }
+
+ @Override
+ public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
+ SessionState session = state.getSessionState(id);
+ if (session == null) {
+ throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
+ }
+ // Don't let new consumers or producers get added while we are closing
+ // this down.
+ session.shutdown();
+ // Cascade the connection stop producers.
+ // we don't stop consumer because in core
+ // closing the session will do the job
+ for (ProducerId producerId : session.getProducerIds()) {
+ try {
+ processRemoveProducer(producerId);
+ }
+ catch (Throwable e) {
+ // LOG.warn("Failed to remove producer: {}", producerId, e);
+ }
+ }
+ state.removeSession(id);
+ protocolManager.removeSession(context, session.getInfo());
+ return null;
+ }
+
+ @Override
+ public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
+ protocolManager.removeSubscription(subInfo);
+ return null;
+ }
+
+ @Override
+ public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+ protocolManager.rollbackTransaction(info);
+ TransactionId txId = info.getTransactionId();
+ txMap.remove(txId);
+ return null;
+ }
+
+ @Override
+ public Response processShutdown(ShutdownInfo info) throws Exception {
+ OpenWireConnection.this.shutdown(false);
+ return null;
+ }
+
+ @Override
+ public Response processWireFormat(WireFormatInfo command) throws Exception {
+ wireFormat.renegotiateWireFormat(command);
+ //throw back a brokerInfo here
+ protocolManager.sendBrokerInfo(OpenWireConnection.this);
+ return null;
+ }
+
+ @Override
+ public Response processAddDestination(DestinationInfo dest) throws Exception {
+ Response resp = null;
+ try {
+ protocolManager.addDestination(OpenWireConnection.this, dest);
+ }
+ catch (Exception e) {
+ if (e instanceof ActiveMQSecurityException) {
+ resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+ }
+ else {
+ resp = new ExceptionResponse(e);
+ }
+ }
+ return resp;
+ }
+
+ @Override
+ public Response processAddSession(SessionInfo info) throws Exception {
+ // Avoid replaying dup commands
+ if (!state.getSessionIds().contains(info.getSessionId())) {
+ protocolManager.addSession(OpenWireConnection.this, info);
+ try {
+ state.addSession(info);
+ }
+ catch (IllegalStateException e) {
+ e.printStackTrace();
+ protocolManager.removeSession(context, info);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Response processBeginTransaction(TransactionInfo info) throws Exception {
+ TransactionId txId = info.getTransactionId();
+
+ if (!txMap.containsKey(txId)) {
+ txMap.put(txId, info);
+ }
+ return null;
+ }
+
+ @Override
+ public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
+ public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ protocolManager.commitTransactionOnePhase(info);
+ TransactionId txId = info.getTransactionId();
+ txMap.remove(txId);
+
+ return null;
+ }
+
+ @Override
+ public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ protocolManager.commitTransactionTwoPhase(info);
+ TransactionId txId = info.getTransactionId();
+ txMap.remove(txId);
+
+ return null;
+ }
+
+ @Override
+ public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
+ //activemq5 keeps a var to remember only the faultTolerant flag
+ //this can be sent over a reconnected transport as the first command
+ //before restoring the connection.
+ return null;
+ }
+
+ @Override
+ public Response processConnectionError(ConnectionError arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
+ public Response processConsumerControl(ConsumerControl consumerControl) throws Exception {
+ //amq5 clients send this command to restore prefetchSize
+ //after successful reconnect
+ try {
+ protocolManager.updateConsumer(OpenWireConnection.this, consumerControl);
+ }
+ catch (Exception e) {
+ //log error
+ }
+ return null;
+ }
+
+ @Override
+ public Response processControlCommand(ControlCommand arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ protocolManager.endTransaction(info);
+ TransactionId txId = info.getTransactionId();
+
+ if (!txMap.containsKey(txId)) {
+ txMap.put(txId, info);
+ }
+ return null;
+ }
+
+ @Override
+ public Response processFlush(FlushCommand arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
+ public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ TransactionId txId = info.getTransactionId();
+ txMap.remove(txId);
+
+ protocolManager.forgetTransaction(info.getTransactionId());
+ return null;
+ }
+
+ @Override
+ public Response processKeepAlive(KeepAliveInfo arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
+ public Response processMessage(Message messageSend) {
+ Response resp = null;
+ try {
+ ProducerId producerId = messageSend.getProducerId();
+ AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
+ final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
+ final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
+ boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
+
+ AMQSession session = protocolManager.getSession(producerId.getParentId());
+
+ // TODO: canDispatch always returns true
+ if (producerExchange.canDispatch(messageSend)) {
+ SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
+ if (result.isBlockNextSend()) {
+ if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
+ // TODO see logging
+ throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
+ }
+
+ if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
+ //in that case don't send the response
+ //this will force the client to wait until
+ //the response is received.
+ context.setDontSendReponse(true);
+ }
+ else {
+ //hang the connection until the space is available
+ session.blockingWaitForSpace(producerExchange, result);
+ }
+ }
+ else if (sendProducerAck) {
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ OpenWireConnection.this.dispatchAsync(ack);
+ }
+ }
+ }
+ catch (Throwable e) {
+ if (e instanceof ActiveMQSecurityException) {
+ resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+ }
+ else {
+ resp = new ExceptionResponse(e);
+ }
+ }
+ return resp;
+ }
+
+ @Override
+ public Response processMessageAck(MessageAck ack) throws Exception {
+ AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
+ consumerBrokerExchange.acknowledge(ack);
+ return null;
+ }
+
+ @Override
+ public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
+ public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
+ public Response processMessagePull(MessagePull arg0) throws Exception {
+ AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId());
+ if (amqConsumerBrokerExchange == null) {
+ throw new IllegalStateException("Consumer does not exist");
+ }
+ amqConsumerBrokerExchange.processMessagePull(arg0);
+ return null;
+ }
+
+ @Override
+ public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ protocolManager.prepareTransaction(info);
+ return null;
+ }
+
+ @Override
+ public Response processProducerAck(ProducerAck arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
+ public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+ Set<SessionId> sIds = state.getSessionIds();
+ TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
+ return new DataArrayResponse(recovered);
+ }
+
+ @Override
+ public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+ //we let the protocol manager handle connection add/remove
+ try {
+ protocolManager.removeConnection(OpenWireConnection.this, state.getInfo(), null);
+ }
+ catch (Throwable e) {
+ // log
+ }
+ return null;
+ }
+
+ @Override
+ public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
+ SessionId sessionId = id.getParentId();
+ SessionState ss = state.getSessionState(sessionId);
+ if (ss == null) {
+ throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
+ }
+ ConsumerState consumerState = ss.removeConsumer(id);
+ if (consumerState == null) {
+ throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
+ }
+ ConsumerInfo info = consumerState.getInfo();
+ info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+
+ AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
+
+ consumerBrokerExchange.removeConsumer();
+
+ removeConsumerBrokerExchange(id);
+
+ return null;
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17921535/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 014181d..440fcce 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -219,7 +219,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
- OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
+ OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, server.getExecutorFactory().getExecutor(), this, wf);
owConn.init();
// TODO CLEBERT What is this constant here? we should get it from TTL initial pings
@@ -506,9 +506,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
ActiveMQDestination destination = info.getDestination();
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
- if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
- throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
- }
if (destination.isQueue()) {
OpenWireUtil.validateDestination(destination, amqSession);
}
@@ -549,12 +546,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
}
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) {
- ActiveMQDestination destination = info.getDestination();
- if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
- if (theConn.getConsumerCount() >= theConn.getMaximumConsumersAllowedPerConnection()) {
- throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumConsumersAllowedPerConnection());
- }
- }
AMQSession amqSession = sessions.get(sessionId);
if (amqSession == null) {
@@ -760,6 +751,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
transactions.remove(xid);
}
+ /**
+ * TODO: remove this and use the server's regular ResourceManager instead
+ * */
public void registerTx(TransactionId txId, AMQSession amqSession) {
transactions.put(txId, amqSession);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17921535/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
index f94c119..e9c4044 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -29,8 +29,6 @@ public class AMQProducerBrokerExchange {
private ProducerState producerState;
private boolean mutable = true;
private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
- private boolean auditProducerSequenceIds;
- private boolean isNetworkProducer;
private final FlowControlInfo flowControlInfo = new FlowControlInfo();
public AMQProducerBrokerExchange() {
@@ -92,29 +90,34 @@ public class AMQProducerBrokerExchange {
* @return false if message should be ignored as a duplicate
*/
public boolean canDispatch(Message messageSend) {
+ // TODO: auditProducerSequenceIds is never true
boolean canDispatch = true;
- if (auditProducerSequenceIds && messageSend.isPersistent()) {
- final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
- if (isNetworkProducer) {
- // messages are multiplexed on this producer so we need to query the
- // persistenceAdapter
- long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
- if (producerSequenceId <= lastStoredForMessageProducer) {
- canDispatch = false;
- }
- }
- else if (producerSequenceId <= lastSendSequenceNumber.get()) {
- canDispatch = false;
- if (messageSend.isInTransaction()) {
- }
- else {
- }
- }
- else {
- // track current so we can suppress duplicates later in the stream
- lastSendSequenceNumber.set(producerSequenceId);
- }
- }
+ //TODO: DEAD CODE
+// if (auditProducerSequenceIds && messageSend.isPersistent()) {
+// final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
+// if (isNetworkProducer) {
+// // messages are multiplexed on this producer so we need to query the
+// // persistenceAdapter
+// long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
+// if (producerSequenceId <= lastStoredForMessageProducer) {
+// canDispatch = false;
+// }
+// }
+// else if (producerSequenceId <= lastSendSequenceNumber.get()) {
+// canDispatch = false;
+// // TODO: WHAT IS THIS?
+// if (messageSend.isInTransaction()) {
+//
+//
+// }
+// else {
+// }
+// }
+// else {
+// // track current so we can suppress duplicates later in the stream
+// lastSendSequenceNumber.set(producerSequenceId);
+// }
+// }
return canDispatch;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17921535/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
index 9ce21e3..a6ca4a0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
@@ -32,6 +32,15 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public class AMQServerSessionFactory implements ServerSessionFactory {
+ private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory();
+
+ public static AMQServerSessionFactory getInstance() {
+ return singleInstance;
+ }
+
+ private AMQServerSessionFactory() {
+ }
+
@Override
public ServerSessionImpl createCoreSession(String name,
String username,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17921535/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index f5ccb82..0cee3d3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -63,8 +63,8 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
- private AMQServerSession coreSession;
private ConnectionInfo connInfo;
+ private AMQServerSession coreSession;
private SessionInfo sessInfo;
private ActiveMQServer server;
private OpenWireConnection connection;
@@ -91,6 +91,7 @@ public class AMQSession implements SessionCallback {
OpenWireProtocolManager manager) {
this.connInfo = connInfo;
this.sessInfo = sessInfo;
+
this.server = server;
this.connection = connection;
this.scheduledPool = scheduledPool;
@@ -107,7 +108,7 @@ public class AMQSession implements SessionCallback {
// now
try {
- coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, new AMQServerSessionFactory(), true);
+ coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true);
long sessionId = sessInfo.getSessionId().getValue();
if (sessionId == -1) {
@@ -144,6 +145,7 @@ public class AMQSession implements SessionCallback {
}
connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
+ // TODO: This is wrong. We should only start when the client starts
coreSession.start();
started.set(true);
}
@@ -281,6 +283,9 @@ public class AMQSession implements SessionCallback {
coreMsg.setAddress(address);
PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
+
+
+ // TODO: Improve this, tested with ProducerFlowControlSendFailTest
if (store.isFull()) {
result.setBlockNextSend(true);
result.setBlockPagingStore(store);
@@ -526,8 +531,12 @@ public class AMQSession implements SessionCallback {
}
}
+ // TODO: remove this, we should do the same as we do on core for blocking
public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange,
SendingResult result) throws IOException {
+
+
+ new Exception("blocking").printStackTrace();
long start = System.currentTimeMillis();
long nextWarn = start;
producerExchange.blockingOnFlowControl(true);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17921535/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
new file mode 100644
index 0000000..8ab3815
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+
+public class OpenWireServerCallback implements SessionCallback {
+
+ @Override
+ public boolean hasCredits(ServerConsumer consumerID) {
+ return false;
+ }
+
+ @Override
+ public void sendProducerCreditsMessage(int credits, SimpleString address) {
+
+ }
+
+ @Override
+ public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
+
+ }
+
+ @Override
+ public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) {
+ return 0;
+ }
+
+ @Override
+ public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) {
+ return 0;
+ }
+
+ @Override
+ public int sendLargeMessageContinuation(ServerConsumer consumerID,
+ byte[] body,
+ boolean continues,
+ boolean requiresResponse) {
+ return 0;
+ }
+
+ @Override
+ public void closed() {
+
+ }
+
+ @Override
+ public void disconnect(ServerConsumer consumerId, String queueName) {
+
+ }
+
+ @Override
+ public boolean isWritable(ReadyListener callback) {
+ return false;
+ }
+}