You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/19 06:08:51 UTC
[45/67] [abbrv] activemq-artemis git commit: moving send method to
the connection
moving send method to the connection
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ec6034a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ec6034a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ec6034a3
Branch: refs/heads/refactor-openwire
Commit: ec6034a32727a791db71d0736f3ede75f4dfd47b
Parents: 26bed31
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Feb 24 14:26:55 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Mar 19 01:07:37 2016 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 39 ++++++++++++++------
.../openwire/OpenWireProtocolManager.java | 36 ------------------
2 files changed, 27 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec6034a3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 7c1c094..991f24b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -22,9 +22,7 @@ import javax.jms.JMSSecurityException;
import javax.jms.ResourceAllocationException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -36,13 +34,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@@ -58,7 +52,6 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
@@ -104,7 +97,6 @@ import org.apache.activemq.wireformat.WireFormat;
/**
* Represents an activemq connection.
- * ToDo: extends AbstractRemotingConnection
*/
public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
@@ -214,7 +206,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
info.setResponseRequired(false);
// if we don't respond to KeepAlive commands then the client will think the server is dead and timeout
// for some reason KeepAliveInfo.isResponseRequired() is always false
- protocolManager.sendReply(this, info);
+ sendCommand(info);
}
else {
Response response = null;
@@ -333,7 +325,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
// throw a WireFormatInfo to the peer
public void init() {
WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
- protocolManager.send(this, info);
+ sendCommand(info);
}
public ConnectionState getState() {
@@ -536,7 +528,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
m.setBrokerOutTime(endTime);
}
- protocolManager.send(this, dispatch);
+ sendCommand(dispatch);
}
public WireFormat getMarshaller() {
@@ -577,7 +569,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
destroyed = true;
- //before closing transport, send the last response if any
+ //before closing transport, sendCommand the last response if any
Command command = context.getLastCommand();
if (command != null && command.isResponseRequired()) {
Response lastResponse = new Response();
@@ -689,6 +681,29 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
context.incRefCount();
}
+ /** This will answer with commands to the client */
+ public boolean sendCommand(final Command command) {
+ if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
+ ActiveMQServerLogger.LOGGER.trace("sending " + command);
+ }
+ synchronized (this) {
+ if (isDestroyed()) {
+ return false;
+ }
+
+ try {
+ physicalSend(command);
+ }
+ catch (Exception e) {
+ return false;
+ }
+ catch (Throwable t) {
+ return false;
+ }
+ return true;
+ }
+ }
+
// This will listen for commands throught the protocolmanager
public class CommandProcessor implements CommandVisitor {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec6034a3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 440fcce..add1455 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -280,42 +280,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
}
- public void sendReply(final OpenWireConnection connection, final Command command) {
- server.getStorageManager().afterCompleteOperations(new IOCallback() {
- @Override
- public void onError(final int errorCode, final String errorMessage) {
- ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
- }
-
- @Override
- public void done() {
- send(connection, command);
- }
- });
- }
-
- public boolean send(final OpenWireConnection connection, final Command command) {
- if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
- ActiveMQServerLogger.LOGGER.trace("sending " + command);
- }
- synchronized (connection) {
- if (connection.isDestroyed()) {
- return false;
- }
-
- try {
- connection.physicalSend(command);
- }
- catch (Exception e) {
- return false;
- }
- catch (Throwable t) {
- return false;
- }
- return true;
- }
- }
-
public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String username = info.getUserName();
String password = info.getPassword();