You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:32 UTC
[33/53] [abbrv] [partial] activemq-artemis git commit: automatic
checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
index 2c181c5..83c327e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
@@ -31,8 +31,7 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.spi.core.remoting.Connector;
-public final class Topology
-{
+public final class Topology {
private final Set<ClusterTopologyListener> topologyListeners;
@@ -57,75 +56,62 @@ public final class Topology
private Map<String, Long> mapDelete;
- private static final class DirectExecutor implements Executor
- {
- public void execute(final Runnable runnable)
- {
+ private static final class DirectExecutor implements Executor {
+
+ public void execute(final Runnable runnable) {
runnable.run();
}
}
- public Topology(final Object owner)
- {
+
+ public Topology(final Object owner) {
this(owner, new DirectExecutor());
}
- public Topology(final Object owner, final Executor executor)
- {
+ public Topology(final Object owner, final Executor executor) {
this.topologyListeners = new HashSet<>();
this.topology = new ConcurrentHashMap<>();
- if (executor == null)
- {
+ if (executor == null) {
throw new IllegalArgumentException("Executor is required");
}
this.executor = executor;
this.owner = owner;
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
- ActiveMQClientLogger.LOGGER.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
- new Exception("trace"));
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+ ActiveMQClientLogger.LOGGER.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception("trace"));
}
}
/**
* It will remove all elements as if it haven't received anyone from the server.
*/
- public void clear()
- {
+ public void clear() {
topology.clear();
}
- public void addClusterTopologyListener(final ClusterTopologyListener listener)
- {
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ public void addClusterTopologyListener(final ClusterTopologyListener listener) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace(this + "::Adding topology listener " + listener, new Exception("Trace"));
}
- synchronized (topologyListeners)
- {
+ synchronized (topologyListeners) {
topologyListeners.add(listener);
}
this.sendTopology(listener);
}
- public void removeClusterTopologyListener(final ClusterTopologyListener listener)
- {
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace(this + "::Removing topology listener " + listener, new Exception("Trace"));
}
- synchronized (topologyListeners)
- {
+ synchronized (topologyListeners) {
topologyListeners.remove(listener);
}
}
- /** This is called by the server when the node is activated from backup state. It will always succeed */
- public void updateAsLive(final String nodeId, final TopologyMemberImpl memberInput)
- {
- synchronized (this)
- {
- if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
- {
+ /**
+ * This is called by the server when the node is activated from backup state. It will always succeed
+ */
+ public void updateAsLive(final String nodeId, final TopologyMemberImpl memberInput) {
+ synchronized (this) {
+ if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
ActiveMQClientLogger.LOGGER.debug(this + "::node " + nodeId + "=" + memberInput);
}
memberInput.setUniqueEventID(System.currentTimeMillis());
@@ -137,48 +123,40 @@ public final class Topology
/**
* After the node is started, it will resend the notifyLive a couple of times to avoid gossip between two servers
+ *
* @param nodeId
*/
- public void resendNode(final String nodeId)
- {
- synchronized (this)
- {
+ public void resendNode(final String nodeId) {
+ synchronized (this) {
TopologyMemberImpl memberInput = topology.get(nodeId);
- if (memberInput != null)
- {
+ if (memberInput != null) {
memberInput.setUniqueEventID(System.currentTimeMillis());
sendMemberUp(nodeId, memberInput);
}
}
}
- /** This is called by the server when the node is activated from backup state. It will always succeed */
- public TopologyMemberImpl updateBackup(final TopologyMemberImpl memberInput)
- {
+ /**
+ * This is called by the server when the node is activated from backup state. It will always succeed
+ */
+ public TopologyMemberImpl updateBackup(final TopologyMemberImpl memberInput) {
final String nodeId = memberInput.getNodeId();
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput);
}
- synchronized (this)
- {
+ synchronized (this) {
TopologyMemberImpl currentMember = getMember(nodeId);
- if (currentMember == null)
- {
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
- ActiveMQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput,
- new Exception("trace"));
+ if (currentMember == null) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+ ActiveMQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, new Exception("trace"));
}
currentMember = memberInput;
topology.put(nodeId, currentMember);
}
- TopologyMemberImpl newMember =
- new TopologyMemberImpl(nodeId, currentMember.getBackupGroupName(), currentMember.getScaleDownGroupName(), currentMember.getLive(),
- memberInput.getBackup());
+ TopologyMemberImpl newMember = new TopologyMemberImpl(nodeId, currentMember.getBackupGroupName(), currentMember.getScaleDownGroupName(), currentMember.getLive(), memberInput.getBackup());
newMember.setUniqueEventID(System.currentTimeMillis());
topology.remove(nodeId);
topology.put(nodeId, newMember);
@@ -190,64 +168,51 @@ public final class Topology
/**
* @param uniqueEventID an unique identifier for when the change was made. We will use current
- * time millis for starts, and a ++ of that number for shutdown.
+ * time millis for starts, and a ++ of that number for shutdown.
* @param nodeId
* @param memberInput
* @return {@code true} if an update did take place. Note that backups are *always* updated.
*/
- public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMemberImpl memberInput)
- {
+ public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMemberImpl memberInput) {
Long deleteTme = getMapDelete().get(nodeId);
- if (deleteTme != null && uniqueEventID != 0 && uniqueEventID < deleteTme)
- {
+ if (deleteTme != null && uniqueEventID != 0 && uniqueEventID < deleteTme) {
ActiveMQClientLogger.LOGGER.debug("Update uniqueEvent=" + uniqueEventID +
- ", nodeId=" +
- nodeId +
- ", memberInput=" +
- memberInput +
- " being rejected as there was a delete done after that");
+ ", nodeId=" +
+ nodeId +
+ ", memberInput=" +
+ memberInput +
+ " being rejected as there was a delete done after that");
return false;
}
- synchronized (this)
- {
+ synchronized (this) {
TopologyMemberImpl currentMember = topology.get(nodeId);
- if (currentMember == null)
- {
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
- ActiveMQClientLogger.LOGGER.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput,
- new Exception("trace"));
+ if (currentMember == null) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+ ActiveMQClientLogger.LOGGER.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput, new Exception("trace"));
}
memberInput.setUniqueEventID(uniqueEventID);
topology.put(nodeId, memberInput);
sendMemberUp(nodeId, memberInput);
return true;
}
- if (uniqueEventID > currentMember.getUniqueEventID())
- {
- TopologyMemberImpl newMember =
- new TopologyMemberImpl(nodeId, memberInput.getBackupGroupName(), memberInput.getScaleDownGroupName(), memberInput.getLive(),
- memberInput.getBackup());
-
- if (newMember.getLive() == null && currentMember.getLive() != null)
- {
+ if (uniqueEventID > currentMember.getUniqueEventID()) {
+ TopologyMemberImpl newMember = new TopologyMemberImpl(nodeId, memberInput.getBackupGroupName(), memberInput.getScaleDownGroupName(), memberInput.getLive(), memberInput.getBackup());
+
+ if (newMember.getLive() == null && currentMember.getLive() != null) {
newMember.setLive(currentMember.getLive());
}
- if (newMember.getBackup() == null && currentMember.getBackup() != null)
- {
+ if (newMember.getBackup() == null && currentMember.getBackup() != null) {
newMember.setBackup(currentMember.getBackup());
}
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" +
- currentMember + ", memberInput=" + memberInput + "newMember=" +
- newMember,
- new Exception("trace"));
+ currentMember + ", memberInput=" + memberInput + "newMember=" +
+ newMember, new Exception("trace"));
}
newMember.setUniqueEventID(uniqueEventID);
@@ -261,8 +226,7 @@ public final class Topology
* always add the backup, better to try to reconnect to something that's not there then to
* not know about it at all
*/
- if (currentMember.getBackup() == null && memberInput.getBackup() != null)
- {
+ if (currentMember.getBackup() == null && memberInput.getBackup() != null) {
currentMember.setBackup(memberInput.getBackup());
}
return false;
@@ -273,39 +237,30 @@ public final class Topology
* @param nodeId
* @param memberToSend
*/
- private void sendMemberUp(final String nodeId, final TopologyMemberImpl memberToSend)
- {
+ private void sendMemberUp(final String nodeId, final TopologyMemberImpl memberToSend) {
final ArrayList<ClusterTopologyListener> copy = copyListeners();
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements");
}
- if (copy.size() > 0)
- {
- executor.execute(new Runnable()
- {
- public void run()
- {
- for (ClusterTopologyListener listener : copy)
- {
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ if (copy.size() > 0) {
+ executor.execute(new Runnable() {
+ public void run() {
+ for (ClusterTopologyListener listener : copy) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace(Topology.this + " informing " +
- listener +
- " about node up = " +
- nodeId +
- " connector = " +
- memberToSend.getConnector());
+ listener +
+ " about node up = " +
+ nodeId +
+ " connector = " +
+ memberToSend.getConnector());
}
- try
- {
+ try {
listener.nodeUP(memberToSend, false);
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQClientLogger.LOGGER.errorSendingTopology(e);
}
}
@@ -317,69 +272,54 @@ public final class Topology
/**
* @return
*/
- private ArrayList<ClusterTopologyListener> copyListeners()
- {
+ private ArrayList<ClusterTopologyListener> copyListeners() {
ArrayList<ClusterTopologyListener> listenersCopy;
- synchronized (topologyListeners)
- {
+ synchronized (topologyListeners) {
listenersCopy = new ArrayList<>(topologyListeners);
}
return listenersCopy;
}
- boolean removeMember(final long uniqueEventID, final String nodeId)
- {
+ boolean removeMember(final long uniqueEventID, final String nodeId) {
TopologyMemberImpl member;
- synchronized (this)
- {
+ synchronized (this) {
member = topology.get(nodeId);
- if (member != null)
- {
- if (member.getUniqueEventID() > uniqueEventID)
- {
+ if (member != null) {
+ if (member.getUniqueEventID() > uniqueEventID) {
ActiveMQClientLogger.LOGGER.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
member = null;
}
- else
- {
+ else {
getMapDelete().put(nodeId, uniqueEventID);
member = topology.remove(nodeId);
}
}
}
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace("removeMember " + this +
- " removing nodeID=" +
- nodeId +
- ", result=" +
- member +
- ", size = " +
- topology.size(), new Exception("trace"));
+ " removing nodeID=" +
+ nodeId +
+ ", result=" +
+ member +
+ ", size = " +
+ topology.size(), new Exception("trace"));
}
- if (member != null)
- {
+ if (member != null) {
final ArrayList<ClusterTopologyListener> copy = copyListeners();
- executor.execute(new Runnable()
- {
- public void run()
- {
- for (ClusterTopologyListener listener : copy)
- {
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ executor.execute(new Runnable() {
+ public void run() {
+ for (ClusterTopologyListener listener : copy) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace(this + " informing " + listener + " about node down = " + nodeId);
}
- try
- {
+ try {
listener.nodeDown(uniqueEventID, nodeId);
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorSendingTopologyNodedown(e);
}
}
@@ -389,36 +329,29 @@ public final class Topology
return member != null;
}
- public synchronized void sendTopology(final ClusterTopologyListener listener)
- {
- if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
- {
+ public synchronized void sendTopology(final ClusterTopologyListener listener) {
+ if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
ActiveMQClientLogger.LOGGER.debug(this + " is sending topology to " + listener);
}
- executor.execute(new Runnable()
- {
- public void run()
- {
+ executor.execute(new Runnable() {
+ public void run() {
int count = 0;
final Map<String, TopologyMemberImpl> copy;
- synchronized (Topology.this)
- {
+ synchronized (Topology.this) {
copy = new HashMap<String, TopologyMemberImpl>(topology);
}
- for (Map.Entry<String, TopologyMemberImpl> entry : copy.entrySet())
- {
- if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
- {
+ for (Map.Entry<String, TopologyMemberImpl> entry : copy.entrySet()) {
+ if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
ActiveMQClientLogger.LOGGER.debug(Topology.this + " sending " +
- entry.getKey() +
- " / " +
- entry.getValue().getConnector() +
- " to " +
- listener);
+ entry.getKey() +
+ " / " +
+ entry.getValue().getConnector() +
+ " to " +
+ listener);
}
listener.nodeUP(entry.getValue(), ++count == copy.size());
}
@@ -426,17 +359,13 @@ public final class Topology
});
}
- public synchronized TopologyMemberImpl getMember(final String nodeID)
- {
+ public synchronized TopologyMemberImpl getMember(final String nodeID) {
return topology.get(nodeID);
}
- public synchronized TopologyMemberImpl getMember(final TransportConfiguration configuration)
- {
- for (TopologyMemberImpl member : topology.values())
- {
- if (member.isMember(configuration))
- {
+ public synchronized TopologyMemberImpl getMember(final TransportConfiguration configuration) {
+ for (TopologyMemberImpl member : topology.values()) {
+ if (member.isMember(configuration)) {
return member;
}
}
@@ -444,77 +373,63 @@ public final class Topology
return null;
}
- public synchronized boolean isEmpty()
- {
+ public synchronized boolean isEmpty() {
return topology.isEmpty();
}
- public Collection<TopologyMemberImpl> getMembers()
- {
+ public Collection<TopologyMemberImpl> getMembers() {
ArrayList<TopologyMemberImpl> members;
- synchronized (this)
- {
+ synchronized (this) {
members = new ArrayList<>(topology.values());
}
return members;
}
- synchronized int nodes()
- {
+ synchronized int nodes() {
int count = 0;
- for (TopologyMemberImpl member : topology.values())
- {
- if (member.getLive() != null)
- {
+ for (TopologyMemberImpl member : topology.values()) {
+ if (member.getLive() != null) {
count++;
}
- if (member.getBackup() != null)
- {
+ if (member.getBackup() != null) {
count++;
}
}
return count;
}
- public synchronized String describe()
- {
+ public synchronized String describe() {
return describe("");
}
- private synchronized String describe(final String text)
- {
+ private synchronized String describe(final String text) {
StringBuilder desc = new StringBuilder(text + "topology on " + this + ":\n");
- for (Entry<String, TopologyMemberImpl> entry : new HashMap<>(topology).entrySet())
- {
+ for (Entry<String, TopologyMemberImpl> entry : new HashMap<>(topology).entrySet()) {
desc.append("\t").append(entry.getKey()).append(" => ").append(entry.getValue()).append("\n");
}
desc.append("\t" + "nodes=").append(nodes()).append("\t").append("members=").append(members());
- if (topology.isEmpty())
- {
+ if (topology.isEmpty()) {
desc.append("\tEmpty");
}
return desc.toString();
}
- private int members()
- {
+ private int members() {
return topology.size();
}
- /** The owner exists mainly for debug purposes.
- * When enabling logging and tracing, the Topology updates will include the owner, what will enable to identify
- * what instances are receiving the updates, what will enable better debugging.*/
- public void setOwner(final Object owner)
- {
+ /**
+ * The owner exists mainly for debug purposes.
+ * When enabling logging and tracing, the Topology updates will include the owner, what will enable to identify
+ * what instances are receiving the updates, what will enable better debugging.
+ */
+ public void setOwner(final Object owner) {
this.owner = owner;
}
- public TransportConfiguration getBackupForConnector(final Connector connector)
- {
- for (TopologyMemberImpl member : topology.values())
- {
- if (member.getLive() != null && connector.isEquivalent(member.getLive().getParams()))
- {
+ public TransportConfiguration getBackupForConnector(final Connector connector) {
+ for (TopologyMemberImpl member : topology.values()) {
+ if (member.getLive() != null && connector.isEquivalent(member.getLive().getParams())) {
return member.getBackup();
}
}
@@ -522,19 +437,15 @@ public final class Topology
}
@Override
- public String toString()
- {
- if (owner == null)
- {
+ public String toString() {
+ if (owner == null) {
return "Topology@" + Integer.toHexString(System.identityHashCode(this));
}
return "Topology@" + Integer.toHexString(System.identityHashCode(this)) + "[owner=" + owner + "]";
}
- private synchronized Map<String, Long> getMapDelete()
- {
- if (mapDelete == null)
- {
+ private synchronized Map<String, Long> getMapDelete() {
+ if (mapDelete == null) {
mapDelete = new ConcurrentHashMap<>();
}
return mapDelete;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
index 4f71b8f..f220457 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
@@ -21,8 +21,8 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-public final class TopologyMemberImpl implements TopologyMember
-{
+public final class TopologyMemberImpl implements TopologyMember {
+
private static final long serialVersionUID = 1123652191795626133L;
private final Pair<TransportConfiguration, TransportConfiguration> connector;
@@ -38,9 +38,11 @@ public final class TopologyMemberImpl implements TopologyMember
private final String nodeId;
- public TopologyMemberImpl(String nodeId, final String backupGroupName, final String scaleDownGroupName, final TransportConfiguration a,
- final TransportConfiguration b)
- {
+ public TopologyMemberImpl(String nodeId,
+ final String backupGroupName,
+ final String scaleDownGroupName,
+ final TransportConfiguration a,
+ final TransportConfiguration b) {
this.nodeId = nodeId;
this.backupGroupName = backupGroupName;
this.scaleDownGroupName = scaleDownGroupName;
@@ -49,90 +51,72 @@ public final class TopologyMemberImpl implements TopologyMember
}
@Override
- public TransportConfiguration getLive()
- {
+ public TransportConfiguration getLive() {
return connector.getA();
}
@Override
- public TransportConfiguration getBackup()
- {
+ public TransportConfiguration getBackup() {
return connector.getB();
}
- public void setBackup(final TransportConfiguration param)
- {
+ public void setBackup(final TransportConfiguration param) {
connector.setB(param);
}
- public void setLive(final TransportConfiguration param)
- {
+ public void setLive(final TransportConfiguration param) {
connector.setA(param);
}
@Override
- public String getNodeId()
- {
+ public String getNodeId() {
return nodeId;
}
@Override
- public long getUniqueEventID()
- {
+ public long getUniqueEventID() {
return uniqueEventID;
}
@Override
- public String getBackupGroupName()
- {
+ public String getBackupGroupName() {
return backupGroupName;
}
@Override
- public String getScaleDownGroupName()
- {
+ public String getScaleDownGroupName() {
return scaleDownGroupName;
}
/**
* @param uniqueEventID the uniqueEventID to set
*/
- public void setUniqueEventID(final long uniqueEventID)
- {
+ public void setUniqueEventID(final long uniqueEventID) {
this.uniqueEventID = uniqueEventID;
}
- public Pair<TransportConfiguration, TransportConfiguration> getConnector()
- {
+ public Pair<TransportConfiguration, TransportConfiguration> getConnector() {
return connector;
}
-
- public boolean isMember(RemotingConnection connection)
- {
+ public boolean isMember(RemotingConnection connection) {
TransportConfiguration connectorConfig = connection.getTransportConnection() != null ? connection.getTransportConnection().getConnectorConfig() : null;
return isMember(connectorConfig);
}
- public boolean isMember(TransportConfiguration configuration)
- {
- if (getConnector().getA() != null && getConnector().getA().equals(configuration) ||
- getConnector().getB() != null && getConnector().getB().equals(configuration))
- {
+ public boolean isMember(TransportConfiguration configuration) {
+ if (getConnector().getA() != null && getConnector().getA().equals(configuration) || getConnector().getB() != null && getConnector().getB().equals(configuration)) {
return true;
}
- else
- {
+ else {
return false;
}
}
-
@Override
- public String toString()
- {
+ public String toString() {
return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryEntry.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryEntry.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryEntry.java
index 3af57c5..59de1fd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryEntry.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryEntry.java
@@ -18,38 +18,32 @@ package org.apache.activemq.artemis.core.cluster;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
-public class DiscoveryEntry
-{
+public class DiscoveryEntry {
+
private final String nodeID;
private final TransportConfiguration connector;
private final long lastUpdate;
-
- public DiscoveryEntry(final String nodeID, final TransportConfiguration connector, final long lastUpdate)
- {
+ public DiscoveryEntry(final String nodeID, final TransportConfiguration connector, final long lastUpdate) {
this.nodeID = nodeID;
this.connector = connector;
this.lastUpdate = lastUpdate;
}
- public String getNodeID()
- {
+ public String getNodeID() {
return nodeID;
}
- public TransportConfiguration getConnector()
- {
+ public TransportConfiguration getConnector() {
return connector;
}
- public long getLastUpdate()
- {
+ public long getLastUpdate() {
return lastUpdate;
}
@Override
- public String toString()
- {
+ public String toString() {
return "DiscoveryEntry[nodeID=" + nodeID + ", connector=" + connector + ", lastUpdate=" + lastUpdate + "]";
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
index bb4ea6d..5e8fe77 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
@@ -44,10 +44,9 @@ import org.apache.activemq.artemis.utils.TypedProperties;
*
* We will probably keep both interfaces for a while as UDP is a simple solution requiring no extra dependencies which
* is suitable for users looking for embedded solutions.
- *
*/
-public final class DiscoveryGroup implements ActiveMQComponent
-{
+public final class DiscoveryGroup implements ActiveMQComponent {
+
private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
@@ -84,10 +83,11 @@ public final class DiscoveryGroup implements ActiveMQComponent
* @param service
* @throws Exception
*/
- public DiscoveryGroup(final String nodeID, final String name, final long timeout,
+ public DiscoveryGroup(final String nodeID,
+ final String name,
+ final long timeout,
BroadcastEndpointFactory endpointFactory,
- NotificationService service) throws Exception
- {
+ NotificationService service) throws Exception {
this.nodeID = nodeID;
this.name = name;
this.timeout = timeout;
@@ -95,10 +95,8 @@ public final class DiscoveryGroup implements ActiveMQComponent
this.notificationService = service;
}
- public synchronized void start() throws Exception
- {
- if (started)
- {
+ public synchronized void start() throws Exception {
+ if (started) {
return;
}
@@ -112,8 +110,7 @@ public final class DiscoveryGroup implements ActiveMQComponent
thread.start();
- if (notificationService != null)
- {
+ if (notificationService != null) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
@@ -128,113 +125,90 @@ public final class DiscoveryGroup implements ActiveMQComponent
* This will start the DiscoveryRunnable and run it directly.
* This is useful for a test process where we need this execution blocking a thread.
*/
- public void internalRunning() throws Exception
- {
+ public void internalRunning() throws Exception {
endpoint.openClient();
started = true;
DiscoveryRunnable runnable = new DiscoveryRunnable();
runnable.run();
}
- public void stop()
- {
- synchronized (this)
- {
- if (!started)
- {
+ public void stop() {
+ synchronized (this) {
+ if (!started) {
return;
}
started = false;
}
- synchronized (waitLock)
- {
+ synchronized (waitLock) {
waitLock.notifyAll();
}
- try
- {
+ try {
endpoint.close(false);
}
- catch (Exception e1)
- {
+ catch (Exception e1) {
ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1);
}
- try
- {
- if (thread != null)
- {
+ try {
+ if (thread != null) {
thread.interrupt();
thread.join(10000);
- if (thread.isAlive())
- {
+ if (thread.isAlive()) {
ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
}
}
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
thread = null;
- if (notificationService != null)
- {
+ if (notificationService != null) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props);
- try
- {
+ try {
notificationService.sendNotification(notification);
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e);
}
}
}
- public boolean isStarted()
- {
+ public boolean isStarted() {
return started;
}
- public String getName()
- {
+ public String getName() {
return name;
}
- public synchronized List<DiscoveryEntry> getDiscoveryEntries()
- {
+ public synchronized List<DiscoveryEntry> getDiscoveryEntries() {
List<DiscoveryEntry> list = new ArrayList<DiscoveryEntry>(connectors.values());
return list;
}
- public boolean waitForBroadcast(final long timeout)
- {
- synchronized (waitLock)
- {
+ public boolean waitForBroadcast(final long timeout) {
+ synchronized (waitLock) {
long start = System.currentTimeMillis();
long toWait = timeout;
- while (started && !received && (toWait > 0 || timeout == 0))
- {
- try
- {
+ while (started && !received && (toWait > 0 || timeout == 0)) {
+ try {
waitLock.wait(toWait);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
- if (timeout != 0)
- {
+ if (timeout != 0) {
long now = System.currentTimeMillis();
toWait -= now - start;
@@ -255,42 +229,32 @@ public final class DiscoveryGroup implements ActiveMQComponent
* This is a sanity check to catch any cases where two different nodes are broadcasting the same node id either
* due to misconfiguration or problems in failover
*/
- private void checkUniqueID(final String originatingNodeID, final String uniqueID)
- {
+ private void checkUniqueID(final String originatingNodeID, final String uniqueID) {
String currentUniqueID = uniqueIDMap.get(originatingNodeID);
- if (currentUniqueID == null)
- {
+ if (currentUniqueID == null) {
uniqueIDMap.put(originatingNodeID, uniqueID);
}
- else
- {
- if (!currentUniqueID.equals(uniqueID))
- {
+ else {
+ if (!currentUniqueID.equals(uniqueID)) {
ActiveMQClientLogger.LOGGER.multipleServersBroadcastingSameNode(originatingNodeID);
uniqueIDMap.put(originatingNodeID, uniqueID);
}
}
}
- class DiscoveryRunnable implements Runnable
- {
- public void run()
- {
+ class DiscoveryRunnable implements Runnable {
+
+ public void run() {
byte[] data = null;
- while (started)
- {
- try
- {
- try
- {
+ while (started) {
+ try {
+ try {
data = endpoint.receiveBroadcast();
- if (data == null)
- {
- if (started)
- {
+ if (data == null) {
+ if (started) {
// This is totally unexpected, so I'm not even bothering on creating
// a log entry for that
ActiveMQClientLogger.LOGGER.warn("Unexpected null data received from DiscoveryEndpoint");
@@ -298,14 +262,11 @@ public final class DiscoveryGroup implements ActiveMQComponent
break;
}
}
- catch (Exception e)
- {
- if (!started)
- {
+ catch (Exception e) {
+ if (!started) {
return;
}
- else
- {
+ else {
ActiveMQClientLogger.LOGGER.errorReceivingPAcketInDiscovery(e);
}
}
@@ -318,10 +279,8 @@ public final class DiscoveryGroup implements ActiveMQComponent
checkUniqueID(originatingNodeID, uniqueID);
- if (nodeID.equals(originatingNodeID))
- {
- if (checkExpiration())
- {
+ if (nodeID.equals(originatingNodeID)) {
+ if (checkExpiration()) {
callListeners();
}
// Ignore traffic from own node
@@ -334,8 +293,7 @@ public final class DiscoveryGroup implements ActiveMQComponent
DiscoveryEntry[] entriesRead = new DiscoveryEntry[size];
// Will first decode all the elements outside of any lock
- for (int i = 0; i < size; i++)
- {
+ for (int i = 0; i < size; i++) {
TransportConfiguration connector = new TransportConfiguration();
connector.decode(buffer);
@@ -343,12 +301,9 @@ public final class DiscoveryGroup implements ActiveMQComponent
entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
}
- synchronized (DiscoveryGroup.this)
- {
- for (DiscoveryEntry entry : entriesRead)
- {
- if (connectors.put(originatingNodeID, entry) == null)
- {
+ synchronized (DiscoveryGroup.this) {
+ for (DiscoveryEntry entry : entriesRead) {
+ if (connectors.put(originatingNodeID, entry) == null) {
changed = true;
}
}
@@ -357,28 +312,23 @@ public final class DiscoveryGroup implements ActiveMQComponent
}
//only call the listeners if we have changed
//also make sure that we aren't stopping to avoid deadlock
- if (changed && started)
- {
- if (isTrace)
- {
+ if (changed && started) {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Connectors changed on Discovery:");
- for (DiscoveryEntry connector : connectors.values())
- {
+ for (DiscoveryEntry connector : connectors.values()) {
ActiveMQClientLogger.LOGGER.trace(connector);
}
}
callListeners();
}
- synchronized (waitLock)
- {
+ synchronized (waitLock) {
received = true;
waitLock.notifyAll();
}
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);
}
}
@@ -386,39 +336,31 @@ public final class DiscoveryGroup implements ActiveMQComponent
}
- public synchronized void registerListener(final DiscoveryListener listener)
- {
+ public synchronized void registerListener(final DiscoveryListener listener) {
listeners.add(listener);
- if (!connectors.isEmpty())
- {
+ if (!connectors.isEmpty()) {
listener.connectorsChanged(getDiscoveryEntries());
}
}
- public synchronized void unregisterListener(final DiscoveryListener listener)
- {
+ public synchronized void unregisterListener(final DiscoveryListener listener) {
listeners.remove(listener);
}
- private void callListeners()
- {
- for (DiscoveryListener listener : listeners)
- {
- try
- {
+ private void callListeners() {
+ for (DiscoveryListener listener : listeners) {
+ try {
listener.connectorsChanged(getDiscoveryEntries());
}
- catch (Throwable t)
- {
+ catch (Throwable t) {
// Catch it so exception doesn't prevent other listeners from running
ActiveMQClientLogger.LOGGER.failedToCallListenerInDiscovery(t);
}
}
}
- private boolean checkExpiration()
- {
+ private boolean checkExpiration() {
boolean changed = false;
long now = System.currentTimeMillis();
@@ -426,14 +368,11 @@ public final class DiscoveryGroup implements ActiveMQComponent
// Weed out any expired connectors
- while (iter.hasNext())
- {
+ while (iter.hasNext()) {
Map.Entry<String, DiscoveryEntry> entry = iter.next();
- if (entry.getValue().getLastUpdate() + timeout <= now)
- {
- if (isTrace)
- {
+ if (entry.getValue().getLastUpdate() + timeout <= now) {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Timed out node on discovery:" + entry.getValue());
}
iter.remove();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryListener.java
index e37578c..fa86e61 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryListener.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryListener.java
@@ -21,7 +21,7 @@ import java.util.List;
/**
* To be called any time Discovery changes its list of nodes.
*/
-public interface DiscoveryListener
-{
+public interface DiscoveryListener {
+
void connectorsChanged(List<DiscoveryEntry> newConnectors);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/exception/ActiveMQXAException.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/exception/ActiveMQXAException.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/exception/ActiveMQXAException.java
index c66d4ee..e7283e0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/exception/ActiveMQXAException.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/exception/ActiveMQXAException.java
@@ -18,19 +18,17 @@ package org.apache.activemq.artemis.core.exception;
import javax.transaction.xa.XAException;
-public class ActiveMQXAException extends XAException
-{
+public class ActiveMQXAException extends XAException {
+
private static final long serialVersionUID = 6535914602965015803L;
- public ActiveMQXAException(final int errorCode, final String message)
- {
+ public ActiveMQXAException(final int errorCode, final String message) {
super(message);
this.errorCode = errorCode;
}
- public ActiveMQXAException(final int errorCode)
- {
+ public ActiveMQXAException(final int errorCode) {
super(errorCode);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
index bfc9cde..baafaac 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
@@ -26,8 +26,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
* <br>
* Used to send large streams over the wire
*/
-public interface BodyEncoder
-{
+public interface BodyEncoder {
+
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
index 4598201..eae2de3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
@@ -40,8 +40,8 @@ import org.apache.activemq.artemis.utils.UUID;
* <p>
* All messages handled by ActiveMQ Artemis core are of this type
*/
-public abstract class MessageImpl implements MessageInternal
-{
+public abstract class MessageImpl implements MessageInternal {
+
public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
public static final SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO");
@@ -92,8 +92,7 @@ public abstract class MessageImpl implements MessageInternal
// Constructors --------------------------------------------------
- protected MessageImpl()
- {
+ protected MessageImpl() {
properties = new TypedProperties();
}
@@ -112,8 +111,7 @@ public abstract class MessageImpl implements MessageInternal
final long expiration,
final long timestamp,
final byte priority,
- final int initialMessageBufferSize)
- {
+ final int initialMessageBufferSize) {
this();
this.type = type;
this.durable = durable;
@@ -123,8 +121,7 @@ public abstract class MessageImpl implements MessageInternal
createBody(initialMessageBufferSize);
}
- protected MessageImpl(final int initialMessageBufferSize)
- {
+ protected MessageImpl(final int initialMessageBufferSize) {
this();
createBody(initialMessageBufferSize);
}
@@ -132,16 +129,14 @@ public abstract class MessageImpl implements MessageInternal
/*
* Copy constructor
*/
- protected MessageImpl(final MessageImpl other)
- {
+ protected MessageImpl(final MessageImpl other) {
this(other, other.getProperties());
}
/*
* Copy constructor
*/
- protected MessageImpl(final MessageImpl other, TypedProperties properties)
- {
+ protected MessageImpl(final MessageImpl other, TypedProperties properties) {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -155,15 +150,13 @@ public abstract class MessageImpl implements MessageInternal
// This MUST be synchronized using the monitor on the other message to prevent it running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
// many subscriptions and bridging to other nodes in a cluster
- synchronized (other)
- {
+ synchronized (other) {
bufferValid = other.bufferValid;
endOfBodyPosition = other.endOfBodyPosition;
endOfMessagePosition = other.endOfMessagePosition;
copied = other.copied;
- if (other.buffer != null)
- {
+ if (other.buffer != null) {
other.bufferUsed = true;
// We need to copy the underlying buffer too, since the different messsages thereafter might have different
@@ -177,8 +170,7 @@ public abstract class MessageImpl implements MessageInternal
// Message implementation ----------------------------------------
- public int getEncodeSize()
- {
+ public int getEncodeSize() {
int headersPropsSize = getHeadersAndPropertiesEncodeSize();
int bodyPos = getEndOfBodyPosition();
@@ -188,8 +180,7 @@ public abstract class MessageImpl implements MessageInternal
return DataConstants.SIZE_INT + bodySize + DataConstants.SIZE_INT + headersPropsSize;
}
- public int getHeadersAndPropertiesEncodeSize()
- {
+ public int getHeadersAndPropertiesEncodeSize() {
return DataConstants.SIZE_LONG + // Message ID
DataConstants.SIZE_BYTE + // user id null?
(userID == null ? 0 : 16) +
@@ -202,17 +193,13 @@ public abstract class MessageImpl implements MessageInternal
/* PropertySize and Properties */properties.getEncodeSize();
}
-
- public void encodeHeadersAndProperties(final ActiveMQBuffer buffer)
- {
+ public void encodeHeadersAndProperties(final ActiveMQBuffer buffer) {
buffer.writeLong(messageID);
buffer.writeNullableSimpleString(address);
- if (userID == null)
- {
+ if (userID == null) {
buffer.writeByte(DataConstants.NULL);
}
- else
- {
+ else {
buffer.writeByte(DataConstants.NOT_NULL);
buffer.writeBytes(userID.asBytes());
}
@@ -224,18 +211,15 @@ public abstract class MessageImpl implements MessageInternal
properties.encode(buffer);
}
- public void decodeHeadersAndProperties(final ActiveMQBuffer buffer)
- {
+ public void decodeHeadersAndProperties(final ActiveMQBuffer buffer) {
messageID = buffer.readLong();
address = buffer.readNullableSimpleString();
- if (buffer.readByte() == DataConstants.NOT_NULL)
- {
+ if (buffer.readByte() == DataConstants.NOT_NULL) {
byte[] bytes = new byte[16];
buffer.readBytes(bytes);
userID = new UUID(UUID.TYPE_TIME_BASED, bytes);
}
- else
- {
+ else {
userID = null;
}
type = buffer.readByte();
@@ -246,8 +230,7 @@ public abstract class MessageImpl implements MessageInternal
properties.decode(buffer);
}
- public void copyHeadersAndProperties(final MessageInternal msg)
- {
+ public void copyHeadersAndProperties(final MessageInternal msg) {
messageID = msg.getMessageID();
address = msg.getAddress();
userID = msg.getUserID();
@@ -259,38 +242,31 @@ public abstract class MessageImpl implements MessageInternal
properties = msg.getTypedProperties();
}
- public ActiveMQBuffer getBodyBuffer()
- {
- if (bodyBuffer == null)
- {
+ public ActiveMQBuffer getBodyBuffer() {
+ if (bodyBuffer == null) {
bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
}
return bodyBuffer;
}
- public Message writeBodyBufferBytes(byte[] bytes)
- {
+ public Message writeBodyBufferBytes(byte[] bytes) {
getBodyBuffer().writeBytes(bytes);
return this;
}
- public Message writeBodyBufferString(String string)
- {
+ public Message writeBodyBufferString(String string) {
getBodyBuffer().writeString(string);
return this;
}
- public void checkCompletion() throws ActiveMQException
- {
+ public void checkCompletion() throws ActiveMQException {
// no op on regular messages
}
-
- public synchronized ActiveMQBuffer getBodyBufferCopy()
- {
+ public synchronized ActiveMQBuffer getBodyBufferCopy() {
// Must copy buffer before sending it
ActiveMQBuffer newBuffer = buffer.copy(0, buffer.capacity());
@@ -300,18 +276,15 @@ public abstract class MessageImpl implements MessageInternal
return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, newBuffer, null);
}
- public long getMessageID()
- {
+ public long getMessageID() {
return messageID;
}
- public UUID getUserID()
- {
+ public UUID getUserID() {
return userID;
}
- public MessageImpl setUserID(final UUID userID)
- {
+ public MessageImpl setUserID(final UUID userID) {
this.userID = userID;
return this;
}
@@ -320,8 +293,7 @@ public abstract class MessageImpl implements MessageInternal
* this doesn't need to be synchronized as setAddress is protecting the buffer,
* not the address
*/
- public SimpleString getAddress()
- {
+ public SimpleString getAddress() {
return address;
}
@@ -330,13 +302,10 @@ public abstract class MessageImpl implements MessageInternal
* This synchronization can probably be removed since setAddress is always called from a single thread.
* However I will keep it as it's harmless and it's been well tested
*/
- public Message setAddress(final SimpleString address)
- {
+ public Message setAddress(final SimpleString address) {
// This is protecting the buffer
- synchronized (this)
- {
- if (this.address != address)
- {
+ synchronized (this) {
+ if (this.address != address) {
this.address = address;
bufferValid = false;
@@ -346,25 +315,20 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public byte getType()
- {
+ public byte getType() {
return type;
}
- public void setType(byte type)
- {
+ public void setType(byte type) {
this.type = type;
}
- public boolean isDurable()
- {
+ public boolean isDurable() {
return durable;
}
- public MessageImpl setDurable(final boolean durable)
- {
- if (this.durable != durable)
- {
+ public MessageImpl setDurable(final boolean durable) {
+ if (this.durable != durable) {
this.durable = durable;
bufferValid = false;
@@ -372,15 +336,12 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public long getExpiration()
- {
+ public long getExpiration() {
return expiration;
}
- public MessageImpl setExpiration(final long expiration)
- {
- if (this.expiration != expiration)
- {
+ public MessageImpl setExpiration(final long expiration) {
+ if (this.expiration != expiration) {
this.expiration = expiration;
bufferValid = false;
@@ -388,15 +349,12 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public long getTimestamp()
- {
+ public long getTimestamp() {
return timestamp;
}
- public MessageImpl setTimestamp(final long timestamp)
- {
- if (this.timestamp != timestamp)
- {
+ public MessageImpl setTimestamp(final long timestamp) {
+ if (this.timestamp != timestamp) {
this.timestamp = timestamp;
bufferValid = false;
@@ -404,15 +362,12 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public byte getPriority()
- {
+ public byte getPriority() {
return priority;
}
- public MessageImpl setPriority(final byte priority)
- {
- if (this.priority != priority)
- {
+ public MessageImpl setPriority(final byte priority) {
+ if (this.priority != priority) {
this.priority = priority;
bufferValid = false;
@@ -420,23 +375,19 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public boolean isExpired()
- {
- if (expiration == 0)
- {
+ public boolean isExpired() {
+ if (expiration == 0) {
return false;
}
return System.currentTimeMillis() - expiration >= 0;
}
- public Map<String, Object> toMap()
- {
+ public Map<String, Object> toMap() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("messageID", messageID);
- if (userID != null)
- {
+ if (userID != null) {
map.put("userID", "ID:" + userID.toString());
}
map.put("address", address.toString());
@@ -445,22 +396,19 @@ public abstract class MessageImpl implements MessageInternal
map.put("expiration", expiration);
map.put("timestamp", timestamp);
map.put("priority", priority);
- for (SimpleString propName : properties.getPropertyNames())
- {
+ for (SimpleString propName : properties.getPropertyNames()) {
map.put(propName.toString(), properties.getProperty(propName));
}
return map;
}
- public void decodeFromBuffer(final ActiveMQBuffer buffer)
- {
+ public void decodeFromBuffer(final ActiveMQBuffer buffer) {
this.buffer = buffer;
decode();
}
- public void bodyChanged()
- {
+ public void bodyChanged() {
// If the body is changed we must copy the buffer otherwise can affect the previously sent message
// which might be in the Netty write queue
checkCopy();
@@ -470,46 +418,38 @@ public abstract class MessageImpl implements MessageInternal
endOfBodyPosition = -1;
}
- public synchronized void checkCopy()
- {
- if (!copied)
- {
+ public synchronized void checkCopy() {
+ if (!copied) {
forceCopy();
copied = true;
}
}
- public synchronized void resetCopied()
- {
+ public synchronized void resetCopied() {
copied = false;
}
- public int getEndOfMessagePosition()
- {
+ public int getEndOfMessagePosition() {
return endOfMessagePosition;
}
- public int getEndOfBodyPosition()
- {
- if (endOfBodyPosition < 0)
- {
+ public int getEndOfBodyPosition() {
+ if (endOfBodyPosition < 0) {
endOfBodyPosition = buffer.writerIndex();
}
return endOfBodyPosition;
}
// Encode to journal or paging
- public void encode(final ActiveMQBuffer buff)
- {
+ public void encode(final ActiveMQBuffer buff) {
encodeToBuffer();
buff.writeBytes(buffer, BUFFER_HEADER_SPACE, endOfMessagePosition - BUFFER_HEADER_SPACE);
}
// Decode from journal or paging
- public void decode(final ActiveMQBuffer buff)
- {
+ public void decode(final ActiveMQBuffer buff) {
int start = buff.readerIndex();
endOfBodyPosition = buff.readInt();
@@ -527,20 +467,17 @@ public abstract class MessageImpl implements MessageInternal
buff.readerIndex(start + length);
}
- public synchronized ActiveMQBuffer getEncodedBuffer()
- {
+ public synchronized ActiveMQBuffer getEncodedBuffer() {
ActiveMQBuffer buff = encodeToBuffer();
- if (bufferUsed)
- {
+ if (bufferUsed) {
ActiveMQBuffer copied = buff.copy(0, buff.capacity());
copied.setIndex(0, endOfMessagePosition);
return copied;
}
- else
- {
+ else {
buffer.setIndex(0, endOfMessagePosition);
bufferUsed = true;
@@ -549,17 +486,14 @@ public abstract class MessageImpl implements MessageInternal
}
}
- public void setAddressTransient(final SimpleString address)
- {
+ public void setAddressTransient(final SimpleString address) {
this.address = address;
}
-
// Properties
// ---------------------------------------------------------------------------------------
- public Message putBooleanProperty(final SimpleString key, final boolean value)
- {
+ public Message putBooleanProperty(final SimpleString key, final boolean value) {
properties.putBooleanProperty(key, value);
bufferValid = false;
@@ -567,8 +501,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putByteProperty(final SimpleString key, final byte value)
- {
+ public Message putByteProperty(final SimpleString key, final byte value) {
properties.putByteProperty(key, value);
bufferValid = false;
@@ -576,8 +509,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putBytesProperty(final SimpleString key, final byte[] value)
- {
+ public Message putBytesProperty(final SimpleString key, final byte[] value) {
properties.putBytesProperty(key, value);
bufferValid = false;
@@ -586,8 +518,7 @@ public abstract class MessageImpl implements MessageInternal
}
@Override
- public Message putCharProperty(SimpleString key, char value)
- {
+ public Message putCharProperty(SimpleString key, char value) {
properties.putCharProperty(key, value);
bufferValid = false;
@@ -595,40 +526,35 @@ public abstract class MessageImpl implements MessageInternal
}
@Override
- public Message putCharProperty(String key, char value)
- {
+ public Message putCharProperty(String key, char value) {
properties.putCharProperty(new SimpleString(key), value);
bufferValid = false;
return this;
}
- public Message putShortProperty(final SimpleString key, final short value)
- {
+ public Message putShortProperty(final SimpleString key, final short value) {
properties.putShortProperty(key, value);
bufferValid = false;
return this;
}
- public Message putIntProperty(final SimpleString key, final int value)
- {
+ public Message putIntProperty(final SimpleString key, final int value) {
properties.putIntProperty(key, value);
bufferValid = false;
return this;
}
- public Message putLongProperty(final SimpleString key, final long value)
- {
+ public Message putLongProperty(final SimpleString key, final long value) {
properties.putLongProperty(key, value);
bufferValid = false;
return this;
}
- public Message putFloatProperty(final SimpleString key, final float value)
- {
+ public Message putFloatProperty(final SimpleString key, final float value) {
properties.putFloatProperty(key, value);
bufferValid = false;
@@ -636,8 +562,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putDoubleProperty(final SimpleString key, final double value)
- {
+ public Message putDoubleProperty(final SimpleString key, final double value) {
properties.putDoubleProperty(key, value);
bufferValid = false;
@@ -645,8 +570,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putStringProperty(final SimpleString key, final SimpleString value)
- {
+ public Message putStringProperty(final SimpleString key, final SimpleString value) {
properties.putSimpleStringProperty(key, value);
bufferValid = false;
@@ -654,16 +578,15 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putObjectProperty(final SimpleString key, final Object value) throws ActiveMQPropertyConversionException
- {
+ public Message putObjectProperty(final SimpleString key,
+ final Object value) throws ActiveMQPropertyConversionException {
TypedProperties.setObjectProperty(key, value, properties);
bufferValid = false;
return this;
}
- public Message putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException
- {
+ public Message putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
putObjectProperty(new SimpleString(key), value);
bufferValid = false;
@@ -671,8 +594,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putBooleanProperty(final String key, final boolean value)
- {
+ public Message putBooleanProperty(final String key, final boolean value) {
properties.putBooleanProperty(new SimpleString(key), value);
bufferValid = false;
@@ -680,8 +602,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putByteProperty(final String key, final byte value)
- {
+ public Message putByteProperty(final String key, final byte value) {
properties.putByteProperty(new SimpleString(key), value);
bufferValid = false;
@@ -689,8 +610,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putBytesProperty(final String key, final byte[] value)
- {
+ public Message putBytesProperty(final String key, final byte[] value) {
properties.putBytesProperty(new SimpleString(key), value);
bufferValid = false;
@@ -698,8 +618,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putShortProperty(final String key, final short value)
- {
+ public Message putShortProperty(final String key, final short value) {
properties.putShortProperty(new SimpleString(key), value);
bufferValid = false;
@@ -707,8 +626,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putIntProperty(final String key, final int value)
- {
+ public Message putIntProperty(final String key, final int value) {
properties.putIntProperty(new SimpleString(key), value);
bufferValid = false;
@@ -716,8 +634,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putLongProperty(final String key, final long value)
- {
+ public Message putLongProperty(final String key, final long value) {
properties.putLongProperty(new SimpleString(key), value);
bufferValid = false;
@@ -725,8 +642,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putFloatProperty(final String key, final float value)
- {
+ public Message putFloatProperty(final String key, final float value) {
properties.putFloatProperty(new SimpleString(key), value);
bufferValid = false;
@@ -734,8 +650,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putDoubleProperty(final String key, final double value)
- {
+ public Message putDoubleProperty(final String key, final double value) {
properties.putDoubleProperty(new SimpleString(key), value);
bufferValid = false;
@@ -743,8 +658,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putStringProperty(final String key, final String value)
- {
+ public Message putStringProperty(final String key, final String value) {
properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
bufferValid = false;
@@ -752,8 +666,7 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Message putTypedProperties(final TypedProperties otherProps)
- {
+ public Message putTypedProperties(final TypedProperties otherProps) {
properties.putTypedProperties(otherProps);
bufferValid = false;
@@ -761,180 +674,145 @@ public abstract class MessageImpl implements MessageInternal
return this;
}
- public Object getObjectProperty(final SimpleString key)
- {
+ public Object getObjectProperty(final SimpleString key) {
return properties.getProperty(key);
}
- public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getBooleanProperty(key);
}
- public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
return properties.getBooleanProperty(new SimpleString(key));
}
- public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getByteProperty(key);
}
- public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
return properties.getByteProperty(new SimpleString(key));
}
- public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getBytesProperty(key);
}
- public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
return getBytesProperty(new SimpleString(key));
}
- public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getDoubleProperty(key);
}
- public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException {
return properties.getDoubleProperty(new SimpleString(key));
}
- public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getIntProperty(key);
}
- public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException {
return properties.getIntProperty(new SimpleString(key));
}
- public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getLongProperty(key);
}
- public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException {
return properties.getLongProperty(new SimpleString(key));
}
- public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getShortProperty(key);
}
- public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
return properties.getShortProperty(new SimpleString(key));
}
- public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getFloatProperty(key);
}
- public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
return properties.getFloatProperty(new SimpleString(key));
}
- public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
SimpleString str = getSimpleStringProperty(key);
- if (str == null)
- {
+ if (str == null) {
return null;
}
- else
- {
+ else {
return str.toString();
}
}
- public String getStringProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
return getStringProperty(new SimpleString(key));
}
- public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException
- {
+ public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
return properties.getSimpleStringProperty(key);
}
- public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException
- {
+ public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
return properties.getSimpleStringProperty(new SimpleString(key));
}
- public Object getObjectProperty(final String key)
- {
+ public Object getObjectProperty(final String key) {
return properties.getProperty(new SimpleString(key));
}
- public Object removeProperty(final SimpleString key)
- {
+ public Object removeProperty(final SimpleString key) {
bufferValid = false;
return properties.removeProperty(key);
}
- public Object removeProperty(final String key)
- {
+ public Object removeProperty(final String key) {
bufferValid = false;
return properties.removeProperty(new SimpleString(key));
}
- public boolean containsProperty(final SimpleString key)
- {
+ public boolean containsProperty(final SimpleString key) {
return properties.containsProperty(key);
}
- public boolean containsProperty(final String key)
- {
+ public boolean containsProperty(final String key) {
return properties.containsProperty(new SimpleString(key));
}
- public Set<SimpleString> getPropertyNames()
- {
+ public Set<SimpleString> getPropertyNames() {
return properties.getPropertyNames();
}
- public ActiveMQBuffer getWholeBuffer()
- {
+ public ActiveMQBuffer getWholeBuffer() {
return buffer;
}
- public BodyEncoder getBodyEncoder() throws ActiveMQException
- {
+ public BodyEncoder getBodyEncoder() throws ActiveMQException {
return new DecodingContext();
}
- public TypedProperties getTypedProperties()
- {
+ public TypedProperties getTypedProperties() {
return this.properties;
}
@Override
- public boolean equals(Object other)
- {
+ public boolean equals(Object other) {
- if (this == other)
- {
+ if (this == other) {
return true;
}
- if (other instanceof MessageImpl)
- {
+ if (other instanceof MessageImpl) {
MessageImpl message = (MessageImpl) other;
if (this.getMessageID() == message.getMessageID())
@@ -950,10 +828,10 @@ public abstract class MessageImpl implements MessageInternal
* I'm leaving this message here without any callers for a reason:
* During debugs it's important eventually to identify what's on the bodies, and this method will give you a good idea about them.
* Add the message.bodyToString() to the Watch variables on the debugger view and this will show up like a charm!!!
+ *
* @return
*/
- public String bodyToString()
- {
+ public String bodyToString() {
getEndOfBodyPosition();
int readerIndex1 = this.buffer.readerIndex();
buffer.readerIndex(0);
@@ -962,8 +840,7 @@ public abstract class MessageImpl implements MessageInternal
buffer.readerIndex(readerIndex1);
byte[] buffer2 = null;
- if (bodyBuffer != null)
- {
+ if (bodyBuffer != null) {
int readerIndex2 = this.bodyBuffer.readerIndex();
bodyBuffer.readerIndex(0);
buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()];
@@ -974,13 +851,9 @@ public abstract class MessageImpl implements MessageInternal
return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
}
-
-
-
@Override
- public int hashCode()
- {
- return 31 + (int)(messageID ^ (messageID >>> 32));
+ public int hashCode() {
+ return 31 + (int) (messageID ^ (messageID >>> 32));
}
// Public --------------------------------------------------------
@@ -991,20 +864,16 @@ public abstract class MessageImpl implements MessageInternal
// Private -------------------------------------------------------
- public TypedProperties getProperties()
- {
+ public TypedProperties getProperties() {
return properties;
}
// This must be synchronized as it can be called concurrently id the message is being delivered
// concurrently to
// many queues - the first caller in this case will actually encode it
- private synchronized ActiveMQBuffer encodeToBuffer()
- {
- if (!bufferValid)
- {
- if (bufferUsed)
- {
+ private synchronized ActiveMQBuffer encodeToBuffer() {
+ if (!bufferValid) {
+ if (bufferUsed) {
// Cannot use same buffer - must copy
forceCopy();
@@ -1021,13 +890,11 @@ public abstract class MessageImpl implements MessageInternal
// Position at end of body and skip past the message end position int.
// check for enough room in the buffer even though it is dynamic
- if ((bodySize + 4) > buffer.capacity())
- {
+ if ((bodySize + 4) > buffer.capacity()) {
buffer.setIndex(0, bodySize);
buffer.writeInt(0);
}
- else
- {
+ else {
buffer.setIndex(0, bodySize + DataConstants.SIZE_INT);
}
@@ -1045,8 +912,7 @@ public abstract class MessageImpl implements MessageInternal
return buffer;
}
- private void decode()
- {
+ private void decode() {
endOfBodyPosition = buffer.getInt(BUFFER_HEADER_SPACE);
buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
@@ -1058,8 +924,7 @@ public abstract class MessageImpl implements MessageInternal
bufferValid = true;
}
- public void createBody(final int initialMessageBufferSize)
- {
+ public void createBody(final int initialMessageBufferSize) {
buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize);
// There's a bug in netty which means a dynamic buffer won't resize until you write a byte
@@ -1068,16 +933,14 @@ public abstract class MessageImpl implements MessageInternal
buffer.setIndex(BODY_OFFSET, BODY_OFFSET);
}
- private void forceCopy()
- {
+ private void forceCopy() {
// Must copy buffer before sending it
buffer = buffer.copy(0, buffer.capacity());
buffer.setIndex(0, getEndOfBodyPosition());
- if (bodyBuffer != null)
- {
+ if (bodyBuffer != null) {
bodyBuffer.setBuffer(buffer);
}
@@ -1086,35 +949,29 @@ public abstract class MessageImpl implements MessageInternal
// Inner classes -------------------------------------------------
- private final class DecodingContext implements BodyEncoder
- {
+ private final class DecodingContext implements BodyEncoder {
+
private int lastPos = 0;
- public DecodingContext()
- {
+ public DecodingContext() {
}
- public void open()
- {
+ public void open() {
}
- public void close()
- {
+ public void close() {
}
- public long getLargeBodySize()
- {
+ public long getLargeBodySize() {
return buffer.writerIndex();
}
- public int encode(final ByteBuffer bufferRead) throws ActiveMQException
- {
+ public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead);
return encode(buffer, bufferRead.capacity());
}
- public int encode(final ActiveMQBuffer bufferOut, final int size)
- {
+ public int encode(final ActiveMQBuffer bufferOut, final int size) {
bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
lastPos += size;
return size;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
index 1606658..728f1c5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternal.java
@@ -25,8 +25,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.utils.TypedProperties;
-public interface MessageInternal extends Message
-{
+public interface MessageInternal extends Message {
+
void decodeFromBuffer(ActiveMQBuffer buffer);
int getEndOfMessagePosition();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
index 4b49a6e..a7930cb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java
@@ -27,14 +27,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
-public class ClientPacketDecoder extends PacketDecoder
-{
+public class ClientPacketDecoder extends PacketDecoder {
+
private static final long serialVersionUID = 6952614096979334582L;
public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
@Override
- public Packet decode(final ActiveMQBuffer in)
- {
+ public Packet decode(final ActiveMQBuffer in) {
final byte packetType = in.readByte();
Packet packet = decode(packetType);
@@ -45,24 +44,19 @@ public class ClientPacketDecoder extends PacketDecoder
}
@Override
- public Packet decode(byte packetType)
- {
+ public Packet decode(byte packetType) {
Packet packet;
- switch (packetType)
- {
- case SESS_RECEIVE_MSG:
- {
+ switch (packetType) {
+ case SESS_RECEIVE_MSG: {
packet = new SessionReceiveMessage(new ClientMessageImpl());
break;
}
- case SESS_RECEIVE_LARGE_MSG:
- {
+ case SESS_RECEIVE_LARGE_MSG: {
packet = new SessionReceiveClientLargeMessage(new ClientLargeMessageImpl());
break;
}
- default:
- {
+ default: {
packet = super.decode(packetType);
}
}