You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:27 UTC
[28/53] [abbrv] [partial] activemq-artemis git commit: automatic checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAJoinMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAJoinMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAJoinMessage.java
index 8b7844b..dde0dfb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAJoinMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAJoinMessage.java
@@ -22,42 +22,36 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.XidCodecSupport;
-public class SessionXAJoinMessage extends PacketImpl
-{
+public class SessionXAJoinMessage extends PacketImpl {
+
private Xid xid;
- public SessionXAJoinMessage(final Xid xid)
- {
+ public SessionXAJoinMessage(final Xid xid) {
super(SESS_XA_JOIN);
this.xid = xid;
}
- public SessionXAJoinMessage()
- {
+ public SessionXAJoinMessage() {
super(SESS_XA_JOIN);
}
- public Xid getXid()
- {
+ public Xid getXid() {
return xid;
}
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
xid = XidCodecSupport.decodeXid(buffer);
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((xid == null) ? 0 : xid.hashCode());
@@ -65,17 +59,15 @@ public class SessionXAJoinMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionXAJoinMessage))
return false;
- SessionXAJoinMessage other = (SessionXAJoinMessage)obj;
- if (xid == null)
- {
+ SessionXAJoinMessage other = (SessionXAJoinMessage) obj;
+ if (xid == null) {
if (other.xid != null)
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
index 2da03ba..948c6db 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
@@ -22,44 +22,36 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.XidCodecSupport;
-public class SessionXAPrepareMessage extends PacketImpl
-{
+public class SessionXAPrepareMessage extends PacketImpl {
private Xid xid;
- public SessionXAPrepareMessage(final Xid xid)
- {
+ public SessionXAPrepareMessage(final Xid xid) {
super(SESS_XA_PREPARE);
this.xid = xid;
}
- public SessionXAPrepareMessage()
- {
+ public SessionXAPrepareMessage() {
super(SESS_XA_PREPARE);
}
-
- public Xid getXid()
- {
+ public Xid getXid() {
return xid;
}
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
xid = XidCodecSupport.decodeXid(buffer);
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((xid == null) ? 0 : xid.hashCode());
@@ -67,17 +59,15 @@ public class SessionXAPrepareMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionXAPrepareMessage))
return false;
- SessionXAPrepareMessage other = (SessionXAPrepareMessage)obj;
- if (xid == null)
- {
+ SessionXAPrepareMessage other = (SessionXAPrepareMessage) obj;
+ if (xid == null) {
if (other.xid != null)
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
index 05906ef..d19035b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java
@@ -19,16 +19,15 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-public class SessionXAResponseMessage extends PacketImpl
-{
+public class SessionXAResponseMessage extends PacketImpl {
+
private boolean error;
private int responseCode;
private String message;
- public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message)
- {
+ public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) {
super(SESS_XA_RESP);
error = isError;
@@ -38,53 +37,45 @@ public class SessionXAResponseMessage extends PacketImpl
this.message = message;
}
- public SessionXAResponseMessage()
- {
+ public SessionXAResponseMessage() {
super(SESS_XA_RESP);
}
// Public --------------------------------------------------------
@Override
- public boolean isResponse()
- {
+ public boolean isResponse() {
return true;
}
- public boolean isError()
- {
+ public boolean isError() {
return error;
}
- public int getResponseCode()
- {
+ public int getResponseCode() {
return responseCode;
}
- public String getMessage()
- {
+ public String getMessage() {
return message;
}
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeBoolean(error);
buffer.writeInt(responseCode);
buffer.writeNullableString(message);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
error = buffer.readBoolean();
responseCode = buffer.readInt();
message = buffer.readNullableString();
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (error ? 1231 : 1237);
@@ -94,19 +85,17 @@ public class SessionXAResponseMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionXAResponseMessage))
return false;
- SessionXAResponseMessage other = (SessionXAResponseMessage)obj;
+ SessionXAResponseMessage other = (SessionXAResponseMessage) obj;
if (error != other.error)
return false;
- if (message == null)
- {
+ if (message == null) {
if (other.message != null)
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java
index 6055f8f..dbb6eb0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java
@@ -22,45 +22,38 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.XidCodecSupport;
-public class SessionXAResumeMessage extends PacketImpl
-{
+public class SessionXAResumeMessage extends PacketImpl {
private Xid xid;
- public SessionXAResumeMessage(final Xid xid)
- {
+ public SessionXAResumeMessage(final Xid xid) {
super(SESS_XA_RESUME);
this.xid = xid;
}
- public SessionXAResumeMessage()
- {
+ public SessionXAResumeMessage() {
super(SESS_XA_RESUME);
}
// Public --------------------------------------------------------
- public Xid getXid()
- {
+ public Xid getXid() {
return xid;
}
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
xid = XidCodecSupport.decodeXid(buffer);
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((xid == null) ? 0 : xid.hashCode());
@@ -68,17 +61,15 @@ public class SessionXAResumeMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionXAResumeMessage))
return false;
- SessionXAResumeMessage other = (SessionXAResumeMessage)obj;
- if (xid == null)
- {
+ SessionXAResumeMessage other = (SessionXAResumeMessage) obj;
+ if (xid == null) {
if (other.xid != null)
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
index ed40eb0..e7e337f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
@@ -22,45 +22,38 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.XidCodecSupport;
-public class SessionXARollbackMessage extends PacketImpl
-{
+public class SessionXARollbackMessage extends PacketImpl {
+
private Xid xid;
- public SessionXARollbackMessage(final Xid xid)
- {
+ public SessionXARollbackMessage(final Xid xid) {
super(SESS_XA_ROLLBACK);
this.xid = xid;
}
- public SessionXARollbackMessage()
- {
+ public SessionXARollbackMessage() {
super(SESS_XA_ROLLBACK);
}
// Public --------------------------------------------------------
- public Xid getXid()
- {
+ public Xid getXid() {
return xid;
}
-
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
xid = XidCodecSupport.decodeXid(buffer);
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((xid == null) ? 0 : xid.hashCode());
@@ -68,17 +61,15 @@ public class SessionXARollbackMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionXARollbackMessage))
return false;
- SessionXARollbackMessage other = (SessionXARollbackMessage)obj;
- if (xid == null)
- {
+ SessionXARollbackMessage other = (SessionXARollbackMessage) obj;
+ if (xid == null) {
if (other.xid != null)
return false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java
index 5e1f063..ff5d72e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java
@@ -19,44 +19,38 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-public class SessionXASetTimeoutMessage extends PacketImpl
-{
+public class SessionXASetTimeoutMessage extends PacketImpl {
+
private int timeoutSeconds;
- public SessionXASetTimeoutMessage(final int timeoutSeconds)
- {
+ public SessionXASetTimeoutMessage(final int timeoutSeconds) {
super(SESS_XA_SET_TIMEOUT);
this.timeoutSeconds = timeoutSeconds;
}
- public SessionXASetTimeoutMessage()
- {
+ public SessionXASetTimeoutMessage() {
super(SESS_XA_SET_TIMEOUT);
}
// Public --------------------------------------------------------
- public int getTimeoutSeconds()
- {
+ public int getTimeoutSeconds() {
return timeoutSeconds;
}
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(timeoutSeconds);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
timeoutSeconds = buffer.readInt();
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + timeoutSeconds;
@@ -64,15 +58,14 @@ public class SessionXASetTimeoutMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionXASetTimeoutMessage))
return false;
- SessionXASetTimeoutMessage other = (SessionXASetTimeoutMessage)obj;
+ SessionXASetTimeoutMessage other = (SessionXASetTimeoutMessage) obj;
if (timeoutSeconds != other.timeoutSeconds)
return false;
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java
index f9d90a7..cc84c43 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java
@@ -19,50 +19,43 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-public class SessionXASetTimeoutResponseMessage extends PacketImpl
-{
+public class SessionXASetTimeoutResponseMessage extends PacketImpl {
+
private boolean ok;
- public SessionXASetTimeoutResponseMessage(final boolean ok)
- {
+ public SessionXASetTimeoutResponseMessage(final boolean ok) {
super(SESS_XA_SET_TIMEOUT_RESP);
this.ok = ok;
}
- public SessionXASetTimeoutResponseMessage()
- {
+ public SessionXASetTimeoutResponseMessage() {
super(SESS_XA_SET_TIMEOUT_RESP);
}
// Public --------------------------------------------------------
@Override
- public boolean isResponse()
- {
+ public boolean isResponse() {
return true;
}
- public boolean isOK()
- {
+ public boolean isOK() {
return ok;
}
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeBoolean(ok);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
ok = buffer.readBoolean();
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (ok ? 1231 : 1237);
@@ -70,15 +63,14 @@ public class SessionXASetTimeoutResponseMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionXASetTimeoutResponseMessage))
return false;
- SessionXASetTimeoutResponseMessage other = (SessionXASetTimeoutResponseMessage)obj;
+ SessionXASetTimeoutResponseMessage other = (SessionXASetTimeoutResponseMessage) obj;
if (ok != other.ok)
return false;
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAStartMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAStartMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAStartMessage.java
index 2d38a03..68b37c3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAStartMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAStartMessage.java
@@ -22,8 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.XidCodecSupport;
-public class SessionXAStartMessage extends PacketImpl
-{
+public class SessionXAStartMessage extends PacketImpl {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -34,40 +33,34 @@ public class SessionXAStartMessage extends PacketImpl
// Constructors --------------------------------------------------
- public SessionXAStartMessage(final Xid xid)
- {
+ public SessionXAStartMessage(final Xid xid) {
super(SESS_XA_START);
this.xid = xid;
}
- public SessionXAStartMessage()
- {
+ public SessionXAStartMessage() {
super(SESS_XA_START);
}
// Public --------------------------------------------------------
- public Xid getXid()
- {
+ public Xid getXid() {
return xid;
}
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
xid = XidCodecSupport.decodeXid(buffer);
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((xid == null) ? 0 : xid.hashCode());
@@ -75,17 +68,15 @@ public class SessionXAStartMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionXAStartMessage))
return false;
- SessionXAStartMessage other = (SessionXAStartMessage)obj;
- if (xid == null)
- {
+ SessionXAStartMessage other = (SessionXAStartMessage) obj;
+ if (xid == null) {
if (other.xid != null)
return false;
}
@@ -94,5 +85,4 @@ public class SessionXAStartMessage extends PacketImpl
return true;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
index 4e3401a..1b8abc9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
@@ -19,66 +19,56 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-public class SubscribeClusterTopologyUpdatesMessage extends PacketImpl
-{
+public class SubscribeClusterTopologyUpdatesMessage extends PacketImpl {
private boolean clusterConnection;
- public SubscribeClusterTopologyUpdatesMessage(final boolean clusterConnection)
- {
+ public SubscribeClusterTopologyUpdatesMessage(final boolean clusterConnection) {
super(SUBSCRIBE_TOPOLOGY);
this.clusterConnection = clusterConnection;
}
- protected SubscribeClusterTopologyUpdatesMessage(byte packetType, final boolean clusterConnection)
- {
+ protected SubscribeClusterTopologyUpdatesMessage(byte packetType, final boolean clusterConnection) {
super(packetType);
this.clusterConnection = clusterConnection;
}
- public SubscribeClusterTopologyUpdatesMessage()
- {
+ public SubscribeClusterTopologyUpdatesMessage() {
super(SUBSCRIBE_TOPOLOGY);
}
- protected SubscribeClusterTopologyUpdatesMessage(byte packetType)
- {
+ protected SubscribeClusterTopologyUpdatesMessage(byte packetType) {
super(packetType);
}
// Public --------------------------------------------------------
- public boolean isClusterConnection()
- {
+ public boolean isClusterConnection() {
return clusterConnection;
}
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeBoolean(clusterConnection);
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
clusterConnection = buffer.readBoolean();
}
@Override
- public String toString()
- {
+ public String toString() {
return "SubscribeClusterTopologyUpdatesMessage [clusterConnection=" + clusterConnection +
- ", toString()=" +
- super.toString() +
- "]";
+ ", toString()=" +
+ super.toString() +
+ "]";
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (clusterConnection ? 1231 : 1237);
@@ -86,15 +76,14 @@ public class SubscribeClusterTopologyUpdatesMessage extends PacketImpl
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SubscribeClusterTopologyUpdatesMessage))
return false;
- SubscribeClusterTopologyUpdatesMessage other = (SubscribeClusterTopologyUpdatesMessage)obj;
+ SubscribeClusterTopologyUpdatesMessage other = (SubscribeClusterTopologyUpdatesMessage) obj;
if (clusterConnection != other.clusterConnection)
return false;
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
index f380a3d..2be7401 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java
@@ -18,30 +18,24 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTopologyUpdatesMessage
-{
+public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTopologyUpdatesMessage {
private int clientVersion;
- public SubscribeClusterTopologyUpdatesMessageV2(final boolean clusterConnection, int clientVersion)
- {
+ public SubscribeClusterTopologyUpdatesMessageV2(final boolean clusterConnection, int clientVersion) {
super(SUBSCRIBE_TOPOLOGY_V2, clusterConnection);
this.clientVersion = clientVersion;
}
- public SubscribeClusterTopologyUpdatesMessageV2()
- {
+ public SubscribeClusterTopologyUpdatesMessageV2() {
super(SUBSCRIBE_TOPOLOGY_V2);
}
// Public --------------------------------------------------------
-
-
@Override
- public void encodeRest(final ActiveMQBuffer buffer)
- {
+ public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeInt(clientVersion);
}
@@ -49,21 +43,18 @@ public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTo
/**
* @return the clientVersion
*/
- public int getClientVersion()
- {
+ public int getClientVersion() {
return clientVersion;
}
@Override
- public void decodeRest(final ActiveMQBuffer buffer)
- {
+ public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
clientVersion = buffer.readInt();
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + clientVersion;
@@ -71,15 +62,14 @@ public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTo
}
@Override
- public boolean equals(Object obj)
- {
+ public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SubscribeClusterTopologyUpdatesMessageV2))
return false;
- SubscribeClusterTopologyUpdatesMessageV2 other = (SubscribeClusterTopologyUpdatesMessageV2)obj;
+ SubscribeClusterTopologyUpdatesMessageV2 other = (SubscribeClusterTopologyUpdatesMessageV2) obj;
if (clientVersion != other.clientVersion)
return false;
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CloseListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CloseListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CloseListener.java
index 48ec74e..12c8f7a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CloseListener.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CloseListener.java
@@ -21,8 +21,8 @@ package org.apache.activemq.artemis.core.remoting;
* <p>
* {@link org.apache.activemq.artemis.spi.core.protocol.RemotingConnection#addCloseListener(CloseListener)}
*/
-public interface CloseListener
-{
+public interface CloseListener {
+
/**
* called when the connection is closed
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/FailureListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/FailureListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/FailureListener.java
index 2417dd1..05e185b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/FailureListener.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/FailureListener.java
@@ -21,12 +21,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
/**
* A FailureListener notifies the user when a connection failure occurred.
*/
-public interface FailureListener
-{
+public interface FailureListener {
+
/**
* Notifies that a connection has failed due to the specified exception.
*
- * @param exception exception which has caused the connection to fail
+ * @param exception exception which has caused the connection to fail
* @param failedOver
*/
void connectionFailed(ActiveMQException exception, boolean failedOver);
@@ -34,7 +34,7 @@ public interface FailureListener
/**
* Notifies that a connection has failed due to the specified exception.
*
- * @param exception exception which has caused the connection to fail
+ * @param exception exception which has caused the connection to fail
* @param failedOver
* @param scaleDownTargetNodeID the ID of the node to which messages are scaling down
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java
index 6d704ad..721290a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/TransportConfigurationUtil.java
@@ -29,31 +29,26 @@ import org.apache.activemq.artemis.utils.ClassloadingUtil;
* objects.
*/
-public class TransportConfigurationUtil
-{
+public class TransportConfigurationUtil {
+
private static final Map<String, Map<String, Object>> DEFAULTS = new HashMap<>();
private static final HashMap<String, Object> EMPTY_HELPER = new HashMap<>();
- public static Map<String, Object> getDefaults(String className)
- {
- if (className == null)
- {
+ public static Map<String, Object> getDefaults(String className) {
+ if (className == null) {
/* Returns a new clone of the empty helper. This allows any parent objects to update the map key/values
without polluting the EMPTY_HELPER map. */
return (Map<String, Object>) EMPTY_HELPER.clone();
}
- if (!DEFAULTS.containsKey(className))
- {
+ if (!DEFAULTS.containsKey(className)) {
Object object = instantiateObject(className);
- if (object != null && object instanceof TransportConfigurationHelper)
- {
+ if (object != null && object instanceof TransportConfigurationHelper) {
DEFAULTS.put(className, ((TransportConfigurationHelper) object).getDefaults());
}
- else
- {
+ else {
DEFAULTS.put(className, EMPTY_HELPER);
}
}
@@ -63,29 +58,22 @@ public class TransportConfigurationUtil
return cloneDefaults(DEFAULTS.get(className));
}
- private static Object instantiateObject(final String className)
- {
- return AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- public Object run()
- {
- try
- {
+ private static Object instantiateObject(final String className) {
+ return AccessController.doPrivileged(new PrivilegedAction<Object>() {
+ public Object run() {
+ try {
return ClassloadingUtil.newInstanceFromClassLoader(className);
}
- catch (IllegalStateException e)
- {
+ catch (IllegalStateException e) {
return null;
}
}
});
}
- private static Map<String, Object> cloneDefaults(Map<String, Object> defaults)
- {
+ private static Map<String, Object> cloneDefaults(Map<String, Object> defaults) {
Map<String, Object> cloned = new HashMap<String, Object>();
- for (Map.Entry entry : defaults.entrySet())
- {
+ for (Map.Entry entry : defaults.entrySet()) {
cloned.put((String) entry.getKey(), entry.getValue());
}
return cloned;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQAMQPFrameDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQAMQPFrameDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQAMQPFrameDecoder.java
index 5d9dae1..b3e444c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQAMQPFrameDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQAMQPFrameDecoder.java
@@ -23,18 +23,15 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* A Netty decoder specially optimised to decode messages on the core protocol only
*/
-public class ActiveMQAMQPFrameDecoder extends LengthFieldBasedFrameDecoder
-{
- public ActiveMQAMQPFrameDecoder()
- {
+public class ActiveMQAMQPFrameDecoder extends LengthFieldBasedFrameDecoder {
+
+ public ActiveMQAMQPFrameDecoder() {
// The interface itself is part of the buffer (hence the -4)
- super(Integer.MAX_VALUE, 0, 4, -4 , 0);
+ super(Integer.MAX_VALUE, 0, 4, -4, 0);
}
-
@Override
- protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length)
- {
+ protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
return super.extractFrame(ctx, buffer, index, length);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
index 009d1c4..d2233d1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
@@ -28,12 +28,11 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
-
/**
* Common handler implementation for client and server side handler.
*/
-public class ActiveMQChannelHandler extends ChannelDuplexHandler
-{
+public class ActiveMQChannelHandler extends ChannelDuplexHandler {
+
private final ChannelGroup group;
private final BufferHandler handler;
@@ -44,42 +43,35 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler
protected ActiveMQChannelHandler(final ChannelGroup group,
final BufferHandler handler,
- final ConnectionLifeCycleListener listener)
- {
+ final ConnectionLifeCycleListener listener) {
this.group = group;
this.handler = handler;
this.listener = listener;
}
@Override
- public void channelActive(final ChannelHandlerContext ctx) throws Exception
- {
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
group.add(ctx.channel());
ctx.fireChannelActive();
}
@Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
- {
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// TODO: Think about the id thingy
listener.connectionReadyForWrites(channelId(ctx.channel()), ctx.channel().isWritable());
}
@Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
- {
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
}
@Override
- public void channelInactive(final ChannelHandlerContext ctx) throws Exception
- {
- synchronized (this)
- {
- if (active)
- {
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ synchronized (this) {
+ if (active) {
listener.connectionDestroyed(channelId(ctx.channel()));
active = false;
@@ -88,10 +80,8 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler
}
@Override
- public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception
- {
- if (!active)
- {
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+ if (!active) {
return;
}
// We don't want to log this - since it is normal for this to happen during failover/reconnect
@@ -101,22 +91,18 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler
ActiveMQException me = ActiveMQClientMessageBundle.BUNDLE.nettyError();
me.initCause(cause);
- synchronized (listener)
- {
- try
- {
+ synchronized (listener) {
+ try {
listener.connectionException(channelId(ctx.channel()), me);
active = false;
}
- catch (Exception ex)
- {
+ catch (Exception ex) {
ActiveMQClientLogger.LOGGER.errorCallingLifeCycleListener(ex);
}
}
}
- protected static int channelId(Channel channel)
- {
+ protected static int channelId(Channel channel) {
return channel.hashCode();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQFrameDecoder2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQFrameDecoder2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQFrameDecoder2.java
index 8feb655..60af9cf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQFrameDecoder2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQFrameDecoder2.java
@@ -24,16 +24,14 @@ import org.apache.activemq.artemis.utils.DataConstants;
/**
* A Netty decoder specially optimised to decode messages on the core protocol only
*/
-public class ActiveMQFrameDecoder2 extends LengthFieldBasedFrameDecoder
-{
- public ActiveMQFrameDecoder2()
- {
+public class ActiveMQFrameDecoder2 extends LengthFieldBasedFrameDecoder {
+
+ public ActiveMQFrameDecoder2() {
super(Integer.MAX_VALUE, 0, DataConstants.SIZE_INT);
}
@Override
- protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length)
- {
+ protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
// This is a work around on https://github.com/netty/netty/commit/55fbf007f04fbba7bf50028f3c8b35d6c5ea5947
// Right now we need a copy when sending a message on the server otherwise messages won't be resent to the client
ByteBuf frame = ctx.alloc().buffer(length);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 631259f..f6fe267 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -41,8 +41,8 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
-public class NettyConnection implements Connection
-{
+public class NettyConnection implements Connection {
+
// Constants -----------------------------------------------------
private static final int BATCHING_BUFFER_SIZE = 8192;
@@ -68,7 +68,7 @@ public class NettyConnection implements Connection
private RemotingConnection protocolConnection;
-// Static --------------------------------------------------------
+ // Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -76,8 +76,7 @@ public class NettyConnection implements Connection
final Channel channel,
final ConnectionLifeCycleListener listener,
boolean batchingEnabled,
- boolean directDeliver)
- {
+ boolean directDeliver) {
this.configuration = configuration;
this.channel = channel;
@@ -91,53 +90,41 @@ public class NettyConnection implements Connection
// Public --------------------------------------------------------
- public Channel getNettyChannel()
- {
+ public Channel getNettyChannel() {
return channel;
}
// Connection implementation ----------------------------
-
- public void forceClose()
- {
- if (channel != null)
- {
- try
- {
+ public void forceClose() {
+ if (channel != null) {
+ try {
channel.close();
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQClientLogger.LOGGER.warn(e.getMessage(), e);
}
}
}
-
/**
* This is exposed so users would have the option to look at any data through interceptors
*
* @return
*/
- public Channel getChannel()
- {
+ public Channel getChannel() {
return channel;
}
- public RemotingConnection getProtocolConnection()
- {
+ public RemotingConnection getProtocolConnection() {
return protocolConnection;
}
- public void setProtocolConnection(RemotingConnection protocolConnection)
- {
+ public void setProtocolConnection(RemotingConnection protocolConnection) {
this.protocolConnection = protocolConnection;
}
- public void close()
- {
- if (closed)
- {
+ public void close() {
+ if (closed) {
return;
}
@@ -145,17 +132,13 @@ public class NettyConnection implements Connection
EventLoop eventLoop = channel.eventLoop();
boolean inEventLoop = eventLoop.inEventLoop();
//if we are in an event loop we need to close the channel after the writes have finished
- if (!inEventLoop)
- {
+ if (!inEventLoop) {
closeSSLAndChannel(sslHandler, channel);
}
- else
- {
- eventLoop.execute(new Runnable()
- {
+ else {
+ eventLoop.execute(new Runnable() {
@Override
- public void run()
- {
+ public void run() {
closeSSLAndChannel(sslHandler, channel);
}
});
@@ -166,90 +149,74 @@ public class NettyConnection implements Connection
listener.connectionDestroyed(getID());
}
- public ActiveMQBuffer createTransportBuffer(final int size)
- {
+ public ActiveMQBuffer createTransportBuffer(final int size) {
return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
}
- public Object getID()
- {
+ public Object getID() {
// TODO: Think of it
return channel.hashCode();
}
// This is called periodically to flush the batch buffer
- public void checkFlushBatchBuffer()
- {
- if (!batchingEnabled)
- {
+ public void checkFlushBatchBuffer() {
+ if (!batchingEnabled) {
return;
}
- if (writeLock.tryAcquire())
- {
- try
- {
- if (batchBuffer != null && batchBuffer.readable())
- {
+ if (writeLock.tryAcquire()) {
+ try {
+ if (batchBuffer != null && batchBuffer.readable()) {
channel.writeAndFlush(batchBuffer.byteBuf());
batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
}
}
- finally
- {
+ finally {
writeLock.release();
}
}
}
- public void write(final ActiveMQBuffer buffer)
- {
+ public void write(final ActiveMQBuffer buffer) {
write(buffer, false, false);
}
- public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched)
- {
+ public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
write(buffer, flush, batched, null);
}
- public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched, final ChannelFutureListener futureListener)
- {
+ public void write(ActiveMQBuffer buffer,
+ final boolean flush,
+ final boolean batched,
+ final ChannelFutureListener futureListener) {
- try
- {
+ try {
writeLock.acquire();
- try
- {
- if (batchBuffer == null && batchingEnabled && batched && !flush)
- {
+ try {
+ if (batchBuffer == null && batchingEnabled && batched && !flush) {
// Lazily create batch buffer
batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
}
- if (batchBuffer != null)
- {
+ if (batchBuffer != null) {
batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
- if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush)
- {
+ if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush) {
// If the batch buffer is full or it's flush param or not batched then flush the buffer
buffer = batchBuffer;
}
- else
- {
+ else {
return;
}
- if (!batched || flush)
- {
+ if (!batched || flush) {
batchBuffer = null;
}
- else
- {
+ else {
// Create a new buffer
batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
@@ -260,44 +227,34 @@ public class NettyConnection implements Connection
// use a normal promise
final ByteBuf buf = buffer.byteBuf();
final ChannelPromise promise;
- if (flush || futureListener != null)
- {
+ if (flush || futureListener != null) {
promise = channel.newPromise();
}
- else
- {
+ else {
promise = channel.voidPromise();
}
EventLoop eventLoop = channel.eventLoop();
boolean inEventLoop = eventLoop.inEventLoop();
- if (!inEventLoop)
- {
- if (futureListener != null)
- {
+ if (!inEventLoop) {
+ if (futureListener != null) {
channel.writeAndFlush(buf, promise).addListener(futureListener);
}
- else
- {
+ else {
channel.writeAndFlush(buf, promise);
}
}
- else
- {
+ else {
// create a task which will be picked up by the eventloop and trigger the write.
// This is mainly needed as this method is triggered by different threads for the same channel.
// if we do not do this we may produce out of order writes.
- final Runnable task = new Runnable()
- {
+ final Runnable task = new Runnable() {
@Override
- public void run()
- {
- if (futureListener != null)
- {
+ public void run() {
+ if (futureListener != null) {
channel.writeAndFlush(buf, promise).addListener(futureListener);
}
- else
- {
+ else {
channel.writeAndFlush(buf, promise);
}
}
@@ -306,106 +263,83 @@ public class NettyConnection implements Connection
eventLoop.execute(task);
}
-
// only try to wait if not in the eventloop otherwise we will produce a deadlock
- if (flush && !inEventLoop)
- {
- while (true)
- {
- try
- {
+ if (flush && !inEventLoop) {
+ while (true) {
+ try {
boolean ok = promise.await(10000);
- if (!ok)
- {
+ if (!ok) {
ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
}
break;
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
}
}
- finally
- {
+ finally {
writeLock.release();
}
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
- public String getRemoteAddress()
- {
+ public String getRemoteAddress() {
SocketAddress address = channel.remoteAddress();
- if (address == null)
- {
+ if (address == null) {
return null;
}
return address.toString();
}
- public boolean isDirectDeliver()
- {
+ public boolean isDirectDeliver() {
return directDeliver;
}
- public void addReadyListener(final ReadyListener listener)
- {
+ public void addReadyListener(final ReadyListener listener) {
readyListeners.add(listener);
}
- public void removeReadyListener(final ReadyListener listener)
- {
+ public void removeReadyListener(final ReadyListener listener) {
readyListeners.remove(listener);
}
//never allow this
- public ActiveMQPrincipal getDefaultActiveMQPrincipal()
- {
+ public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null;
}
- void fireReady(final boolean ready)
- {
- for (ReadyListener listener : readyListeners)
- {
+ void fireReady(final boolean ready) {
+ for (ReadyListener listener : readyListeners) {
listener.readyForWriting(ready);
}
}
-
@Override
- public TransportConfiguration getConnectorConfig()
- {
- if (configuration != null)
- {
+ public TransportConfiguration getConnectorConfig() {
+ if (configuration != null) {
return new TransportConfiguration(NettyConnectorFactory.class.getName(), this.configuration);
}
- else
- {
+ else {
return null;
}
}
@Override
- public boolean isUsingProtocolHandling()
- {
+ public boolean isUsingProtocolHandling() {
return true;
}
-
// Public --------------------------------------------------------
@Override
- public String toString()
- {
+ public String toString() {
return super.toString() + "[local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";
}
@@ -415,29 +349,22 @@ public class NettyConnection implements Connection
// Private -------------------------------------------------------
-
- private void closeSSLAndChannel(SslHandler sslHandler, Channel channel)
- {
- if (sslHandler != null)
- {
- try
- {
+ private void closeSSLAndChannel(SslHandler sslHandler, Channel channel) {
+ if (sslHandler != null) {
+ try {
ChannelFuture sslCloseFuture = sslHandler.close();
- if (!sslCloseFuture.awaitUninterruptibly(10000))
- {
+ if (!sslCloseFuture.awaitUninterruptibly(10000)) {
ActiveMQClientLogger.LOGGER.timeoutClosingSSL();
}
}
- catch (Throwable t)
- {
+ catch (Throwable t) {
// ignore
}
}
ChannelFuture closeFuture = channel.close();
- if (!closeFuture.awaitUninterruptibly(10000))
- {
+ if (!closeFuture.awaitUninterruptibly(10000)) {
ActiveMQClientLogger.LOGGER.timeoutClosingNettyChannel();
}
}