You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:39 UTC
[40/53] [abbrv] [partial] activemq-artemis git commit: automatic
checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
index 09cb114..f0543d1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.client;
-
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
@@ -46,8 +45,8 @@ import org.w3c.dom.Node;
* so 119000 to 119999
*/
@MessageBundle(projectCode = "AMQ")
-public interface ActiveMQClientMessageBundle
-{
+public interface ActiveMQClientMessageBundle {
+
ActiveMQClientMessageBundle BUNDLE = Messages.getBundle(ActiveMQClientMessageBundle.class);
@Message(id = 119000, value = "ClientSession closed while creating session")
@@ -65,83 +64,83 @@ public interface ActiveMQClientMessageBundle
@Message(id = 119005, value = "Exception in Netty transport")
ActiveMQInternalErrorException nettyError();
- @Message(id = 119006, value = "Channel disconnected")
+ @Message(id = 119006, value = "Channel disconnected")
ActiveMQNotConnectedException channelDisconnected();
- @Message(id = 119007, value = "Cannot connect to server(s). Tried with all available servers.")
+ @Message(id = 119007, value = "Cannot connect to server(s). Tried with all available servers.")
ActiveMQNotConnectedException cannotConnectToServers();
- @Message(id = 119008, value = "Failed to connect to any static connectors")
+ @Message(id = 119008, value = "Failed to connect to any static connectors")
ActiveMQNotConnectedException cannotConnectToStaticConnectors(@Cause Exception e);
- @Message(id = 119009, value = "Failed to connect to any static connectors")
+ @Message(id = 119009, value = "Failed to connect to any static connectors")
ActiveMQNotConnectedException cannotConnectToStaticConnectors2();
- @Message(id = 119010, value = "Connection is destroyed")
+ @Message(id = 119010, value = "Connection is destroyed")
ActiveMQNotConnectedException connectionDestroyed();
- @Message(id = 119011, value = "Did not receive data from server for {0}", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 119011, value = "Did not receive data from server for {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQConnectionTimedOutException connectionTimedOut(Connection transportConnection);
- @Message(id = 119012, value = "Timed out waiting to receive initial broadcast from cluster")
+ @Message(id = 119012, value = "Timed out waiting to receive initial broadcast from cluster")
ActiveMQConnectionTimedOutException connectionTimedOutInInitialBroadcast();
- @Message(id = 119013, value = "Timed out waiting to receive cluster topology. Group:{0}", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 119013, value = "Timed out waiting to receive cluster topology. Group:{0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQConnectionTimedOutException connectionTimedOutOnReceiveTopology(DiscoveryGroup discoveryGroup);
- @Message(id = 119014, value = "Timed out after waiting {0} ms for response when sending packet {1}", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 119014, value = "Timed out after waiting {0} ms for response when sending packet {1}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQConnectionTimedOutException timedOutSendingPacket(long timeout, Byte type);
- @Message(id = 119015, value = "The connection was disconnected because of server shutdown")
+ @Message(id = 119015, value = "The connection was disconnected because of server shutdown")
ActiveMQDisconnectedException disconnected();
- @Message(id = 119016, value = "Connection failure detected. Unblocking a blocking call that will never get a response")
+ @Message(id = 119016, value = "Connection failure detected. Unblocking a blocking call that will never get a response")
ActiveMQUnBlockedException unblockingACall(@Cause Throwable t);
- @Message(id = 119017, value = "Consumer is closed")
+ @Message(id = 119017, value = "Consumer is closed")
ActiveMQObjectClosedException consumerClosed();
- @Message(id = 119018, value = "Producer is closed")
+ @Message(id = 119018, value = "Producer is closed")
ActiveMQObjectClosedException producerClosed();
- @Message(id = 119019, value = "Session is closed")
+ @Message(id = 119019, value = "Session is closed")
ActiveMQObjectClosedException sessionClosed();
- @Message(id = 119020, value = "Cannot call receive(...) - a MessageHandler is set")
+ @Message(id = 119020, value = "Cannot call receive(...) - a MessageHandler is set")
ActiveMQIllegalStateException messageHandlerSet();
- @Message(id = 119021, value = "Cannot set MessageHandler - consumer is in receive(...)")
+ @Message(id = 119021, value = "Cannot set MessageHandler - consumer is in receive(...)")
ActiveMQIllegalStateException inReceive();
- @Message(id = 119022, value = "Header size ({0}) is too big, use the messageBody for large data, or increase minLargeMessageSize",
- format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 119022, value = "Header size ({0}) is too big, use the messageBody for large data, or increase minLargeMessageSize",
+ format = Message.Format.MESSAGE_FORMAT)
ActiveMQIllegalStateException headerSizeTooBig(Integer headerSize);
- @Message(id = 119023, value = "The large message lost connection with its session, either because of a rollback or a closed session")
+ @Message(id = 119023, value = "The large message lost connection with its session, either because of a rollback or a closed session")
ActiveMQIllegalStateException largeMessageLostSession();
- @Message(id = 119024, value = "Could not select a TransportConfiguration to create SessionFactory")
+ @Message(id = 119024, value = "Could not select a TransportConfiguration to create SessionFactory")
ActiveMQIllegalStateException noTCForSessionFactory();
@Message(id = 119025, value = "Error saving the message body")
ActiveMQLargeMessageException errorSavingBody(@Cause Exception e);
- @Message(id = 119026, value = "Error reading the LargeMessageBody")
+ @Message(id = 119026, value = "Error reading the LargeMessageBody")
ActiveMQLargeMessageException errorReadingBody(@Cause Exception e);
- @Message(id = 119027, value = "Error closing stream from LargeMessageBody")
+ @Message(id = 119027, value = "Error closing stream from LargeMessageBody")
ActiveMQLargeMessageException errorClosingLargeMessage(@Cause Exception e);
- @Message(id = 119028, value = "Timeout waiting for LargeMessage Body")
+ @Message(id = 119028, value = "Timeout waiting for LargeMessage Body")
ActiveMQLargeMessageException timeoutOnLargeMessage();
- @Message(id = 119029, value = "Error writing body of message")
+ @Message(id = 119029, value = "Error writing body of message")
ActiveMQLargeMessageException errorWritingLargeMessage(@Cause Exception e);
- @Message(id = 119030, value = "The transaction was rolled back on failover to a backup server")
+ @Message(id = 119030, value = "The transaction was rolled back on failover to a backup server")
ActiveMQTransactionRolledBackException txRolledBack();
- @Message(id = 119031, value = "The transaction was rolled back on failover however commit may have been successful")
+ @Message(id = 119031, value = "The transaction was rolled back on failover however commit may have been successful")
ActiveMQTransactionOutcomeUnknownException txOutcomeUnknown();
@Message(id = 119032, value = "Invalid type: {0}", format = Message.Format.MESSAGE_FORMAT)
@@ -151,7 +150,7 @@ public interface ActiveMQClientMessageBundle
IllegalArgumentException invalidEncodeType(Object type);
@Message(id = 119034, value = "Params for management operations must be of the following type: int long double String boolean Map or array thereof but found {0}",
- format = Message.Format.MESSAGE_FORMAT)
+ format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidManagementParam(Object type);
@Message(id = 119035, value = "Invalid window size {0}", format = Message.Format.MESSAGE_FORMAT)
@@ -223,8 +222,7 @@ public interface ActiveMQClientMessageBundle
@Message(id = 119058, value = "Address \"{0}\" is full. Message encode size = {1}B", format = Message.Format.MESSAGE_FORMAT)
ActiveMQAddressFullException addressIsFull(String addressName, int size);
- @Message(id = 119059, value = "Interceptor {0} rejected packet in a blocking call. This call will never complete."
- , format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 119059, value = "Interceptor {0} rejected packet in a blocking call. This call will never complete.", format = Message.Format.MESSAGE_FORMAT)
ActiveMQInterceptorRejectedPacketException interceptorRejectedPacket(String interceptionResult);
@Message(id = 119060, value = "Large Message Transmission interrupted on consumer shutdown.")
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ActiveMQXAResource.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ActiveMQXAResource.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ActiveMQXAResource.java
index 9e4756c..7ac184f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ActiveMQXAResource.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ActiveMQXAResource.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.client.impl;
import javax.transaction.xa.XAResource;
-public interface ActiveMQXAResource extends XAResource
-{
+public interface ActiveMQXAResource extends XAResource {
+
XAResource getResource();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
index a92b8af..96aed1d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
@@ -22,33 +22,31 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
-public class AddressQueryImpl implements ClientSession.AddressQuery
-{
+public class AddressQueryImpl implements ClientSession.AddressQuery {
+
private final boolean exists;
private final ArrayList<SimpleString> queueNames;
private final boolean autoCreateJmsQueues;
- public AddressQueryImpl(final boolean exists, final List<SimpleString> queueNames, final boolean autoCreateJmsQueues)
- {
+ public AddressQueryImpl(final boolean exists,
+ final List<SimpleString> queueNames,
+ final boolean autoCreateJmsQueues) {
this.exists = exists;
this.queueNames = new ArrayList<SimpleString>(queueNames);
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
- public List<SimpleString> getQueueNames()
- {
+ public List<SimpleString> getQueueNames() {
return queueNames;
}
- public boolean isExists()
- {
+ public boolean isExists() {
return exists;
}
- public boolean isAutoCreateJmsQueues()
- {
+ public boolean isAutoCreateJmsQueues() {
return autoCreateJmsQueues;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AfterConnectInternalListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AfterConnectInternalListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AfterConnectInternalListener.java
index 0fc7cc7..8267a70 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AfterConnectInternalListener.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AfterConnectInternalListener.java
@@ -21,7 +21,7 @@ package org.apache.activemq.artemis.core.client.impl;
* This listener is not part of the API and shouldn't be used by users.
* (if you do so we can't guarantee any API compatibility on this class)
*/
-public interface AfterConnectInternalListener
-{
+public interface AfterConnectInternalListener {
+
void onConnection(ClientSessionFactoryInternal sf);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index b9cccec..080a5dd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -44,8 +44,7 @@ import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
-public final class ClientConsumerImpl implements ClientConsumerInternal
-{
+public final class ClientConsumerImpl implements ClientConsumerInternal {
// Constants
// ------------------------------------------------------------------------------------
@@ -145,8 +144,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
final Executor flowControlExecutor,
final SessionContext sessionContext,
final ClientSession.QueueQuery queueInfo,
- final ClassLoader contextClassLoader)
- {
+ final ClassLoader contextClassLoader) {
this.consumerContext = consumerContext;
this.queueName = queueName;
@@ -177,34 +175,28 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
// ClientConsumer implementation
// -----------------------------------------------------------------
- public ConsumerContext getConsumerContext()
- {
+ public ConsumerContext getConsumerContext() {
return consumerContext;
}
- private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws ActiveMQException
- {
+ private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws ActiveMQException {
checkClosed();
- if (largeMessageReceived != null)
- {
+ if (largeMessageReceived != null) {
// Check if there are pending packets to be received
largeMessageReceived.discardBody();
largeMessageReceived = null;
}
- if (rateLimiter != null)
- {
+ if (rateLimiter != null) {
rateLimiter.limit();
}
- if (handler != null)
- {
+ if (handler != null) {
throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet();
}
- if (clientWindowSize == 0)
- {
+ if (clientWindowSize == 0) {
startSlowConsumer();
}
@@ -219,47 +211,36 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
- try
- {
- while (true)
- {
+ try {
+ while (true) {
ClientMessageInternal m = null;
- synchronized (this)
- {
- while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0)
- {
- if (start == -1)
- {
+ synchronized (this) {
+ while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0) {
+ if (start == -1) {
start = System.currentTimeMillis();
}
- if (m == null && forcingDelivery)
- {
- if (stopped)
- {
+ if (m == null && forcingDelivery) {
+ if (stopped) {
break;
}
// we only force delivery once per call to receive
- if (!deliveryForced)
- {
+ if (!deliveryForced) {
callForceDelivery = true;
break;
}
}
- try
- {
+ try {
wait(toWait);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
- if (m != null || closed)
- {
+ if (m != null || closed) {
break;
}
@@ -271,26 +252,21 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
}
}
- if (failedOver)
- {
- if (m == null)
- {
+ if (failedOver) {
+ if (m == null) {
// if failed over and the buffer is null, we reset the state and try it again
failedOver = false;
deliveryForced = false;
toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
continue;
}
- else
- {
+ else {
failedOver = false;
}
}
- if (callForceDelivery)
- {
- if (isTrace)
- {
+ if (callForceDelivery) {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Forcing delivery");
}
// JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed deadlocks
@@ -300,32 +276,26 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
continue;
}
- if (m != null)
- {
+ if (m != null) {
session.workDone();
- if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
- {
+ if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
// Need to check if forceDelivery was called at this call
// As we could be receiving a message that came from a previous call
- if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1)
- {
+ if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1) {
// forced delivery messages are discarded, nothing has been delivered by the queue
resetIfSlowConsumer();
- if (isTrace)
- {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("There was nothing on the queue, leaving it now:: returning null");
}
return null;
}
- else
- {
- if (isTrace)
- {
+ else {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Ignored force delivery answer as it belonged to another call");
}
// Ignore the message
@@ -337,43 +307,35 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
flowControlBeforeConsumption(m);
- if (expired)
- {
+ if (expired) {
m.discardBody();
session.expire(this, m);
- if (clientWindowSize == 0)
- {
+ if (clientWindowSize == 0) {
startSlowConsumer();
}
- if (toWait > 0)
- {
+ if (toWait > 0) {
continue;
}
- else
- {
+ else {
return null;
}
}
- if (m.isLargeMessage())
- {
+ if (m.isLargeMessage()) {
largeMessageReceived = m;
}
- if (isTrace)
- {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Returning " + m);
}
return m;
}
- else
- {
- if (isTrace)
- {
+ else {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Returning null");
}
resetIfSlowConsumer();
@@ -381,36 +343,30 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
}
}
}
- finally
- {
+ finally {
receiverThread = null;
}
}
- public ClientMessage receive(final long timeout) throws ActiveMQException
- {
+ public ClientMessage receive(final long timeout) throws ActiveMQException {
ClientMessage msg = receive(timeout, false);
- if (msg == null && !closed)
- {
+ if (msg == null && !closed) {
msg = receive(0, true);
}
return msg;
}
- public ClientMessage receive() throws ActiveMQException
- {
+ public ClientMessage receive() throws ActiveMQException {
return receive(0, false);
}
- public ClientMessage receiveImmediate() throws ActiveMQException
- {
+ public ClientMessage receiveImmediate() throws ActiveMQException {
return receive(0, true);
}
- public MessageHandler getMessageHandler() throws ActiveMQException
- {
+ public MessageHandler getMessageHandler() throws ActiveMQException {
checkClosed();
return handler;
@@ -418,61 +374,52 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
// Must be synchronized since messages may be arriving while handler is being set and might otherwise end
// up not queueing enough executors - so messages get stranded
- public synchronized ClientConsumerImpl setMessageHandler(final MessageHandler theHandler) throws ActiveMQException
- {
+ public synchronized ClientConsumerImpl setMessageHandler(final MessageHandler theHandler) throws ActiveMQException {
checkClosed();
- if (receiverThread != null)
- {
+ if (receiverThread != null) {
throw ActiveMQClientMessageBundle.BUNDLE.inReceive();
}
boolean noPreviousHandler = handler == null;
- if (handler != theHandler && clientWindowSize == 0)
- {
+ if (handler != theHandler && clientWindowSize == 0) {
startSlowConsumer();
}
handler = theHandler;
// if no previous handler existed queue up messages for delivery
- if (handler != null && noPreviousHandler)
- {
+ if (handler != null && noPreviousHandler) {
requeueExecutors();
}
// if unsetting a previous handler, it may still be in onMessage, so wait for completion
- else if (handler == null && !noPreviousHandler)
- {
+ else if (handler == null && !noPreviousHandler) {
waitForOnMessageToComplete(true);
}
return this;
}
- public void close() throws ActiveMQException
- {
+ public void close() throws ActiveMQException {
doCleanUp(true);
}
/**
* To be used by MDBs to stop any more handling of messages.
*
- * @throws ActiveMQException
* @param future the future to run once the onMessage Thread has completed
+ * @throws ActiveMQException
*/
- public Thread prepareForClose(final FutureLatch future) throws ActiveMQException
- {
+ public Thread prepareForClose(final FutureLatch future) throws ActiveMQException {
closing = true;
resetLargeMessageController();
//execute the future after the last onMessage call
- sessionExecutor.execute(new Runnable()
- {
+ sessionExecutor.execute(new Runnable() {
@Override
- public void run()
- {
+ public void run() {
future.run();
}
});
@@ -480,37 +427,29 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
return onMessageThread;
}
- public void cleanUp()
- {
- try
- {
+ public void cleanUp() {
+ try {
doCleanUp(false);
}
- catch (ActiveMQException e)
- {
+ catch (ActiveMQException e) {
ActiveMQClientLogger.LOGGER.warn("problem cleaning up: " + this);
}
}
- public boolean isClosed()
- {
+ public boolean isClosed() {
return closed;
}
- public void stop(final boolean waitForOnMessage) throws ActiveMQException
- {
+ public void stop(final boolean waitForOnMessage) throws ActiveMQException {
waitForOnMessageToComplete(waitForOnMessage);
- if (browseOnly)
- {
+ if (browseOnly) {
// stop shouldn't affect browser delivery
return;
}
- synchronized (this)
- {
- if (stopped)
- {
+ synchronized (this) {
+ if (stopped) {
return;
}
@@ -518,8 +457,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
}
}
- public void clearAtFailover()
- {
+ public void clearAtFailover() {
clearBuffer();
// failover will issue a start later
@@ -536,70 +474,57 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
ackIndividually = false;
}
- public synchronized void start()
- {
+ public synchronized void start() {
stopped = false;
requeueExecutors();
}
- public Exception getLastException()
- {
+ public Exception getLastException() {
return lastException;
}
// ClientConsumerInternal implementation
// --------------------------------------------------------------
- public ClientSession.QueueQuery getQueueInfo()
- {
+ public ClientSession.QueueQuery getQueueInfo() {
return queueInfo;
}
- public SimpleString getFilterString()
- {
+ public SimpleString getFilterString() {
return filterString;
}
- public SimpleString getQueueName()
- {
+ public SimpleString getQueueName() {
return queueName;
}
- public boolean isBrowseOnly()
- {
+ public boolean isBrowseOnly() {
return browseOnly;
}
- public synchronized void handleMessage(final ClientMessageInternal message) throws Exception
- {
- if (closing)
- {
+ public synchronized void handleMessage(final ClientMessageInternal message) throws Exception {
+ if (closing) {
// This is ok - we just ignore the message
return;
}
- if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED))
- {
+ if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
handleCompressedMessage(message);
}
- else
- {
+ else {
handleRegularMessage(message);
}
}
- private void handleRegularMessage(ClientMessageInternal message)
- {
- if (message.getAddress() == null)
- {
+ private void handleRegularMessage(ClientMessageInternal message) {
+ if (message.getAddress() == null) {
message.setAddressTransient(queueInfo.getAddress());
}
message.onReceipt(this);
- if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
- {
+ if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
// We have messages of different priorities so we need to ack them individually since the order
// of them in the ServerConsumerImpl delivery list might not be the same as the order they are
// consumed in, which means that acking all up to won't work
@@ -609,16 +534,13 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
// Add it to the buffer
buffer.addTail(message, message.getPriority());
- if (handler != null)
- {
+ if (handler != null) {
// Execute using executor
- if (!stopped)
- {
+ if (!stopped) {
queueExecutor();
}
}
- else
- {
+ else {
notify();
}
}
@@ -635,17 +557,14 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
* Say that you sent a 1G message full of spaces. That could be just below 100K compressed but you wouldn't have
* enough memory to decompress it
*/
- private void handleCompressedMessage(final ClientMessageInternal clMessage) throws Exception
- {
+ private void handleCompressedMessage(final ClientMessageInternal clMessage) throws Exception {
ClientLargeMessageImpl largeMessage = new ClientLargeMessageImpl();
largeMessage.retrieveExistingData(clMessage);
File largeMessageCache = null;
- if (session.isCacheLargeMessageClient())
- {
- largeMessageCache = File.createTempFile("tmp-large-message-" + largeMessage.getMessageID() + "-",
- ".tmp");
+ if (session.isCacheLargeMessageClient()) {
+ largeMessageCache = File.createTempFile("tmp-large-message-" + largeMessage.getMessageID() + "-", ".tmp");
largeMessageCache.deleteOnExit();
}
@@ -667,10 +586,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
handleRegularMessage(largeMessage);
}
- public synchronized void handleLargeMessage(final ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
- {
- if (closing)
- {
+ public synchronized void handleLargeMessage(final ClientLargeMessageInternal clientLargeMessage,
+ long largeMessageSize) throws Exception {
+ if (closing) {
// This is ok - we just ignore the message
return;
}
@@ -678,10 +596,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
// Flow control for the first packet, we will have others
File largeMessageCache = null;
- if (session.isCacheLargeMessageClient())
- {
- largeMessageCache = File.createTempFile("tmp-large-message-" + clientLargeMessage.getMessageID() + "-",
- ".tmp");
+ if (session.isCacheLargeMessageClient()) {
+ largeMessageCache = File.createTempFile("tmp-large-message-" + clientLargeMessage.getMessageID() + "-", ".tmp");
largeMessageCache.deleteOnExit();
}
@@ -691,74 +607,61 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
currentLargeMessageController = new LargeMessageControllerImpl(this, largeMessageSize, callTimeout, largeMessageCache);
- if (clientLargeMessage.isCompressed())
- {
+ if (clientLargeMessage.isCompressed()) {
clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
}
- else
- {
+ else {
clientLargeMessage.setLargeMessageController(currentLargeMessageController);
}
handleRegularMessage(clientLargeMessage);
}
- public synchronized void handleLargeMessageContinuation(final byte[] chunk, final int flowControlSize, final boolean isContinues) throws Exception
- {
- if (closing)
- {
+ public synchronized void handleLargeMessageContinuation(final byte[] chunk,
+ final int flowControlSize,
+ final boolean isContinues) throws Exception {
+ if (closing) {
return;
}
- if (currentLargeMessageController == null)
- {
- if (isTrace)
- {
+ if (currentLargeMessageController == null) {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Sending back credits for largeController = null " + flowControlSize);
}
flowControl(flowControlSize, false);
}
- else
- {
+ else {
currentLargeMessageController.addPacket(chunk, flowControlSize, isContinues);
}
}
- public void clear(boolean waitForOnMessage) throws ActiveMQException
- {
- synchronized (this)
- {
+ public void clear(boolean waitForOnMessage) throws ActiveMQException {
+ synchronized (this) {
// Need to send credits for the messages in the buffer
Iterator<ClientMessageInternal> iter = buffer.iterator();
- while (iter.hasNext())
- {
- try
- {
+ while (iter.hasNext()) {
+ try {
ClientMessageInternal message = iter.next();
- if (message.isLargeMessage())
- {
+ if (message.isLargeMessage()) {
ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal) message;
largeMessage.getLargeMessageController().cancel();
}
flowControlBeforeConsumption(message);
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorClearingMessages(e);
}
}
clearBuffer();
- try
- {
+ try {
resetLargeMessageController();
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
// nothing that could be done here
ActiveMQClientLogger.LOGGER.errorClearingMessages(e);
}
@@ -769,64 +672,51 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
waitForOnMessageToComplete(waitForOnMessage);
}
- private void resetLargeMessageController()
- {
+ private void resetLargeMessageController() {
LargeMessageController controller = currentLargeMessageController;
- if (controller != null)
- {
+ if (controller != null) {
controller.cancel();
currentLargeMessageController = null;
}
}
- public int getClientWindowSize()
- {
+ public int getClientWindowSize() {
return clientWindowSize;
}
- public int getBufferSize()
- {
+ public int getBufferSize() {
return buffer.size();
}
- public void acknowledge(final ClientMessage message) throws ActiveMQException
- {
+ public void acknowledge(final ClientMessage message) throws ActiveMQException {
ClientMessageInternal cmi = (ClientMessageInternal) message;
- if (ackIndividually)
- {
+ if (ackIndividually) {
individualAcknowledge(message);
}
- else
- {
+ else {
ackBytes += message.getEncodeSize();
- if (ackBytes >= ackBatchSize)
- {
+ if (ackBytes >= ackBatchSize) {
doAck(cmi);
}
- else
- {
+ else {
lastAckedMessage = cmi;
}
}
}
- public void individualAcknowledge(ClientMessage message) throws ActiveMQException
- {
- if (lastAckedMessage != null)
- {
+ public void individualAcknowledge(ClientMessage message) throws ActiveMQException {
+ if (lastAckedMessage != null) {
flushAcks();
}
session.individualAcknowledge(this, message);
}
- public void flushAcks() throws ActiveMQException
- {
- if (lastAckedMessage != null)
- {
+ public void flushAcks() throws ActiveMQException {
+ if (lastAckedMessage != null) {
doAck(lastAckedMessage);
}
}
@@ -837,18 +727,13 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
*
* @param discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
*/
- public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException
- {
- if (clientWindowSize >= 0)
- {
+ public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException {
+ if (clientWindowSize >= 0) {
creditsToSend += messageBytes;
- if (creditsToSend >= clientWindowSize)
- {
- if (clientWindowSize == 0 && discountSlowConsumer)
- {
- if (isTrace)
- {
+ if (creditsToSend >= clientWindowSize) {
+ if (clientWindowSize == 0 && discountSlowConsumer) {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
}
@@ -858,15 +743,12 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
creditsToSend = 0;
- if (credits > 0)
- {
+ if (credits > 0) {
sendCredits(credits);
}
}
- else
- {
- if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
- {
+ else {
+ if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
ActiveMQClientLogger.LOGGER.debug("Sending " + messageBytes + " from flow-control");
}
@@ -874,8 +756,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
creditsToSend = 0;
- if (credits > 0)
- {
+ if (credits > 0) {
sendCredits(credits);
}
}
@@ -898,66 +779,52 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
/**
* Sending an initial credit for slow consumers
*/
- private void startSlowConsumer()
- {
- if (isTrace)
- {
+ private void startSlowConsumer() {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Sending 1 credit to start delivering of one message to slow consumer");
}
sendCredits(1);
- try
- {
+ try {
// We use an executor here to guarantee the messages will arrive in order.
// However when starting a slow consumer, we have to guarantee the credit was sent before we can perform any
// operations like forceDelivery
pendingFlowControl.await(10, TimeUnit.SECONDS);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
// will just ignore and forward the ignored
Thread.currentThread().interrupt();
}
}
- private void resetIfSlowConsumer()
- {
- if (clientWindowSize == 0)
- {
+ private void resetIfSlowConsumer() {
+ if (clientWindowSize == 0) {
sendCredits(0);
// If resetting a slow consumer, we need to wait the execution
final CountDownLatch latch = new CountDownLatch(1);
- flowControlExecutor.execute(new Runnable()
- {
- public void run()
- {
+ flowControlExecutor.execute(new Runnable() {
+ public void run() {
latch.countDown();
}
});
- try
- {
+ try {
latch.await(10, TimeUnit.SECONDS);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
}
- private void requeueExecutors()
- {
- for (int i = 0; i < buffer.size(); i++)
- {
+ private void requeueExecutors() {
+ for (int i = 0; i < buffer.size(); i++) {
queueExecutor();
}
}
- private void queueExecutor()
- {
- if (isTrace)
- {
+ private void queueExecutor() {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Adding Runner on Executor for delivery");
}
@@ -967,34 +834,26 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
/**
* @param credits
*/
- private void sendCredits(final int credits)
- {
+ private void sendCredits(final int credits) {
pendingFlowControl.countUp();
- flowControlExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
+ flowControlExecutor.execute(new Runnable() {
+ public void run() {
+ try {
sessionContext.sendConsumerCredits(ClientConsumerImpl.this, credits);
}
- finally
- {
+ finally {
pendingFlowControl.countDown();
}
}
});
}
- private void waitForOnMessageToComplete(boolean waitForOnMessage)
- {
- if (handler == null)
- {
+ private void waitForOnMessageToComplete(boolean waitForOnMessage) {
+ if (handler == null) {
return;
}
- if (!waitForOnMessage || Thread.currentThread() == onMessageThread)
- {
+ if (!waitForOnMessage || Thread.currentThread() == onMessageThread) {
// If called from inside onMessage then return immediately - otherwise would block
return;
}
@@ -1005,24 +864,19 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
- if (!ok)
- {
+ if (!ok) {
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing();
}
}
- private void checkClosed() throws ActiveMQException
- {
- if (closed)
- {
+ private void checkClosed() throws ActiveMQException {
+ if (closed) {
throw ActiveMQClientMessageBundle.BUNDLE.consumerClosed();
}
}
- private void callOnMessage() throws Exception
- {
- if (closing || stopped)
- {
+ private void callOnMessage() throws Exception {
+ if (closing || stopped) {
return;
}
@@ -1038,43 +892,33 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
// otherwise while this is executing and give NPE when calling onMessage
MessageHandler theHandler = handler;
- if (theHandler != null)
- {
- if (rateLimiter != null)
- {
+ if (theHandler != null) {
+ if (rateLimiter != null) {
rateLimiter.limit();
}
failedOver = false;
- synchronized (this)
- {
+ synchronized (this) {
message = buffer.poll();
}
- if (message != null)
- {
- if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
- {
+ if (message != null) {
+ if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
//Ignore, this could be a relic from a previous receiveImmediate();
return;
}
-
boolean expired = message.isExpired();
flowControlBeforeConsumption(message);
- if (!expired)
- {
- if (isTrace)
- {
+ if (!expired) {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Calling handler.onMessage");
}
- final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
+ final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+ public ClassLoader run() {
ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(contextClassLoader);
@@ -1084,49 +928,39 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
});
onMessageThread = Thread.currentThread();
- try
- {
+ try {
theHandler.onMessage(message);
}
- finally
- {
- try
- {
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- public Object run()
- {
+ finally {
+ try {
+ AccessController.doPrivileged(new PrivilegedAction<Object>() {
+ public Object run() {
Thread.currentThread().setContextClassLoader(originalLoader);
return null;
}
});
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.warn(e.getMessage(), e);
}
onMessageThread = null;
}
- if (isTrace)
- {
+ if (isTrace) {
ActiveMQClientLogger.LOGGER.trace("Handler.onMessage done");
}
- if (message.isLargeMessage())
- {
+ if (message.isLargeMessage()) {
message.discardBody();
}
}
- else
- {
+ else {
session.expire(this, message);
}
// If slow consumer, we need to send 1 credit to make sure we get another message
- if (clientWindowSize == 0)
- {
+ if (clientWindowSize == 0) {
startSlowConsumer();
}
}
@@ -1137,22 +971,17 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
* @param message
* @throws ActiveMQException
*/
- private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException
- {
+ private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
// Chunk messages will execute the flow control while receiving the chunks
- if (message.getFlowControlSize() != 0)
- {
+ if (message.getFlowControlSize() != 0) {
// on large messages we should discount 1 on the first packets as we need continuity until the last packet
flowControl(message.getFlowControlSize(), !message.isLargeMessage());
}
}
- private void doCleanUp(final boolean sendCloseMessage) throws ActiveMQException
- {
- try
- {
- if (closed)
- {
+ private void doCleanUp(final boolean sendCloseMessage) throws ActiveMQException {
+ try {
+ if (closed) {
return;
}
@@ -1168,10 +997,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
closed = true;
- synchronized (this)
- {
- if (receiverThread != null)
- {
+ synchronized (this) {
+ if (receiverThread != null) {
// Wake up any receive() thread that might be waiting
notify();
}
@@ -1185,26 +1012,22 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
clearBuffer();
- if (sendCloseMessage)
- {
+ if (sendCloseMessage) {
sessionContext.closeConsumer(this);
}
}
- catch (Throwable t)
- {
+ catch (Throwable t) {
// Consumer close should always return without exception
}
session.removeConsumer(this);
}
- private void clearBuffer()
- {
+ private void clearBuffer() {
buffer.clear();
}
- private void doAck(final ClientMessageInternal message) throws ActiveMQException
- {
+ private void doAck(final ClientMessageInternal message) throws ActiveMQException {
ackBytes = 0;
lastAckedMessage = null;
@@ -1215,16 +1038,13 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
// Inner classes
// --------------------------------------------------------------------------------
- private class Runner implements Runnable
- {
- public void run()
- {
- try
- {
+ private class Runner implements Runnable {
+
+ public void run() {
+ try {
callOnMessage();
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.onMessageError(e);
lastException = e;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
index a61ccf6..82f3dbb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
@@ -23,8 +23,8 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.utils.FutureLatch;
-public interface ClientConsumerInternal extends ClientConsumer
-{
+public interface ClientConsumerInternal extends ClientConsumer {
+
SimpleString getQueueName();
SimpleString getFilterString();
@@ -44,8 +44,8 @@ public interface ClientConsumerInternal extends ClientConsumer
/**
* To be called by things like MDBs during shutdown of the server
*
- * @throws ActiveMQException
* @param future
+ * @throws ActiveMQException
*/
Thread prepareForClose(FutureLatch future) throws ActiveMQException;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
index 4b94364..78c53c9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
@@ -31,8 +31,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
* At the time of sending a regular Message is sent as we won't know the message is considered large
* until the buffer is filled up or the user set a streaming.
*/
-public final class ClientLargeMessageImpl extends ClientMessageImpl implements ClientLargeMessageInternal
-{
+public final class ClientLargeMessageImpl extends ClientMessageImpl implements ClientLargeMessageInternal {
// Used only when receiving large messages
private LargeMessageController largeMessageController;
@@ -42,33 +41,27 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
/**
* @param largeMessageSize the largeMessageSize to set
*/
- public void setLargeMessageSize(long largeMessageSize)
- {
+ public void setLargeMessageSize(long largeMessageSize) {
this.largeMessageSize = largeMessageSize;
}
- public long getLargeMessageSize()
- {
+ public long getLargeMessageSize() {
return this.largeMessageSize;
}
// we only need this constructor as this is only used at decoding large messages on client
- public ClientLargeMessageImpl()
- {
+ public ClientLargeMessageImpl() {
super();
}
// Public --------------------------------------------------------
@Override
- public int getEncodeSize()
- {
- if (bodyBuffer != null)
- {
+ public int getEncodeSize() {
+ if (bodyBuffer != null) {
return super.getEncodeSize();
}
- else
- {
+ else {
return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize();
}
}
@@ -77,31 +70,25 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
* @return the largeMessage
*/
@Override
- public boolean isLargeMessage()
- {
+ public boolean isLargeMessage() {
return true;
}
- public void setLargeMessageController(final LargeMessageController controller)
- {
+ public void setLargeMessageController(final LargeMessageController controller) {
largeMessageController = controller;
}
- public void checkCompletion() throws ActiveMQException
- {
+ public void checkCompletion() throws ActiveMQException {
checkBuffer();
}
@Override
- public ActiveMQBuffer getBodyBuffer()
- {
+ public ActiveMQBuffer getBodyBuffer() {
- try
- {
+ try {
checkBuffer();
}
- catch (ActiveMQException e)
- {
+ catch (ActiveMQException e) {
throw new RuntimeException(e.getMessage(), e);
}
@@ -109,39 +96,31 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
}
@Override
- public int getBodySize()
- {
+ public int getBodySize() {
return getLongProperty(Message.HDR_LARGE_BODY_SIZE).intValue();
}
- public LargeMessageController getLargeMessageController()
- {
+ public LargeMessageController getLargeMessageController() {
return largeMessageController;
}
@Override
- public void saveToOutputStream(final OutputStream out) throws ActiveMQException
- {
- if (bodyBuffer != null)
- {
+ public void saveToOutputStream(final OutputStream out) throws ActiveMQException {
+ if (bodyBuffer != null) {
// The body was rebuilt on the client, so we need to behave as a regular message on this case
super.saveToOutputStream(out);
}
- else
- {
+ else {
largeMessageController.saveBuffer(out);
}
}
@Override
- public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException
- {
- if (bodyBuffer != null)
- {
+ public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException {
+ if (bodyBuffer != null) {
super.setOutputStream(out);
}
- else
- {
+ else {
largeMessageController.setOutputStream(out);
}
@@ -149,42 +128,33 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
}
@Override
- public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException
- {
- if (bodyBuffer != null)
- {
+ public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException {
+ if (bodyBuffer != null) {
return super.waitOutputStreamCompletion(timeMilliseconds);
}
- else
- {
+ else {
return largeMessageController.waitCompletion(timeMilliseconds);
}
}
@Override
- public void discardBody()
- {
- if (bodyBuffer != null)
- {
+ public void discardBody() {
+ if (bodyBuffer != null) {
super.discardBody();
}
- else
- {
+ else {
largeMessageController.discardUnusedPackets();
}
}
- private void checkBuffer() throws ActiveMQException
- {
- if (bodyBuffer == null)
- {
+ private void checkBuffer() throws ActiveMQException {
+ if (bodyBuffer == null) {
long bodySize = this.largeMessageSize + BODY_OFFSET;
- if (bodySize > Integer.MAX_VALUE)
- {
+ if (bodySize > Integer.MAX_VALUE) {
bodySize = Integer.MAX_VALUE;
}
- createBody((int)bodySize);
+ createBody((int) bodySize);
bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
@@ -194,24 +164,21 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
// Inner classes -------------------------------------------------
- private static class ActiveMQOutputStream extends OutputStream
- {
+ private static class ActiveMQOutputStream extends OutputStream {
+
private final ActiveMQBuffer bufferOut;
- ActiveMQOutputStream(ActiveMQBuffer out)
- {
+ ActiveMQOutputStream(ActiveMQBuffer out) {
this.bufferOut = out;
}
@Override
- public void write(int b) throws IOException
- {
- bufferOut.writeByte((byte)(b & 0xff));
+ public void write(int b) throws IOException {
+ bufferOut.writeByte((byte) (b & 0xff));
}
}
- public void retrieveExistingData(ClientMessageInternal clMessage)
- {
+ public void retrieveExistingData(ClientMessageInternal clMessage) {
this.messageID = clMessage.getMessageID();
this.address = clMessage.getAddress();
this.setUserID(clMessage.getUserID());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageInternal.java
index b9fd159..191a99b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageInternal.java
@@ -16,9 +16,7 @@
*/
package org.apache.activemq.artemis.core.client.impl;
-
-public interface ClientLargeMessageInternal extends ClientMessageInternal
-{
+public interface ClientLargeMessageInternal extends ClientMessageInternal {
void setLargeMessageController(LargeMessageController controller);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 78c443f..7668251 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -33,29 +33,28 @@ import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
/**
- *
* A ClientMessageImpl
*/
-public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal
-{
+public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal {
+
// added this constant here so that the client package have no dependency on JMS
public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME;
-
private int deliveryCount;
private ClientConsumerInternal consumer;
private int flowControlSize = -1;
- /** Used on LargeMessages only */
+ /**
+ * Used on LargeMessages only
+ */
private InputStream bodyInputStream;
/*
* Constructor for when reading from remoting
*/
- public ClientMessageImpl()
- {
+ public ClientMessageImpl() {
}
/*
@@ -66,41 +65,34 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
final long expiration,
final long timestamp,
final byte priority,
- final int initialMessageBufferSize)
- {
+ final int initialMessageBufferSize) {
super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
}
@Override
- public boolean isServerMessage()
- {
+ public boolean isServerMessage() {
return false;
}
@Override
- public void onReceipt(final ClientConsumerInternal consumer)
- {
+ public void onReceipt(final ClientConsumerInternal consumer) {
this.consumer = consumer;
}
@Override
- public ClientMessageImpl setDeliveryCount(final int deliveryCount)
- {
+ public ClientMessageImpl setDeliveryCount(final int deliveryCount) {
this.deliveryCount = deliveryCount;
return this;
}
@Override
- public int getDeliveryCount()
- {
+ public int getDeliveryCount() {
return deliveryCount;
}
@Override
- public ClientMessageImpl acknowledge() throws ActiveMQException
- {
- if (consumer != null)
- {
+ public ClientMessageImpl acknowledge() throws ActiveMQException {
+ if (consumer != null) {
consumer.acknowledge(this);
}
@@ -108,10 +100,8 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
}
@Override
- public ClientMessageImpl individualAcknowledge() throws ActiveMQException
- {
- if (consumer != null)
- {
+ public ClientMessageImpl individualAcknowledge() throws ActiveMQException {
+ if (consumer != null) {
consumer.individualAcknowledge(this);
}
@@ -119,18 +109,15 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
}
@Override
- public int getFlowControlSize()
- {
- if (flowControlSize < 0)
- {
+ public int getFlowControlSize() {
+ if (flowControlSize < 0) {
throw new IllegalStateException("Flow Control hasn't been set");
}
return flowControlSize;
}
@Override
- public void setFlowControlSize(final int flowControlSize)
- {
+ public void setFlowControlSize(final int flowControlSize) {
this.flowControlSize = flowControlSize;
}
@@ -138,69 +125,58 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
* @return the largeMessage
*/
@Override
- public boolean isLargeMessage()
- {
+ public boolean isLargeMessage() {
return false;
}
@Override
- public boolean isCompressed()
- {
+ public boolean isCompressed() {
return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
}
@Override
- public int getBodySize()
- {
+ public int getBodySize() {
return buffer.writerIndex() - buffer.readerIndex();
}
@Override
- public String toString()
- {
- return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
+ public String toString() {
+ return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
}
@Override
- public void saveToOutputStream(final OutputStream out) throws ActiveMQException
- {
- try
- {
+ public void saveToOutputStream(final OutputStream out) throws ActiveMQException {
+ try {
byte[] readBuffer = new byte[getBodySize()];
getBodyBuffer().readBytes(readBuffer);
out.write(readBuffer);
out.flush();
}
- catch (IOException e)
- {
+ catch (IOException e) {
throw ActiveMQClientMessageBundle.BUNDLE.errorSavingBody(e);
}
}
@Override
- public ClientMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException
- {
+ public ClientMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException {
saveToOutputStream(out);
return this;
}
@Override
- public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException
- {
+ public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException {
return true;
}
@Override
- public void discardBody()
- {
+ public void discardBody() {
}
/**
* @return the bodyInputStream
*/
@Override
- public InputStream getBodyInputStream()
- {
+ public InputStream getBodyInputStream() {
return bodyInputStream;
}
@@ -208,202 +184,170 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
* @param bodyInputStream the bodyInputStream to set
*/
@Override
- public ClientMessageImpl setBodyInputStream(final InputStream bodyInputStream)
- {
+ public ClientMessageImpl setBodyInputStream(final InputStream bodyInputStream) {
this.bodyInputStream = bodyInputStream;
return this;
}
@Override
- public BodyEncoder getBodyEncoder() throws ActiveMQException
- {
+ public BodyEncoder getBodyEncoder() throws ActiveMQException {
return new DecodingContext();
}
@Override
- public ClientMessageImpl putBooleanProperty(final SimpleString key, final boolean value)
- {
+ public ClientMessageImpl putBooleanProperty(final SimpleString key, final boolean value) {
return (ClientMessageImpl) super.putBooleanProperty(key, value);
}
@Override
- public ClientMessageImpl putByteProperty(final SimpleString key, final byte value)
- {
+ public ClientMessageImpl putByteProperty(final SimpleString key, final byte value) {
return (ClientMessageImpl) super.putByteProperty(key, value);
}
@Override
- public ClientMessageImpl putBytesProperty(final SimpleString key, final byte[] value)
- {
+ public ClientMessageImpl putBytesProperty(final SimpleString key, final byte[] value) {
return (ClientMessageImpl) super.putBytesProperty(key, value);
}
@Override
- public ClientMessageImpl putCharProperty(SimpleString key, char value)
- {
+ public ClientMessageImpl putCharProperty(SimpleString key, char value) {
return (ClientMessageImpl) super.putCharProperty(key, value);
}
@Override
- public ClientMessageImpl putCharProperty(String key, char value)
- {
+ public ClientMessageImpl putCharProperty(String key, char value) {
return (ClientMessageImpl) super.putCharProperty(key, value);
}
@Override
- public ClientMessageImpl putShortProperty(final SimpleString key, final short value)
- {
+ public ClientMessageImpl putShortProperty(final SimpleString key, final short value) {
return (ClientMessageImpl) super.putShortProperty(key, value);
}
@Override
- public ClientMessageImpl putIntProperty(final SimpleString key, final int value)
- {
+ public ClientMessageImpl putIntProperty(final SimpleString key, final int value) {
return (ClientMessageImpl) super.putIntProperty(key, value);
}
@Override
- public ClientMessageImpl putLongProperty(final SimpleString key, final long value)
- {
+ public ClientMessageImpl putLongProperty(final SimpleString key, final long value) {
return (ClientMessageImpl) super.putLongProperty(key, value);
}
@Override
- public ClientMessageImpl putFloatProperty(final SimpleString key, final float value)
- {
+ public ClientMessageImpl putFloatProperty(final SimpleString key, final float value) {
return (ClientMessageImpl) super.putFloatProperty(key, value);
}
@Override
- public ClientMessageImpl putDoubleProperty(final SimpleString key, final double value)
- {
+ public ClientMessageImpl putDoubleProperty(final SimpleString key, final double value) {
return (ClientMessageImpl) super.putDoubleProperty(key, value);
}
@Override
- public ClientMessageImpl putStringProperty(final SimpleString key, final SimpleString value)
- {
+ public ClientMessageImpl putStringProperty(final SimpleString key, final SimpleString value) {
return (ClientMessageImpl) super.putStringProperty(key, value);
}
@Override
- public ClientMessageImpl putObjectProperty(final SimpleString key, final Object value) throws ActiveMQPropertyConversionException
- {
+ public ClientMessageImpl putObjectProperty(final SimpleString key,
+ final Object value) throws ActiveMQPropertyConversionException {
return (ClientMessageImpl) super.putObjectProperty(key, value);
}
@Override
- public ClientMessageImpl putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException
- {
+ public ClientMessageImpl putObjectProperty(final String key,
+ final Object value) throws ActiveMQPropertyConversionException {
return (ClientMessageImpl) super.putObjectProperty(key, value);
}
@Override
- public ClientMessageImpl putBooleanProperty(final String key, final boolean value)
- {
+ public ClientMessageImpl putBooleanProperty(final String key, final boolean value) {
return (ClientMessageImpl) super.putBooleanProperty(key, value);
}
@Override
- public ClientMessageImpl putByteProperty(final String key, final byte value)
- {
+ public ClientMessageImpl putByteProperty(final String key, final byte value) {
return (ClientMessageImpl) super.putByteProperty(key, value);
}
@Override
- public ClientMessageImpl putBytesProperty(final String key, final byte[] value)
- {
+ public ClientMessageImpl putBytesProperty(final String key, final byte[] value) {
return (ClientMessageImpl) super.putBytesProperty(key, value);
}
@Override
- public ClientMessageImpl putShortProperty(final String key, final short value)
- {
+ public ClientMessageImpl putShortProperty(final String key, final short value) {
return (ClientMessageImpl) super.putShortProperty(key, value);
}
@Override
- public ClientMessageImpl putIntProperty(final String key, final int value)
- {
+ public ClientMessageImpl putIntProperty(final String key, final int value) {
return (ClientMessageImpl) super.putIntProperty(key, value);
}
@Override
- public ClientMessageImpl putLongProperty(final String key, final long value)
- {
+ public ClientMessageImpl putLongProperty(final String key, final long value) {
return (ClientMessageImpl) super.putLongProperty(key, value);
}
@Override
- public ClientMessageImpl putFloatProperty(final String key, final float value)
- {
+ public ClientMessageImpl putFloatProperty(final String key, final float value) {
return (ClientMessageImpl) super.putFloatProperty(key, value);
}
@Override
- public ClientMessageImpl putDoubleProperty(final String key, final double value)
- {
+ public ClientMessageImpl putDoubleProperty(final String key, final double value) {
return (ClientMessageImpl) super.putDoubleProperty(key, value);
}
@Override
- public ClientMessageImpl putStringProperty(final String key, final String value)
- {
+ public ClientMessageImpl putStringProperty(final String key, final String value) {
return (ClientMessageImpl) super.putStringProperty(key, value);
}
@Override
- public ClientMessageImpl writeBodyBufferBytes(byte[] bytes)
- {
+ public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) {
return (ClientMessageImpl) super.writeBodyBufferBytes(bytes);
}
@Override
- public ClientMessageImpl writeBodyBufferString(String string)
- {
+ public ClientMessageImpl writeBodyBufferString(String string) {
return (ClientMessageImpl) super.writeBodyBufferString(string);
}
- private final class DecodingContext implements BodyEncoder
- {
- public DecodingContext()
- {
+ private final class DecodingContext implements BodyEncoder {
+
+ public DecodingContext() {
}
@Override
- public void open()
- {
+ public void open() {
getBodyBuffer().readerIndex(0);
}
@Override
- public void close()
- {
+ public void close() {
}
@Override
- public long getLargeBodySize()
- {
- if (isLargeMessage())
- {
+ public long getLargeBodySize() {
+ if (isLargeMessage()) {
return getBodyBuffer().writerIndex();
}
- else
- {
+ else {
return getBodyBuffer().writerIndex() - BODY_OFFSET;
}
}
@Override
- public int encode(final ByteBuffer bufferRead) throws ActiveMQException
- {
+ public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
ActiveMQBuffer buffer1 = ActiveMQBuffers.wrappedBuffer(bufferRead);
return encode(buffer1, bufferRead.capacity());
}
@Override
- public int encode(final ActiveMQBuffer bufferOut, final int size)
- {
+ public int encode(final ActiveMQBuffer bufferOut, final int size) {
byte[] bytes = new byte[size];
getWholeBuffer().readBytes(bytes);
bufferOut.writeBytes(bytes, 0, size);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
index 97cec9c..07d4719 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
@@ -20,15 +20,18 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.utils.TypedProperties;
-public interface ClientMessageInternal extends ClientMessage
-{
+public interface ClientMessageInternal extends ClientMessage {
TypedProperties getProperties();
- /** Size used for FlowControl */
+ /**
+ * Size used for FlowControl
+ */
int getFlowControlSize();
- /** Size used for FlowControl */
+ /**
+ * Size used for FlowControl
+ */
void setFlowControlSize(int flowControlSize);
void setAddressTransient(SimpleString address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
index dd53493..3c10e1a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
-public interface ClientProducerCreditManager
-{
+public interface ClientProducerCreditManager {
+
ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context);
void returnCredits(SimpleString address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index ddfb7d4..30c8376 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -23,8 +23,8 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
-public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager
-{
+public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager {
+
public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000;
private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>();
@@ -35,30 +35,26 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
private int windowSize;
- public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize)
- {
+ public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
this.session = session;
this.windowSize = windowSize;
}
- public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon, SessionContext context)
- {
- if (windowSize == -1)
- {
+ public synchronized ClientProducerCredits getCredits(final SimpleString address,
+ final boolean anon,
+ SessionContext context) {
+ if (windowSize == -1) {
return ClientProducerCreditsNoFlowControl.instance;
}
- else
- {
+ else {
boolean needInit = false;
ClientProducerCredits credits;
- synchronized (this)
- {
+ synchronized (this) {
credits = producerCredits.get(address);
- if (credits == null)
- {
+ if (credits == null) {
// Doesn't need to be fair since session is single threaded
credits = new ClientProducerCreditsImpl(session, address, windowSize);
needInit = true;
@@ -66,15 +62,13 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
producerCredits.put(address, credits);
}
- if (!anon)
- {
+ if (!anon) {
credits.incrementRefCount();
// Remove from anon credits (if there)
unReferencedCredits.remove(address);
}
- else
- {
+ else {
addToUnReferencedCache(address, credits);
}
}
@@ -82,8 +76,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
// The init is done outside of the lock
// otherwise packages may arrive with flow control
// while this is still sending requests causing a dead lock
- if (needInit)
- {
+ if (needInit) {
credits.init(context);
}
@@ -91,50 +84,40 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
}
}
- public synchronized void returnCredits(final SimpleString address)
- {
+ public synchronized void returnCredits(final SimpleString address) {
ClientProducerCredits credits = producerCredits.get(address);
- if (credits != null && credits.decrementRefCount() == 0)
- {
+ if (credits != null && credits.decrementRefCount() == 0) {
addToUnReferencedCache(address, credits);
}
}
- public synchronized void receiveCredits(final SimpleString address, final int credits)
- {
+ public synchronized void receiveCredits(final SimpleString address, final int credits) {
ClientProducerCredits cr = producerCredits.get(address);
- if (cr != null)
- {
+ if (cr != null) {
cr.receiveCredits(credits);
}
}
- public synchronized void receiveFailCredits(final SimpleString address, int credits)
- {
+ public synchronized void receiveFailCredits(final SimpleString address, int credits) {
ClientProducerCredits cr = producerCredits.get(address);
- if (cr != null)
- {
+ if (cr != null) {
cr.receiveFailCredits(credits);
}
}
- public synchronized void reset()
- {
- for (ClientProducerCredits credits : producerCredits.values())
- {
+ public synchronized void reset() {
+ for (ClientProducerCredits credits : producerCredits.values()) {
credits.reset();
}
}
- public synchronized void close()
- {
+ public synchronized void close() {
windowSize = -1;
- for (ClientProducerCredits credits : producerCredits.values())
- {
+ for (ClientProducerCredits credits : producerCredits.values()) {
credits.close();
}
@@ -143,22 +126,18 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
unReferencedCredits.clear();
}
- public synchronized int creditsMapSize()
- {
+ public synchronized int creditsMapSize() {
return producerCredits.size();
}
- public synchronized int unReferencedCreditsSize()
- {
+ public synchronized int unReferencedCreditsSize() {
return unReferencedCredits.size();
}
- private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits)
- {
+ private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits) {
unReferencedCredits.put(address, credits);
- if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE)
- {
+ if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
// Remove the oldest entry
Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();
@@ -171,8 +150,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
}
}
- private void removeEntry(final SimpleString address, final ClientProducerCredits credits)
- {
+ private void removeEntry(final SimpleString address, final ClientProducerCredits credits) {
producerCredits.remove(address);
credits.releaseOutstanding();
@@ -180,51 +158,40 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
credits.close();
}
+ static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits {
- static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits
- {
static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
- public void acquireCredits(int credits) throws InterruptedException
- {
+ public void acquireCredits(int credits) throws InterruptedException {
}
- public void receiveCredits(int credits)
- {
+ public void receiveCredits(int credits) {
}
- public void receiveFailCredits(int credits)
- {
+ public void receiveFailCredits(int credits) {
}
- public boolean isBlocked()
- {
+ public boolean isBlocked() {
return false;
}
- public void init(SessionContext ctx)
- {
+ public void init(SessionContext ctx) {
}
- public void reset()
- {
+ public void reset() {
}
- public void close()
- {
+ public void close() {
}
- public void incrementRefCount()
- {
+ public void incrementRefCount() {
}
- public int decrementRefCount()
- {
+ public int decrementRefCount() {
return 1;
}
- public void releaseOutstanding()
- {
+ public void releaseOutstanding() {
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index 27e1528..443d7e5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
-public interface ClientProducerCredits
-{
+public interface ClientProducerCredits {
+
void acquireCredits(int credits) throws InterruptedException, ActiveMQException;
void receiveCredits(int credits);