You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/16 02:54:05 UTC

[41/60] [abbrv] activemq-artemis git commit: moving send method to the connection

moving send method to the connection


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9898f60b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9898f60b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9898f60b

Branch: refs/heads/refactor-openwire
Commit: 9898f60b53fd77f6f3b374173fa41b879a26a89a
Parents: 8e29ff7
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Feb 24 14:26:55 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 15 20:44:21 2016 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 39 ++++++++++++++------
 .../openwire/OpenWireProtocolManager.java       | 36 ------------------
 2 files changed, 27 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9898f60b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 7c1c094..991f24b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -22,9 +22,7 @@ import javax.jms.JMSSecurityException;
 import javax.jms.ResourceAllocationException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -36,13 +34,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@@ -58,7 +52,6 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerInfo;
@@ -104,7 +97,6 @@ import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * Represents an activemq connection.
- * ToDo: extends AbstractRemotingConnection
  */
 public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
 
@@ -214,7 +206,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             info.setResponseRequired(false);
             // if we don't respond to KeepAlive commands then the client will think the server is dead and timeout
             // for some reason KeepAliveInfo.isResponseRequired() is always false
-            protocolManager.sendReply(this, info);
+            sendCommand(info);
          }
          else {
             Response response = null;
@@ -333,7 +325,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    // throw a WireFormatInfo to the peer
    public void init() {
       WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
-      protocolManager.send(this, info);
+      sendCommand(info);
    }
 
    public ConnectionState getState() {
@@ -536,7 +528,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          m.setBrokerOutTime(endTime);
       }
 
-      protocolManager.send(this, dispatch);
+      sendCommand(dispatch);
    }
 
    public WireFormat getMarshaller() {
@@ -577,7 +569,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       destroyed = true;
 
-      //before closing transport, send the last response if any
+      //before closing the transport, send the last response if any
       Command command = context.getLastCommand();
       if (command != null && command.isResponseRequired()) {
          Response lastResponse = new Response();
@@ -689,6 +681,29 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       context.incRefCount();
    }
 
+   /** Sends {@code command} to the client via physicalSend; returns {@code false} if the connection is destroyed or the send throws, {@code true} otherwise. */
+   public boolean sendCommand(final Command command) {
+      if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
+         ActiveMQServerLogger.LOGGER.trace("sending " + command);
+      }
+      synchronized (this) { // destroyed-check and send happen under this connection's monitor
+         if (isDestroyed()) {
+            return false;
+         }
+
+         try {
+            physicalSend(command);
+         }
+         catch (Exception e) { // failure is neither logged nor rethrown; the caller only sees false
+            return false;
+         }
+         catch (Throwable t) { // same handling for any other Throwable
+            return false;
+         }
+         return true;
+      }
+   }
+
    // This will listen for commands throught the protocolmanager
    public class CommandProcessor implements CommandVisitor {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9898f60b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 440fcce..add1455 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -280,42 +280,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    }
 
-   public void sendReply(final OpenWireConnection connection, final Command command) {
-      server.getStorageManager().afterCompleteOperations(new IOCallback() {
-         @Override
-         public void onError(final int errorCode, final String errorMessage) {
-            ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
-         }
-
-         @Override
-         public void done() {
-            send(connection, command);
-         }
-      });
-   }
-
-   public boolean send(final OpenWireConnection connection, final Command command) {
-      if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQServerLogger.LOGGER.trace("sending " + command);
-      }
-      synchronized (connection) {
-         if (connection.isDestroyed()) {
-            return false;
-         }
-
-         try {
-            connection.physicalSend(command);
-         }
-         catch (Exception e) {
-            return false;
-         }
-         catch (Throwable t) {
-            return false;
-         }
-         return true;
-      }
-   }
-
    public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
       String username = info.getUserName();
       String password = info.getPassword();