You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/15 21:22:40 UTC
[57/59] [abbrv] activemq-artemis git commit: using converter interface
using converter interface
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/027cfb21
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/027cfb21
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/027cfb21
Branch: refs/heads/refactor-openwire
Commit: 027cfb2131cdcffafc7836741d6df9804b1b5733
Parents: 5d171e3
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Feb 25 18:57:21 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 15 16:21:23 2016 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 7 -------
.../openwire/OpenWireMessageConverter.java | 22 +++++++++++++-------
.../openwire/OpenWireProtocolManager.java | 9 +++++++-
.../core/protocol/openwire/amq/AMQSession.java | 13 ++++++++++--
4 files changed, 33 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/027cfb21/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6839259..0fd8dc2 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -146,8 +146,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private volatile AMQSession advisorySession;
- private String defaultSocketURIString;
-
// TODO-NOW: check on why there are two connections created for every createConnection on the client.
public OpenWireConnection(Connection connection,
Executor executor,
@@ -156,7 +154,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
super(connection, executor);
this.protocolManager = openWireProtocolManager;
this.wireFormat = wf;
- this.defaultSocketURIString = connection.getLocalAddress();
}
// SecurityAuth implementation
@@ -635,10 +632,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return this.context;
}
- public String getDefaultSocketURIString() {
- return defaultSocketURIString;
- }
-
public void updateClient(ConnectionControl control) {
// if (!destroyed && context.isFaultTolerant()) {
if (protocolManager.isUpdateClusterClients()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/027cfb21/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index d040955..6176490 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -96,10 +96,11 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
- @Override
- public ServerMessage inbound(Object message) {
- // TODO: implement this
- return null;
+
+ private final WireFormat marshaller;
+
+ public OpenWireMessageConverter(WireFormat marshaller) {
+ this.marshaller = marshaller;
}
@Override
@@ -108,10 +109,13 @@ public class OpenWireMessageConverter implements MessageConverter {
return null;
}
- //convert an ActiveMQ Artemis message to coreMessage
- public static void toCoreMessage(ServerMessageImpl coreMessage,
- Message messageSend,
- WireFormat marshaller) throws IOException {
+
+ @Override
+ public ServerMessage inbound(Object message) throws Exception {
+
+ Message messageSend = (Message)message;
+ ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
+
String type = messageSend.getType();
if (type != null) {
coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
@@ -398,6 +402,8 @@ public class OpenWireMessageConverter implements MessageConverter {
origDestBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
}
+
+ return coreMessage;
}
private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/027cfb21/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 514a2b9..51c4bec 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -115,6 +115,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private boolean updateClusterClients = false;
private boolean updateClusterClientsOnRemove = false;
+ private final OpenWireMessageConverter messageConverter;
+
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@@ -123,6 +125,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
wireFactory.setCacheEnabled(false);
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
scheduledPool = server.getScheduledPool();
+ this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
final ClusterManager clusterManager = this.server.getClusterManager();
@@ -134,6 +137,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
}
+ public OpenWireFormat getNewWireFormat() {
+ return (OpenWireFormat)wireFactory.createWireFormat();
+ }
+
@Override
public void nodeUP(TopologyMember member, boolean last) {
if (topologyMap.put(member.getNodeId(), member) == null) {
@@ -217,7 +224,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
@Override
public MessageConverter getConverter() {
- return new OpenWireMessageConverter();
+ return messageConverter;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/027cfb21/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index e59295d..c787eda 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
@@ -82,6 +83,11 @@ public class AMQSession implements SessionCallback {
private OpenWireProtocolManager manager;
+ // The message converter used by this session.
+ // Its wire format is meant to be used per thread / session,
+ // so we make a new copy of it for each AMQSession.
+ private final OpenWireMessageConverter converter;
+
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@@ -95,6 +101,9 @@ public class AMQSession implements SessionCallback {
this.connection = connection;
this.scheduledPool = scheduledPool;
this.manager = manager;
+ OpenWireFormat marshaller = (OpenWireFormat)connection.getMarshaller();
+
+ this.converter = new OpenWireMessageConverter(marshaller.copy());
}
public void initialize() {
@@ -249,7 +258,8 @@ public class AMQSession implements SessionCallback {
}
for (ActiveMQDestination dest : actualDestinations) {
- ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024);
+
+ ServerMessageImpl coreMsg = (ServerMessageImpl)converter.inbound(messageSend);
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
@@ -258,7 +268,6 @@ public class AMQSession implements SessionCallback {
if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
}
- OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller());
SimpleString address = OpenWireUtil.toCoreAddress(dest);
coreMsg.setAddress(address);