You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:31 UTC
[32/53] [abbrv] [partial] activemq-artemis git commit: automatic
checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index c73e7c9..fba1a1c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -30,45 +30,52 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
* <p>
* A Channel *does not* support concurrent access by more than one thread!
*/
-public interface Channel
-{
+public interface Channel {
+
/**
* Returns the id of this channel.
+ *
* @return the id
*/
long getID();
- /** For protocol check */
+ /**
+ * For protocol check
+ */
boolean supports(byte packetID);
/**
* Sends a packet on this channel.
+ *
* @param packet the packet to send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
- * successful
+ * successful
*/
boolean send(Packet packet);
/**
* Sends a packet on this channel using batching algorithm if appropriate
+ *
* @param packet the packet to send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
- * successful
+ * successful
*/
boolean sendBatched(Packet packet);
/**
* Sends a packet on this channel and then blocks until it has been written to the connection.
+ *
* @param packet the packet to send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
- * successful
+ * successful
*/
boolean sendAndFlush(Packet packet);
/**
* Sends a packet on this channel and then blocks until a response is received or a timeout
* occurs.
- * @param packet the packet to send
+ *
+ * @param packet the packet to send
* @param expectedPacket the packet being expected.
* @return the response
* @throws ActiveMQException if an error occurs during the send
@@ -78,6 +85,7 @@ public interface Channel
/**
* Sets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should
* forward received packets to.
+ *
* @param handler the handler
*/
void setHandler(ChannelHandler handler);
@@ -85,6 +93,7 @@ public interface Channel
/**
* Gets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should
* forward received packets to.
+ *
* @return the current channel handler
*/
ChannelHandler getHandler();
@@ -100,6 +109,7 @@ public interface Channel
* Transfers the connection used by this channel to the one specified.
* <p>
* All new packets will be sent via this connection.
+ *
* @param newConnection the new connection
*/
void transferConnection(CoreRemotingConnection newConnection);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java
index 2b357e4..a44b6d5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java
@@ -16,13 +16,12 @@
*/
package org.apache.activemq.artemis.core.protocol.core;
-
/**
* A ChannelHandler is used by {@link Channel}. When a channel receives a packet it will call its handler to deal with the
* packet.
*/
-public interface ChannelHandler
-{
+public interface ChannelHandler {
+
/**
* called by the channel when a packet is received.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java
index 35d923b..bc07019 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java
@@ -16,13 +16,12 @@
*/
package org.apache.activemq.artemis.core.protocol.core;
-
/**
* A CommandConfirmationHandler is called by the channel when a confirmation of a packet has been received.
* <p>
*/
-public interface CommandConfirmationHandler
-{
+public interface CommandConfirmationHandler {
+
/**
* called by channel after a confirmation has been received.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index 5123707..f2aa5b4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -19,15 +19,15 @@ package org.apache.activemq.artemis.core.protocol.core;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-
/**
* Extension of RemotingConnection for the ActiveMQ Artemis core protocol
*/
-public interface CoreRemotingConnection extends RemotingConnection
-{
+public interface CoreRemotingConnection extends RemotingConnection {
- /** The client protocol used on the communication.
- * This will determine if the client has support for certain packet types */
+ /**
+ * The client protocol used on the communication.
+ * This will determine if the client has support for certain packet types
+ */
int getClientVersion();
/**
@@ -40,7 +40,8 @@ public interface CoreRemotingConnection extends RemotingConnection
* Returns the channel with the channel id specified.
* <p>
* If it does not exist create it with the confirmation window size.
- * @param channelID the channel id
+ *
+ * @param channelID the channel id
* @param confWindowSize the confirmation window size
* @return the channel
*/
@@ -71,36 +72,42 @@ public interface CoreRemotingConnection extends RemotingConnection
/**
* Resets the id generator used to generate ids.
+ *
* @param id the first id to set it to
*/
void syncIDGeneratorSequence(long id);
/**
* Returns the next id to be chosen.
+ *
* @return the id
*/
long getIDGeneratorSequence();
/**
* Returns the current timeout for blocking calls
+ *
* @return the timeout in milliseconds
*/
long getBlockingCallTimeout();
/**
* Returns the current failover timeout for blocking calls
+ *
* @return the timeout in milliseconds
*/
long getBlockingCallFailoverTimeout();
/**
* Returns the transfer lock used when transferring connections.
+ *
* @return the lock
*/
Object getTransferLock();
/**
* Returns the default security principal
+ *
* @return the principal
*/
ActiveMQPrincipal getDefaultActiveMQPrincipal();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index 3da787c..ddb734e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -22,8 +22,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
/**
* A Packet represents a packet of data transmitted over a connection.
*/
-public interface Packet
-{
+public interface Packet {
+
/**
* Sets the channel id that should be used so that, once the packet has been successfully
* decoded, it is sent to the correct channel.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index d938b85..73ea529 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -72,8 +72,8 @@ import org.apache.activemq.artemis.utils.VersionLoader;
* Implementations of this class need to be stateless.
*/
-public class ActiveMQClientProtocolManager implements ClientProtocolManager
-{
+public class ActiveMQClientProtocolManager implements ClientProtocolManager {
+
private static final String handshake = "ARTEMIS";
private final int versionID = VersionLoader.getVersion().getIncrementingVersion();
@@ -106,117 +106,89 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
private final CountDownLatch waitLatch = new CountDownLatch(1);
-
- public ActiveMQClientProtocolManager()
- {
+ public ActiveMQClientProtocolManager() {
}
- public String getName()
- {
+ public String getName() {
return ActiveMQClient.DEFAULT_CORE_PROTOCOL;
}
- public void setSessionFactory(ClientSessionFactory factory)
- {
- this.factoryInternal = (ClientSessionFactoryInternal)factory;
+ public void setSessionFactory(ClientSessionFactory factory) {
+ this.factoryInternal = (ClientSessionFactoryInternal) factory;
}
- public ClientSessionFactory getSessionFactory()
- {
+ public ClientSessionFactory getSessionFactory() {
return this.factoryInternal;
}
@Override
- public void addChannelHandlers(ChannelPipeline pipeline)
- {
+ public void addChannelHandlers(ChannelPipeline pipeline) {
pipeline.addLast("activemq-decoder", new ActiveMQFrameDecoder2());
}
- public boolean waitOnLatch(long milliseconds) throws InterruptedException
- {
+ public boolean waitOnLatch(long milliseconds) throws InterruptedException {
return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
}
- public Channel getChannel0()
- {
- if (connection == null)
- {
+ public Channel getChannel0() {
+ if (connection == null) {
return null;
}
- else
- {
+ else {
return connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1);
}
}
- public RemotingConnection getCurrentConnection()
- {
+ public RemotingConnection getCurrentConnection() {
return connection;
}
-
- public Channel getChannel1()
- {
- if (connection == null)
- {
+ public Channel getChannel1() {
+ if (connection == null) {
return null;
}
- else
- {
+ else {
return connection.getChannel(1, -1);
}
}
- public Lock lockSessionCreation()
- {
- try
- {
+ public Lock lockSessionCreation() {
+ try {
Lock localFailoverLock = factoryInternal.lockFailover();
- try
- {
- if (connection == null)
- {
+ try {
+ if (connection == null) {
return null;
}
Lock lock = getChannel1().getLock();
// Lock it - this must be done while the failoverLock is held
- while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS))
- {
+ while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS)) {
}
return lock;
}
- finally
- {
+ finally {
localFailoverLock.unlock();
}
// We can now release the failoverLock
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
-
- public void stop()
- {
+ public void stop() {
alive = false;
-
- synchronized (inCreateSessionGuard)
- {
+ synchronized (inCreateSessionGuard) {
if (inCreateSessionLatch != null)
inCreateSessionLatch.countDown();
}
-
Channel channel1 = getChannel1();
- if (channel1 != null)
- {
+ if (channel1 != null) {
channel1.returnBlocking();
}
@@ -224,15 +196,12 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
}
- public boolean isAlive()
- {
+ public boolean isAlive() {
return alive;
}
-
@Override
- public void ping(long connectionTTL)
- {
+ public void ping(long connectionTTL) {
Channel channel = connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1);
Ping ping = new Ping(connectionTTL);
@@ -243,37 +212,26 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
}
@Override
- public void sendSubscribeTopology(final boolean isServer)
- {
- getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer,
- VersionLoader.getVersion()
- .getIncrementingVersion()));
+ public void sendSubscribeTopology(final boolean isServer) {
+ getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VersionLoader.getVersion().getIncrementingVersion()));
}
@Override
- public SessionContext createSessionContext(String name, String username, String password,
- boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
- boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException
- {
- for (Version clientVersion : VersionLoader.getClientVersions())
- {
- try
- {
- return createSessionContext(clientVersion,
- name,
- username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- minLargeMessageSize,
- confirmationWindowSize);
+ public SessionContext createSessionContext(String name,
+ String username,
+ String password,
+ boolean xa,
+ boolean autoCommitSends,
+ boolean autoCommitAcks,
+ boolean preAcknowledge,
+ int minLargeMessageSize,
+ int confirmationWindowSize) throws ActiveMQException {
+ for (Version clientVersion : VersionLoader.getClientVersions()) {
+ try {
+ return createSessionContext(clientVersion, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, minLargeMessageSize, confirmationWindowSize);
}
- catch (ActiveMQException e)
- {
- if (e.getType() != ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
- {
+ catch (ActiveMQException e) {
+ if (e.getType() != ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) {
throw e;
}
}
@@ -282,10 +240,16 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
throw new ActiveMQException(ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS);
}
- public SessionContext createSessionContext(Version clientVersion, String name, String username, String password,
- boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
- boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException
- {
+ public SessionContext createSessionContext(Version clientVersion,
+ String name,
+ String username,
+ String password,
+ boolean xa,
+ boolean autoCommitSends,
+ boolean autoCommitAcks,
+ boolean preAcknowledge,
+ int minLargeMessageSize,
+ int confirmationWindowSize) throws ActiveMQException {
if (!isAlive())
throw ActiveMQClientMessageBundle.BUNDLE.clientSessionClosed();
@@ -293,20 +257,17 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
CreateSessionResponseMessage response = null;
boolean retry;
- do
- {
+ do {
retry = false;
Lock lock = null;
- try
- {
+ try {
lock = lockSessionCreation();
// We now set a flag saying createSession is executing
- synchronized (inCreateSessionGuard)
- {
+ synchronized (inCreateSessionGuard) {
if (!isAlive())
throw ActiveMQClientMessageBundle.BUNDLE.clientSessionClosed();
inCreateSession = true;
@@ -315,32 +276,17 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
long sessionChannelID = connection.generateChannelID();
- Packet request = new CreateSessionMessage(name,
- sessionChannelID,
- clientVersion.getIncrementingVersion(),
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- confirmationWindowSize,
- null);
-
-
- try
- {
+ Packet request = new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null);
+
+ try {
// channel1 reference here has to go away
response = (CreateSessionResponseMessage) getChannel1().sendBlocking(request, PacketImpl.CREATESESSION_RESP);
}
- catch (ActiveMQException cause)
- {
+ catch (ActiveMQException cause) {
if (!isAlive())
throw cause;
- if (cause.getType() == ActiveMQExceptionType.UNBLOCKED)
- {
+ if (cause.getType() == ActiveMQExceptionType.UNBLOCKED) {
// This means the thread was blocked on create session and failover unblocked it
// so failover could occur
@@ -348,37 +294,29 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
continue;
}
- else
- {
+ else {
throw cause;
}
}
sessionChannel = connection.getChannel(sessionChannelID, confirmationWindowSize);
-
}
- catch (Throwable t)
- {
- if (lock != null)
- {
+ catch (Throwable t) {
+ if (lock != null) {
lock.unlock();
lock = null;
}
- if (t instanceof ActiveMQException)
- {
+ if (t instanceof ActiveMQException) {
throw (ActiveMQException) t;
}
- else
- {
+ else {
throw ActiveMQClientMessageBundle.BUNDLE.failedToCreateSession(t);
}
}
- finally
- {
- if (lock != null)
- {
+ finally {
+ if (lock != null) {
lock.unlock();
}
@@ -386,60 +324,48 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
inCreateSession = false;
inCreateSessionLatch.countDown();
}
- }
- while (retry);
-
+ } while (retry);
// these objects won't be null, otherwise it would keep retrying on the previous loop
return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
}
- public boolean cleanupBeforeFailover(ActiveMQException cause)
- {
+ public boolean cleanupBeforeFailover(ActiveMQException cause) {
boolean needToInterrupt;
CountDownLatch exitLockLatch;
Lock lock = lockSessionCreation();
- if (lock == null)
- {
+ if (lock == null) {
return false;
}
- try
- {
- synchronized (inCreateSessionGuard)
- {
+ try {
+ synchronized (inCreateSessionGuard) {
needToInterrupt = inCreateSession;
exitLockLatch = inCreateSessionLatch;
}
}
- finally
- {
+ finally {
lock.unlock();
}
- if (needToInterrupt)
- {
+ if (needToInterrupt) {
forceReturnChannel1(cause);
// Now we need to make sure that the thread has actually exited and returned its
// connections
// before failover occurs
- while (inCreateSession && isAlive())
- {
- try
- {
- if (exitLockLatch != null)
- {
+ while (inCreateSession && isAlive()) {
+ try {
+ if (exitLockLatch != null) {
exitLockLatch.await(500, TimeUnit.MILLISECONDS);
}
}
- catch (InterruptedException e1)
- {
+ catch (InterruptedException e1) {
throw new ActiveMQInterruptedException(e1);
}
}
@@ -449,37 +375,31 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
}
@Override
- public boolean checkForFailover(String liveNodeID) throws ActiveMQException
- {
+ public boolean checkForFailover(String liveNodeID) throws ActiveMQException {
CheckFailoverMessage packet = new CheckFailoverMessage(liveNodeID);
- CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet,
- PacketImpl.CHECK_FOR_FAILOVER_REPLY);
+ CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet, PacketImpl.CHECK_FOR_FAILOVER_REPLY);
return message.isOkToFailover();
}
-
- public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout,
- List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors,
- TopologyResponseHandler topologyResponseHandler)
- {
- this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection,
- callTimeout, callFailoverTimeout,
- incomingInterceptors, outgoingInterceptors);
+ public RemotingConnection connect(Connection transportConnection,
+ long callTimeout,
+ long callFailoverTimeout,
+ List<Interceptor> incomingInterceptors,
+ List<Interceptor> outgoingInterceptors,
+ TopologyResponseHandler topologyResponseHandler) {
+ this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
this.topologyResponseHandler = topologyResponseHandler;
getChannel0().setHandler(new Channel0Handler(connection));
-
sendHandshake(transportConnection);
return connection;
}
- private void sendHandshake(Connection transportConnection)
- {
- if (transportConnection.isUsingProtocolHandling())
- {
+ private void sendHandshake(Connection transportConnection) {
+ if (transportConnection.isUsingProtocolHandling()) {
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());
amqbuffer.writeBytes(handshake.getBytes());
@@ -487,29 +407,24 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
}
}
+ private class Channel0Handler implements ChannelHandler {
- private class Channel0Handler implements ChannelHandler
- {
private final CoreRemotingConnection conn;
- private Channel0Handler(final CoreRemotingConnection conn)
- {
+ private Channel0Handler(final CoreRemotingConnection conn) {
this.conn = conn;
}
- public void handlePacket(final Packet packet)
- {
+ public void handlePacket(final Packet packet) {
final byte type = packet.getType();
- if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2)
- {
+ if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2) {
final DisconnectMessage msg = (DisconnectMessage) packet;
String scaleDownTargetNodeID = null;
SimpleString nodeID = msg.getNodeID();
- if (packet instanceof DisconnectMessage_V2)
- {
+ if (packet instanceof DisconnectMessage_V2) {
final DisconnectMessage_V2 msg_v2 = (DisconnectMessage_V2) packet;
scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? null : msg_v2.getScaleDownNodeID().toString();
}
@@ -517,23 +432,19 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
if (topologyResponseHandler != null)
topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID);
}
- else if (type == PacketImpl.CLUSTER_TOPOLOGY)
- {
+ else if (type == PacketImpl.CLUSTER_TOPOLOGY) {
ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet;
notifyTopologyChange(topMessage);
}
- else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
- {
+ else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) {
ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet;
notifyTopologyChange(topMessage);
}
- else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3)
- {
+ else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) {
ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet;
notifyTopologyChange(topMessage);
}
- else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY)
- {
+ else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) {
System.out.println("Channel0Handler.handlePacket");
}
}
@@ -541,73 +452,57 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
/**
* @param topMessage
*/
- private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage)
- {
+ private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) {
final long eventUID;
final String backupGroupName;
final String scaleDownGroupName;
- if (topMessage instanceof ClusterTopologyChangeMessage_V3)
- {
+ if (topMessage instanceof ClusterTopologyChangeMessage_V3) {
eventUID = ((ClusterTopologyChangeMessage_V3) topMessage).getUniqueEventID();
backupGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getBackupGroupName();
scaleDownGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getScaleDownGroupName();
}
- else if (topMessage instanceof ClusterTopologyChangeMessage_V2)
- {
+ else if (topMessage instanceof ClusterTopologyChangeMessage_V2) {
eventUID = ((ClusterTopologyChangeMessage_V2) topMessage).getUniqueEventID();
backupGroupName = ((ClusterTopologyChangeMessage_V2) topMessage).getBackupGroupName();
scaleDownGroupName = null;
}
- else
- {
+ else {
eventUID = System.currentTimeMillis();
backupGroupName = null;
scaleDownGroupName = null;
}
- if (topMessage.isExit())
- {
- if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
- {
+ if (topMessage.isExit()) {
+ if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
ActiveMQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down");
}
- if (topologyResponseHandler != null)
- {
+ if (topologyResponseHandler != null) {
topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID());
}
}
- else
- {
+ else {
Pair<TransportConfiguration, TransportConfiguration> transportConfig = topMessage.getPair();
- if (transportConfig.getA() == null && transportConfig.getB() == null)
- {
- transportConfig = new Pair<>(conn.getTransportConnection()
- .getConnectorConfig(),
- null);
+ if (transportConfig.getA() == null && transportConfig.getB() == null) {
+ transportConfig = new Pair<>(conn.getTransportConnection().getConnectorConfig(), null);
}
- if (topologyResponseHandler != null)
- {
+ if (topologyResponseHandler != null) {
topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast());
}
}
}
}
- protected PacketDecoder getPacketDecoder()
- {
+ protected PacketDecoder getPacketDecoder() {
return ClientPacketDecoder.INSTANCE;
}
- private void forceReturnChannel1(ActiveMQException cause)
- {
- if (connection != null)
- {
+ private void forceReturnChannel1(ActiveMQException cause) {
+ if (connection != null) {
Channel channel1 = connection.getChannel(1, -1);
- if (channel1 != null)
- {
+ if (channel1 != null) {
channel1.returnBlocking(cause);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java
index 7c3435a..a58834b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java
@@ -19,23 +19,20 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
-public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManagerFactory
-{
+public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManagerFactory {
+
private static final long serialVersionUID = 1;
private static final ActiveMQClientProtocolManagerFactory INSTANCE = new ActiveMQClientProtocolManagerFactory();
- private ActiveMQClientProtocolManagerFactory()
- {
+ private ActiveMQClientProtocolManagerFactory() {
}
- public static final ActiveMQClientProtocolManagerFactory getInstance()
- {
+ public static final ActiveMQClientProtocolManagerFactory getInstance() {
return INSTANCE;
}
- public ClientProtocolManager newProtocolManager()
- {
+ public ClientProtocolManager newProtocolManager() {
return new ActiveMQClientProtocolManager();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
index 320930d..08abb91 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
@@ -18,36 +18,35 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
-public class ActiveMQConsumerContext extends ConsumerContext
-{
+public class ActiveMQConsumerContext extends ConsumerContext {
+
private long id;
- public ActiveMQConsumerContext(long id)
- {
+ public ActiveMQConsumerContext(long id) {
this.id = id;
}
- public long getId()
- {
+ public long getId() {
return id;
}
@Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
ActiveMQConsumerContext that = (ActiveMQConsumerContext) o;
- if (id != that.id) return false;
+ if (id != that.id)
+ return false;
return true;
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
return (int) (id ^ (id >>> 32));
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 23370d5..5279de2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -107,16 +107,18 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
-public class ActiveMQSessionContext extends SessionContext
-{
+public class ActiveMQSessionContext extends SessionContext {
+
private final Channel sessionChannel;
private final int serverVersion;
private int confirmationWindow;
private final String name;
-
- public ActiveMQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, int serverVersion, int confirmationWindow)
- {
+ public ActiveMQSessionContext(String name,
+ RemotingConnection remotingConnection,
+ Channel sessionChannel,
+ int serverVersion,
+ int confirmationWindow) {
super(remotingConnection);
this.name = name;
@@ -127,71 +129,55 @@ public class ActiveMQSessionContext extends SessionContext
ChannelHandler handler = new ClientSessionPacketHandler();
sessionChannel.setHandler(handler);
-
- if (confirmationWindow >= 0)
- {
+ if (confirmationWindow >= 0) {
sessionChannel.setCommandConfirmationHandler(confirmationHandler);
}
}
-
- private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler()
- {
- public void commandConfirmed(final Packet packet)
- {
- if (packet.getType() == PacketImpl.SESS_SEND)
- {
+ private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
+ public void commandConfirmed(final Packet packet) {
+ if (packet.getType() == PacketImpl.SESS_SEND) {
SessionSendMessage ssm = (SessionSendMessage) packet;
callSendAck(ssm.getHandler(), ssm.getMessage());
}
- else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
- {
+ else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
- if (!scm.isContinues())
- {
+ if (!scm.isContinues()) {
callSendAck(scm.getHandler(), scm.getMessage());
}
}
}
- private void callSendAck(SendAcknowledgementHandler handler, final Message message)
- {
- if (handler != null)
- {
+ private void callSendAck(SendAcknowledgementHandler handler, final Message message) {
+ if (handler != null) {
handler.sendAcknowledged(message);
}
- else if (sendAckHandler != null)
- {
+ else if (sendAckHandler != null) {
sendAckHandler.sendAcknowledged(message);
}
}
};
-
// Failover utility methods
@Override
- public void returnBlocking(ActiveMQException cause)
- {
+ public void returnBlocking(ActiveMQException cause) {
sessionChannel.returnBlocking(cause);
}
@Override
- public void lockCommunications()
- {
+ public void lockCommunications() {
sessionChannel.lock();
}
@Override
- public void releaseCommunications()
- {
+ public void releaseCommunications() {
sessionChannel.setTransferring(false);
sessionChannel.unlock();
}
- public void cleanup()
- {
+ public void cleanup() {
sessionChannel.close();
// if the server is sending a disconnect
@@ -200,14 +186,11 @@ public class ActiveMQSessionContext extends SessionContext
}
@Override
- public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits)
- {
+ public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) {
// nothing to be done here... Flow control here is done on the core side
}
-
- public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
- {
+ public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
sessionChannel.setCommandConfirmationHandler(confirmationHandler);
this.sendAckHandler = handler;
}
@@ -215,37 +198,34 @@ public class ActiveMQSessionContext extends SessionContext
public void createSharedQueue(SimpleString address,
SimpleString queueName,
SimpleString filterString,
- boolean durable) throws ActiveMQException
- {
+ boolean durable) throws ActiveMQException {
sessionChannel.sendBlocking(new CreateSharedQueueMessage(address, queueName, filterString, durable, true), PacketImpl.NULL_RESPONSE);
}
- public void deleteQueue(final SimpleString queueName) throws ActiveMQException
- {
+ public void deleteQueue(final SimpleString queueName) throws ActiveMQException {
sessionChannel.sendBlocking(new SessionDeleteQueueMessage(queueName), PacketImpl.NULL_RESPONSE);
}
- public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException
- {
+ public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
return response.toQueueQuery();
}
- public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString,
- int windowSize, int maxRate, int ackBatchSize, boolean browseOnly,
- Executor executor, Executor flowControlExecutor) throws ActiveMQException
- {
+ public ClientConsumerInternal createConsumer(SimpleString queueName,
+ SimpleString filterString,
+ int windowSize,
+ int maxRate,
+ int ackBatchSize,
+ boolean browseOnly,
+ Executor executor,
+ Executor flowControlExecutor) throws ActiveMQException {
long consumerID = idGenerator.generateID();
ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
- SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID,
- queueName,
- filterString,
- browseOnly,
- true);
+ SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
@@ -253,133 +233,93 @@ public class ActiveMQSessionContext extends SessionContext
// could be overridden on the queue settings
// The value we send is just a hint
- return new ClientConsumerImpl(session,
- consumerContext,
- queueName,
- filterString,
- browseOnly,
- calcWindowSize(windowSize),
- ackBatchSize,
- maxRate > 0 ? new TokenBucketLimiterImpl(maxRate,
- false)
- : null,
- executor,
- flowControlExecutor,
- this,
- queueInfo.toQueueQuery(),
- lookupTCCL());
- }
-
-
- public int getServerVersion()
- {
+ return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+ }
+
+ public int getServerVersion() {
return serverVersion;
}
- public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException
- {
- SessionBindingQueryResponseMessage_V2 response =
- (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
+ public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
+ SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues());
}
-
@Override
- public void closeConsumer(final ClientConsumer consumer) throws ActiveMQException
- {
+ public void closeConsumer(final ClientConsumer consumer) throws ActiveMQException {
sessionChannel.sendBlocking(new SessionConsumerCloseMessage(getConsumerID(consumer)), PacketImpl.NULL_RESPONSE);
}
- public void sendConsumerCredits(final ClientConsumer consumer, final int credits)
- {
+ public void sendConsumerCredits(final ClientConsumer consumer, final int credits) {
sessionChannel.send(new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits));
}
- public void forceDelivery(final ClientConsumer consumer, final long sequence) throws ActiveMQException
- {
+ public void forceDelivery(final ClientConsumer consumer, final long sequence) throws ActiveMQException {
SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(getConsumerID(consumer), sequence);
sessionChannel.send(request);
}
- public void simpleCommit() throws ActiveMQException
- {
+ public void simpleCommit() throws ActiveMQException {
sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
}
- public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException
- {
+ public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException {
sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE);
}
- public void sessionStart() throws ActiveMQException
- {
+ public void sessionStart() throws ActiveMQException {
sessionChannel.send(new PacketImpl(PacketImpl.SESS_START));
}
- public void sessionStop() throws ActiveMQException
- {
+ public void sessionStop() throws ActiveMQException {
sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP), PacketImpl.NULL_RESPONSE);
}
- public void addSessionMetadata(String key, String data) throws ActiveMQException
- {
+ public void addSessionMetadata(String key, String data) throws ActiveMQException {
sessionChannel.sendBlocking(new SessionAddMetaDataMessageV2(key, data), PacketImpl.NULL_RESPONSE);
}
-
- public void addUniqueMetaData(String key, String data) throws ActiveMQException
- {
+ public void addUniqueMetaData(String key, String data) throws ActiveMQException {
sessionChannel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data), PacketImpl.NULL_RESPONSE);
}
- public void xaCommit(Xid xid, boolean onePhase) throws XAException, ActiveMQException
- {
+ public void xaCommit(Xid xid, boolean onePhase) throws XAException, ActiveMQException {
SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase);
SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
- if (response.isError())
- {
+ if (response.isError()) {
throw new XAException(response.getResponseCode());
}
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled())
- {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response);
}
}
- public void xaEnd(Xid xid, int flags) throws XAException, ActiveMQException
- {
+ public void xaEnd(Xid xid, int flags) throws XAException, ActiveMQException {
Packet packet;
- if (flags == XAResource.TMSUSPEND)
- {
+ if (flags == XAResource.TMSUSPEND) {
packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
}
- else if (flags == XAResource.TMSUCCESS)
- {
+ else if (flags == XAResource.TMSUCCESS) {
packet = new SessionXAEndMessage(xid, false);
}
- else if (flags == XAResource.TMFAIL)
- {
+ else if (flags == XAResource.TMFAIL) {
packet = new SessionXAEndMessage(xid, true);
}
- else
- {
+ else {
throw new XAException(XAException.XAER_INVAL);
}
SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
- if (response.isError())
- {
+ if (response.isError()) {
throw new XAException(response.getResponseCode());
}
}
-
- public void sendProducerCreditsMessage(final int credits, final SimpleString address)
- {
+ public void sendProducerCreditsMessage(final int credits, final SimpleString address) {
sessionChannel.send(new SessionRequestProducerCreditsMessage(credits, address));
}
@@ -388,34 +328,31 @@ public class ActiveMQSessionContext extends SessionContext
*
* @return
*/
- public boolean supportsLargeMessage()
- {
+ public boolean supportsLargeMessage() {
return true;
}
@Override
- public int getCreditsOnSendingFull(MessageInternal msgI)
- {
+ public int getCreditsOnSendingFull(MessageInternal msgI) {
return msgI.getEncodeSize();
}
- public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException
- {
+ public void sendFullMessage(MessageInternal msgI,
+ boolean sendBlocking,
+ SendAcknowledgementHandler handler,
+ SimpleString defaultAddress) throws ActiveMQException {
SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler);
- if (sendBlocking)
- {
+ if (sendBlocking) {
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
}
- else
- {
+ else {
sessionChannel.sendBatched(packet);
}
}
@Override
- public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException
- {
+ public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException {
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI);
sessionChannel.send(initialChunk);
@@ -424,89 +361,78 @@ public class ActiveMQSessionContext extends SessionContext
}
@Override
- public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException
- {
+ public int sendLargeMessageChunk(MessageInternal msgI,
+ long messageBodySize,
+ boolean sendBlocking,
+ boolean lastChunk,
+ byte[] chunk,
+ SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking;
- final SessionSendContinuationMessage chunkPacket =
- new SessionSendContinuationMessage(msgI, chunk, !lastChunk,
- requiresResponse, messageBodySize, messageHandler);
+ final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
- if (requiresResponse)
- {
+ if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
}
- else
- {
+ else {
sessionChannel.send(chunkPacket);
}
return chunkPacket.getPacketSize();
}
- public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws ActiveMQException
- {
+ public void sendACK(boolean individual,
+ boolean block,
+ final ClientConsumer consumer,
+ final Message message) throws ActiveMQException {
PacketImpl messagePacket;
- if (individual)
- {
+ if (individual) {
messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
}
- else
- {
+ else {
messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
}
- if (block)
- {
+ if (block) {
sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE);
}
- else
- {
+ else {
sessionChannel.sendBatched(messagePacket);
}
}
- public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException
- {
+ public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException {
SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID());
sessionChannel.send(messagePacket);
}
-
- public void sessionClose() throws ActiveMQException
- {
+ public void sessionClose() throws ActiveMQException {
sessionChannel.sendBlocking(new SessionCloseMessage(), PacketImpl.NULL_RESPONSE);
}
- public void xaForget(Xid xid) throws XAException, ActiveMQException
- {
+ public void xaForget(Xid xid) throws XAException, ActiveMQException {
SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(new SessionXAForgetMessage(xid), PacketImpl.SESS_XA_RESP);
- if (response.isError())
- {
+ if (response.isError()) {
throw new XAException(response.getResponseCode());
}
}
- public int xaPrepare(Xid xid) throws XAException, ActiveMQException
- {
+ public int xaPrepare(Xid xid) throws XAException, ActiveMQException {
SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid);
SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
- if (response.isError())
- {
+ if (response.isError()) {
throw new XAException(response.getResponseCode());
}
- else
- {
+ else {
return response.getResponseCode();
}
}
- public Xid[] xaScan() throws ActiveMQException
- {
+ public Xid[] xaScan() throws ActiveMQException {
SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS), PacketImpl.SESS_XA_INDOUBT_XIDS_RESP);
List<Xid> xids = response.getXids();
@@ -516,71 +442,63 @@ public class ActiveMQSessionContext extends SessionContext
return xidArray;
}
- public void xaRollback(Xid xid, boolean wasStarted) throws ActiveMQException, XAException
- {
+ public void xaRollback(Xid xid, boolean wasStarted) throws ActiveMQException, XAException {
SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
- if (response.isError())
- {
+ if (response.isError()) {
throw new XAException(response.getResponseCode());
}
}
- public void xaStart(Xid xid, int flags) throws XAException, ActiveMQException
- {
+ public void xaStart(Xid xid, int flags) throws XAException, ActiveMQException {
Packet packet;
- if (flags == XAResource.TMJOIN)
- {
+ if (flags == XAResource.TMJOIN) {
packet = new SessionXAJoinMessage(xid);
}
- else if (flags == XAResource.TMRESUME)
- {
+ else if (flags == XAResource.TMRESUME) {
packet = new SessionXAResumeMessage(xid);
}
- else if (flags == XAResource.TMNOFLAGS)
- {
+ else if (flags == XAResource.TMNOFLAGS) {
// Don't need to flush since the previous end will have done this
packet = new SessionXAStartMessage(xid);
}
- else
- {
+ else {
throw new XAException(XAException.XAER_INVAL);
}
SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
- if (response.isError())
- {
+ if (response.isError()) {
ActiveMQClientLogger.LOGGER.errorCallingStart(response.getMessage(), response.getResponseCode());
throw new XAException(response.getResponseCode());
}
}
- public boolean configureTransactionTimeout(int seconds) throws ActiveMQException
- {
+ public boolean configureTransactionTimeout(int seconds) throws ActiveMQException {
SessionXASetTimeoutResponseMessage response = (SessionXASetTimeoutResponseMessage) sessionChannel.sendBlocking(new SessionXASetTimeoutMessage(seconds), PacketImpl.SESS_XA_SET_TIMEOUT_RESP);
return response.isOK();
}
- public int recoverSessionTimeout() throws ActiveMQException
- {
+ public int recoverSessionTimeout() throws ActiveMQException {
SessionXAGetTimeoutResponseMessage response = (SessionXAGetTimeoutResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT), PacketImpl.SESS_XA_GET_TIMEOUT_RESP);
return response.getTimeoutSeconds();
}
- public void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp) throws ActiveMQException
- {
+ public void createQueue(SimpleString address,
+ SimpleString queueName,
+ SimpleString filterString,
+ boolean durable,
+ boolean temp) throws ActiveMQException {
CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp, true);
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
}
@Override
- public boolean reattachOnNewConnection(RemotingConnection newConnection) throws ActiveMQException
- {
+ public boolean reattachOnNewConnection(RemotingConnection newConnection) throws ActiveMQException {
this.remotingConnection = newConnection;
@@ -592,8 +510,7 @@ public class ActiveMQSessionContext extends SessionContext
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage) channel1.sendBlocking(request, PacketImpl.REATTACH_SESSION_RESP);
- if (response.isReattached())
- {
+ if (response.isReattached()) {
ActiveMQClientLogger.LOGGER.replayingCommands(sessionChannel.getID(), response.getLastConfirmedCommandID());
// The session was found on the server - we reattached transparently ok
@@ -601,8 +518,7 @@ public class ActiveMQSessionContext extends SessionContext
return true;
}
- else
- {
+ else {
ActiveMQClientLogger.LOGGER.reconnectCreatingNewSession(sessionChannel.getID());
sessionChannel.clearCommands();
@@ -619,148 +535,102 @@ public class ActiveMQSessionContext extends SessionContext
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
- final SimpleString defaultAddress) throws ActiveMQException
- {
- Packet createRequest = new CreateSessionMessage(name,
- sessionChannel.getID(),
- VersionLoader.getVersion().getIncrementingVersion(),
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- confirmationWindow,
- defaultAddress == null ? null
- : defaultAddress.toString());
+ final SimpleString defaultAddress) throws ActiveMQException {
+ Packet createRequest = new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
boolean retry;
- do
- {
- try
- {
+ do {
+ try {
getCreateChannel().sendBlocking(createRequest, PacketImpl.CREATESESSION_RESP);
retry = false;
}
- catch (ActiveMQException e)
- {
+ catch (ActiveMQException e) {
// the session was created while its server was starting, retry it:
- if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED)
- {
+ if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) {
ActiveMQClientLogger.LOGGER.retryCreateSessionSeverStarting(name);
retry = true;
// sleep a little bit to avoid spinning too much
- try
- {
+ try {
Thread.sleep(10);
}
- catch (InterruptedException ie)
- {
+ catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw e;
}
}
- else
- {
+ else {
throw e;
}
}
- }
- while (retry && !session.isClosing());
+ } while (retry && !session.isClosing());
}
@Override
- public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException
- {
+ public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException {
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
// We try and recreate any non durable queues, since they probably won't be there unless
// they are defined in broker.xml
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
- if (!queueInfo.isDurable())
- {
- CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
- queueInfo.getName(),
- queueInfo.getFilterString(),
- false,
- queueInfo.isTemporary(),
- false);
+ if (!queueInfo.isDurable()) {
+ CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), false);
sendPacketWithoutLock(sessionChannel, createQueueRequest);
}
- SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal),
- consumerInternal.getQueueName(),
- consumerInternal.getFilterString(),
- consumerInternal.isBrowseOnly(),
- false);
+ SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), false);
sendPacketWithoutLock(sessionChannel, createConsumerRequest);
int clientWindowSize = consumerInternal.getClientWindowSize();
- if (clientWindowSize != 0)
- {
- SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
- clientWindowSize);
+ if (clientWindowSize != 0) {
+ SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), clientWindowSize);
sendPacketWithoutLock(sessionChannel, packet);
}
- else
- {
+ else {
// https://jira.jboss.org/browse/HORNETQ-522
- SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
- 1);
+ SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), 1);
sendPacketWithoutLock(sessionChannel, packet);
}
}
- public void xaFailed(Xid xid) throws ActiveMQException
- {
+ public void xaFailed(Xid xid) throws ActiveMQException {
sendPacketWithoutLock(sessionChannel, new SessionXAAfterFailedMessage(xid));
}
- public void restartSession() throws ActiveMQException
- {
+ public void restartSession() throws ActiveMQException {
sendPacketWithoutLock(sessionChannel, new PacketImpl(PacketImpl.SESS_START));
}
@Override
- public void resetMetadata(HashMap<String, String> metaDataToSend)
- {
+ public void resetMetadata(HashMap<String, String> metaDataToSend) {
// Resetting the metadata after failover
- for (Map.Entry<String, String> entries : metaDataToSend.entrySet())
- {
+ for (Map.Entry<String, String> entries : metaDataToSend.entrySet()) {
sendPacketWithoutLock(sessionChannel, new SessionAddMetaDataMessageV2(entries.getKey(), entries.getValue(), false));
}
}
-
- private Channel getCreateChannel()
- {
+ private Channel getCreateChannel() {
return getCoreConnection().getChannel(1, -1);
}
- private CoreRemotingConnection getCoreConnection()
- {
+ private CoreRemotingConnection getCoreConnection() {
return (CoreRemotingConnection) remotingConnection;
}
-
/**
* This doesn't apply to other protocols probably, so it will be an ActiveMQ Artemis exclusive feature
*
* @throws ActiveMQException
*/
- private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws ActiveMQException
- {
+ private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws ActiveMQException {
DisconnectConsumerMessage message = packet;
session.handleConsumerDisconnect(new ActiveMQConsumerContext(message.getConsumerId()));
}
- private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception
- {
+ private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception {
ClientMessageInternal msgi = (ClientMessageInternal) messagePacket.getMessage();
msgi.setDeliveryCount(messagePacket.getDeliveryCount());
@@ -770,8 +640,7 @@ public class ActiveMQSessionContext extends SessionContext
handleReceiveMessage(new ActiveMQConsumerContext(messagePacket.getConsumerID()), msgi);
}
- private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception
- {
+ private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception {
ClientLargeMessageInternal clientLargeMessage = (ClientLargeMessageInternal) serverPacket.getLargeMessage();
clientLargeMessage.setFlowControlSize(serverPacket.getPacketSize());
@@ -781,73 +650,55 @@ public class ActiveMQSessionContext extends SessionContext
handleReceiveLargeMessage(new ActiveMQConsumerContext(serverPacket.getConsumerID()), clientLargeMessage, serverPacket.getLargeMessageSize());
}
-
- private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception
- {
- handleReceiveContinuation(new ActiveMQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(),
- continuationPacket.isContinues());
+ private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception {
+ handleReceiveContinuation(new ActiveMQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(), continuationPacket.isContinues());
}
-
- protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message)
- {
+ protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message) {
handleReceiveProducerCredits(message.getAddress(), message.getCredits());
}
-
- protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessage message)
- {
+ protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessage message) {
handleReceiveProducerFailCredits(message.getAddress(), message.getCredits());
}
- class ClientSessionPacketHandler implements ChannelHandler
- {
+ class ClientSessionPacketHandler implements ChannelHandler {
- public void handlePacket(final Packet packet)
- {
+ public void handlePacket(final Packet packet) {
byte type = packet.getType();
- try
- {
- switch (type)
- {
- case DISCONNECT_CONSUMER:
- {
+ try {
+ switch (type) {
+ case DISCONNECT_CONSUMER: {
handleConsumerDisconnected((DisconnectConsumerMessage) packet);
break;
}
- case SESS_RECEIVE_CONTINUATION:
- {
+ case SESS_RECEIVE_CONTINUATION: {
handleReceiveContinuation((SessionReceiveContinuationMessage) packet);
break;
}
- case SESS_RECEIVE_MSG:
- {
+ case SESS_RECEIVE_MSG: {
handleReceivedMessagePacket((SessionReceiveMessage) packet);
break;
}
- case SESS_RECEIVE_LARGE_MSG:
- {
+ case SESS_RECEIVE_LARGE_MSG: {
handleReceiveLargeMessage((SessionReceiveLargeMessage) packet);
break;
}
- case PacketImpl.SESS_PRODUCER_CREDITS:
- {
+ case PacketImpl.SESS_PRODUCER_CREDITS: {
handleReceiveProducerCredits((SessionProducerCreditsMessage) packet);
break;
}
- case PacketImpl.SESS_PRODUCER_FAIL_CREDITS:
- {
+ case PacketImpl.SESS_PRODUCER_FAIL_CREDITS: {
handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet);
break;
}
- case EXCEPTION:
- {
+ case EXCEPTION: {
// We can only log these exceptions
// maybe we should cache it on SessionContext and throw an exception on any next calls
ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) packet;
@@ -856,14 +707,12 @@ public class ActiveMQSessionContext extends SessionContext
break;
}
- default:
- {
+ default: {
throw new IllegalStateException("Invalid packet: " + type);
}
}
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.failedToHandlePacket(e);
}
@@ -871,58 +720,46 @@ public class ActiveMQSessionContext extends SessionContext
}
}
- private long getConsumerID(ClientConsumer consumer)
- {
- return ((ActiveMQConsumerContext)consumer.getConsumerContext()).getId();
+ private long getConsumerID(ClientConsumer consumer) {
+ return ((ActiveMQConsumerContext) consumer.getConsumerContext()).getId();
}
- private ClassLoader lookupTCCL()
- {
- return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
+ private ClassLoader lookupTCCL() {
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+ public ClassLoader run() {
return Thread.currentThread().getContextClassLoader();
}
});
}
- private int calcWindowSize(final int windowSize)
- {
+ private int calcWindowSize(final int windowSize) {
int clientWindowSize;
- if (windowSize == -1)
- {
+ if (windowSize == -1) {
// No flow control - buffer can increase without bound! Only use with
// caution for very fast consumers
clientWindowSize = -1;
}
- else if (windowSize == 0)
- {
+ else if (windowSize == 0) {
// Slow consumer - no buffering
clientWindowSize = 0;
}
- else if (windowSize == 1)
- {
+ else if (windowSize == 1) {
// Slow consumer = buffer 1
clientWindowSize = 1;
}
- else if (windowSize > 1)
- {
+ else if (windowSize > 1) {
// Client window size is half server window size
clientWindowSize = windowSize >> 1;
}
- else
- {
+ else {
throw ActiveMQClientMessageBundle.BUNDLE.invalidWindowSize(windowSize);
}
return clientWindowSize;
}
-
- private void sendPacketWithoutLock(final Channel parameterChannel, final Packet packet)
- {
+ private void sendPacketWithoutLock(final Channel parameterChannel, final Packet packet) {
packet.setChannelID(parameterChannel.getID());
Connection conn = parameterChannel.getConnection().getTransportConnection();
@@ -932,5 +769,4 @@ public class ActiveMQSessionContext extends SessionContext
conn.write(buffer, false, false);
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java
index 2b32f1f..9419796 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java
@@ -23,25 +23,21 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember;
/**
* This is a utility class to house any HornetQ client specific backwards compatibility methods.
- *
*/
-public class BackwardsCompatibilityUtils
-{
+public class BackwardsCompatibilityUtils {
+
private static int INITIAL_ACTIVEMQ_INCREMENTING_VERSION = 126;
- public static Pair<TransportConfiguration, TransportConfiguration> getTCPair(int clientIncrementingVersion, TopologyMember member)
- {
- if (clientIncrementingVersion < INITIAL_ACTIVEMQ_INCREMENTING_VERSION)
- {
+ public static Pair<TransportConfiguration, TransportConfiguration> getTCPair(int clientIncrementingVersion,
+ TopologyMember member) {
+ if (clientIncrementingVersion < INITIAL_ACTIVEMQ_INCREMENTING_VERSION) {
return new Pair<>(replaceClassName(member.getLive()), replaceClassName(member.getBackup()));
}
return new Pair<>(member.getLive(), member.getBackup());
}
- private static TransportConfiguration replaceClassName(TransportConfiguration tc)
- {
- if (tc != null)
- {
+ private static TransportConfiguration replaceClassName(TransportConfiguration tc) {
+ if (tc != null) {
String className = tc.getFactoryClassName().replace("org.apache.activemq.artemis", "org.hornetq").replace("ActiveMQ", "HornetQ");
return new TransportConfiguration(className, tc.getParams(), tc.getName());
}