You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/31 04:31:35 UTC

[53/69] [abbrv] activemq-artemis git commit: using converter interface

using converter interface


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f653a18e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f653a18e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f653a18e

Branch: refs/heads/refactor-openwire
Commit: f653a18ef1e34390d574a1f37dd4ac0a948afcfd
Parents: a9e0bf3
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Feb 25 18:57:21 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 30 22:29:44 2016 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  7 -------
 .../openwire/OpenWireMessageConverter.java      | 22 +++++++++++++-------
 .../openwire/OpenWireProtocolManager.java       |  9 +++++++-
 .../core/protocol/openwire/amq/AMQSession.java  | 13 ++++++++++--
 4 files changed, 33 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f653a18e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6839259..0fd8dc2 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -146,8 +146,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    private volatile AMQSession advisorySession;
 
-   private String defaultSocketURIString;
-
    // TODO-NOW: check on why there are two connections created for every createConnection on the client.
    public OpenWireConnection(Connection connection,
                              Executor executor,
@@ -156,7 +154,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       super(connection, executor);
       this.protocolManager = openWireProtocolManager;
       this.wireFormat = wf;
-      this.defaultSocketURIString = connection.getLocalAddress();
    }
 
    // SecurityAuth implementation
@@ -635,10 +632,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       return this.context;
    }
 
-   public String getDefaultSocketURIString() {
-      return defaultSocketURIString;
-   }
-
    public void updateClient(ConnectionControl control) {
       //      if (!destroyed && context.isFaultTolerant()) {
       if (protocolManager.isUpdateClusterClients()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f653a18e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index d040955..6176490 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -96,10 +96,11 @@ public class OpenWireMessageConverter implements MessageConverter {
    private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
    private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
 
-   @Override
-   public ServerMessage inbound(Object message) {
-      // TODO: implement this
-      return null;
+
+   private final WireFormat marshaller;
+
+   public OpenWireMessageConverter(WireFormat marshaller) {
+      this.marshaller = marshaller;
    }
 
    @Override
@@ -108,10 +109,13 @@ public class OpenWireMessageConverter implements MessageConverter {
       return null;
    }
 
-   //convert an ActiveMQ Artemis message to coreMessage
-   public static void toCoreMessage(ServerMessageImpl coreMessage,
-                                    Message messageSend,
-                                    WireFormat marshaller) throws IOException {
+
+   @Override
+   public ServerMessage inbound(Object message) throws Exception {
+
+      Message messageSend = (Message)message;
+      ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
+
       String type = messageSend.getType();
       if (type != null) {
          coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
@@ -398,6 +402,8 @@ public class OpenWireMessageConverter implements MessageConverter {
          origDestBytes.compact();
          coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
       }
+
+      return coreMessage;
    }
 
    private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f653a18e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 514a2b9..51c4bec 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -115,6 +115,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    private boolean updateClusterClients = false;
    private boolean updateClusterClientsOnRemove = false;
 
+   private final OpenWireMessageConverter messageConverter;
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -123,6 +125,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       wireFactory.setCacheEnabled(false);
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
       scheduledPool = server.getScheduledPool();
+      this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
 
       final ClusterManager clusterManager = this.server.getClusterManager();
 
@@ -134,6 +137,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       }
    }
 
+   public OpenWireFormat getNewWireFormat() {
+      return (OpenWireFormat)wireFactory.createWireFormat();
+   }
+
    @Override
    public void nodeUP(TopologyMember member, boolean last) {
       if (topologyMap.put(member.getNodeId(), member) == null) {
@@ -217,7 +224,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
    @Override
    public MessageConverter getConverter() {
-      return new OpenWireMessageConverter();
+      return messageConverter;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f653a18e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index d16d4c8..4db5967 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback {
@@ -82,6 +83,11 @@ public class AMQSession implements SessionCallback {
 
    private OpenWireProtocolManager manager;
 
+   // The sessionWireformat used by the session
+   // this object is meant to be used per thread / session
+   // so we make a new one per AMQSession
+   private final OpenWireMessageConverter converter;
+
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -95,6 +101,9 @@ public class AMQSession implements SessionCallback {
       this.connection = connection;
       this.scheduledPool = scheduledPool;
       this.manager = manager;
+      OpenWireFormat marshaller = (OpenWireFormat)connection.getMarshaller();
+
+      this.converter = new OpenWireMessageConverter(marshaller.copy());
    }
 
    public void initialize() {
@@ -254,7 +263,8 @@ public class AMQSession implements SessionCallback {
       }
 
       for (ActiveMQDestination dest : actualDestinations) {
-         ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024);
+
+         ServerMessageImpl coreMsg = (ServerMessageImpl)converter.inbound(messageSend);
 
          /* ActiveMQ failover transport will attempt to reconnect after connection failure.  Any sent messages that did
          * not receive acks will be resent.  (ActiveMQ broker handles this by returning a last sequence id received to
@@ -263,7 +273,6 @@ public class AMQSession implements SessionCallback {
          if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
             coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
          }
-         OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller());
          SimpleString address = OpenWireUtil.toCoreAddress(dest);
          coreMsg.setAddress(address);