You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:38 UTC
[39/53] [abbrv] [partial] activemq-artemis git commit: automatic
checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
index fa4ad16..206f188 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
@@ -25,8 +25,8 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-public class ClientProducerCreditsImpl implements ClientProducerCredits
-{
+public class ClientProducerCreditsImpl implements ClientProducerCredits {
+
private final Semaphore semaphore;
private final int windowSize;
@@ -51,8 +51,7 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
public ClientProducerCreditsImpl(final ClientSessionInternal session,
final SimpleString address,
- final int windowSize)
- {
+ final int windowSize) {
this.session = session;
this.address = address;
@@ -64,8 +63,7 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
semaphore = new Semaphore(0, false);
}
- public void init(SessionContext sessionContext)
- {
+ public void init(SessionContext sessionContext) {
// We initially request twice as many credits as we request in subsequent requests
// This allows the producer to keep sending as more arrive, minimising pauses
checkCredits(windowSize);
@@ -75,50 +73,38 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
this.sessionContext.linkFlowControl(address, this);
}
- public void acquireCredits(final int credits) throws InterruptedException, ActiveMQException
- {
+ public void acquireCredits(final int credits) throws InterruptedException, ActiveMQException {
checkCredits(credits);
-
boolean tryAcquire;
- synchronized (this)
- {
+ synchronized (this) {
tryAcquire = semaphore.tryAcquire(credits);
}
- if (!tryAcquire)
- {
- if (!closed)
- {
+ if (!tryAcquire) {
+ if (!closed) {
this.blocked = true;
- try
- {
- while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS))
- {
+ try {
+ while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
// I'm using string concatenation here in case address is null
// better getting a "null" string than an NPE
ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
}
}
- finally
- {
+ finally {
this.blocked = false;
}
}
}
-
- synchronized (this)
- {
+ synchronized (this) {
pendingCredits -= credits;
}
// check to see if the blocking mode is FAIL on the server
- synchronized (this)
- {
- if (serverRespondedWithFail)
- {
+ synchronized (this) {
+ if (serverRespondedWithFail) {
serverRespondedWithFail = false;
// remove existing credits to force the client to ask the server for more on the next send
@@ -131,35 +117,29 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
}
}
- public boolean isBlocked()
- {
+ public boolean isBlocked() {
return blocked;
}
- public int getBalance()
- {
+ public int getBalance() {
return semaphore.availablePermits();
}
- public void receiveCredits(final int credits)
- {
- synchronized (this)
- {
+ public void receiveCredits(final int credits) {
+ synchronized (this) {
arriving -= credits;
}
semaphore.release(credits);
}
- public void receiveFailCredits(final int credits)
- {
+ public void receiveFailCredits(final int credits) {
serverRespondedWithFail = true;
// receive credits like normal to keep the sender from blocking
receiveCredits(credits);
}
- public synchronized void reset()
- {
+ public synchronized void reset() {
// Any pendingCredits credits from before failover won't arrive, so we re-initialise
semaphore.drainPermits();
@@ -174,39 +154,32 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
checkCredits(Math.max(windowSize * 2, beforeFailure));
}
- public void close()
- {
+ public void close() {
// Closing a producer that is blocking should make it return
closed = true;
semaphore.release(Integer.MAX_VALUE / 2);
}
- public synchronized void incrementRefCount()
- {
+ public synchronized void incrementRefCount() {
refCount++;
}
- public synchronized int decrementRefCount()
- {
+ public synchronized int decrementRefCount() {
return --refCount;
}
- public synchronized void releaseOutstanding()
- {
+ public synchronized void releaseOutstanding() {
semaphore.drainPermits();
}
- private void checkCredits(final int credits)
- {
+ private void checkCredits(final int credits) {
int needed = Math.max(credits, windowSize);
int toRequest = -1;
- synchronized (this)
- {
- if (semaphore.availablePermits() + arriving < needed)
- {
+ synchronized (this) {
+ if (semaphore.availablePermits() + arriving < needed) {
toRequest = needed - arriving;
pendingCredits += toRequest;
@@ -214,14 +187,12 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits
}
}
- if (toRequest != -1)
- {
+ if (toRequest != -1) {
requestCredits(toRequest);
}
}
- private void requestCredits(final int credits)
- {
+ private void requestCredits(final int credits) {
session.sendProducerCreditsMessage(credits, address);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index ae2981a..6247bfa 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -40,8 +40,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
/**
* The client-side Producer.
*/
-public class ClientProducerImpl implements ClientProducerInternal
-{
+public class ClientProducerImpl implements ClientProducerInternal {
+
private final SimpleString address;
private final ClientSessionInternal session;
@@ -76,8 +76,7 @@ public class ClientProducerImpl implements ClientProducerInternal
final boolean autoGroup,
final SimpleString groupID,
final int minLargeMessageSize,
- final SessionContext sessionContext)
- {
+ final SessionContext sessionContext) {
this.sessionContext = sessionContext;
this.session = session;
@@ -90,129 +89,107 @@ public class ClientProducerImpl implements ClientProducerInternal
this.blockOnDurableSend = blockOnDurableSend;
- if (autoGroup)
- {
+ if (autoGroup) {
this.groupID = UUIDGenerator.getInstance().generateSimpleStringUUID();
}
- else
- {
+ else {
this.groupID = groupID;
}
this.minLargeMessageSize = minLargeMessageSize;
- if (address != null)
- {
+ if (address != null) {
producerCredits = session.getCredits(address, false);
}
- else
- {
+ else {
producerCredits = null;
}
}
// ClientProducer implementation ----------------------------------------------------------------
- public SimpleString getAddress()
- {
+ public SimpleString getAddress() {
return address;
}
- public void send(final Message msg) throws ActiveMQException
- {
+ public void send(final Message msg) throws ActiveMQException {
checkClosed();
doSend(null, msg, null, false);
}
- public void send(final SimpleString address1, final Message msg) throws ActiveMQException
- {
+ public void send(final SimpleString address1, final Message msg) throws ActiveMQException {
checkClosed();
doSend(address1, msg, null, false);
}
- public void send(final String address1, final Message message) throws ActiveMQException
- {
+ public void send(final String address1, final Message message) throws ActiveMQException {
send(SimpleString.toSimpleString(address1), message);
}
@Override
- public void send(SimpleString address1, Message message, SendAcknowledgementHandler handler) throws ActiveMQException
- {
+ public void send(SimpleString address1,
+ Message message,
+ SendAcknowledgementHandler handler) throws ActiveMQException {
checkClosed();
boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled();
- if (confirmationWindowEnabled)
- {
+ if (confirmationWindowEnabled) {
doSend(address1, message, handler, true);
}
- else
- {
+ else {
doSend(address1, message, null, true);
- if (handler != null)
- {
+ if (handler != null) {
session.scheduleConfirmation(handler, message);
}
}
}
@Override
- public void send(Message message, SendAcknowledgementHandler handler) throws ActiveMQException
- {
+ public void send(Message message, SendAcknowledgementHandler handler) throws ActiveMQException {
send(null, message, handler);
}
- public synchronized void close() throws ActiveMQException
- {
- if (closed)
- {
+ public synchronized void close() throws ActiveMQException {
+ if (closed) {
return;
}
doCleanup();
}
- public void cleanUp()
- {
- if (closed)
- {
+ public void cleanUp() {
+ if (closed) {
return;
}
doCleanup();
}
- public boolean isClosed()
- {
+ public boolean isClosed() {
return closed;
}
- public boolean isBlockOnDurableSend()
- {
+ public boolean isBlockOnDurableSend() {
return blockOnDurableSend;
}
- public boolean isBlockOnNonDurableSend()
- {
+ public boolean isBlockOnNonDurableSend() {
return blockOnNonDurableSend;
}
- public int getMaxRate()
- {
+ public int getMaxRate() {
return rateLimiter == null ? -1 : rateLimiter.getRate();
}
// Public ---------------------------------------------------------------------------------------
- public ClientProducerCredits getProducerCredits()
- {
+ public ClientProducerCredits getProducerCredits() {
return producerCredits;
}
- private void doCleanup()
- {
- if (address != null)
- {
+ private void doCleanup() {
+ if (address != null) {
session.returnCredits(address);
}
@@ -221,13 +198,13 @@ public class ClientProducerImpl implements ClientProducerInternal
closed = true;
}
- private void doSend(final SimpleString address1, final Message msg, final SendAcknowledgementHandler handler,
- final boolean forceAsync) throws ActiveMQException
- {
+ private void doSend(final SimpleString address1,
+ final Message msg,
+ final SendAcknowledgementHandler handler,
+ final boolean forceAsync) throws ActiveMQException {
session.startCall();
- try
- {
+ try {
MessageInternal msgI = (MessageInternal) msg;
ClientProducerCredits theCredits;
@@ -237,52 +214,42 @@ public class ClientProducerImpl implements ClientProducerInternal
// If it's a server's message, it means this is being done through the bridge or some special consumer on the
// server, in which case we can't convert the message into a large message at the server
if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
- msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage()))
- {
+ msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) {
isLarge = true;
}
- else
- {
+ else {
isLarge = false;
}
- if (address1 != null)
- {
- if (!isLarge)
- {
+ if (address1 != null) {
+ if (!isLarge) {
session.setAddress(msg, address1);
}
- else
- {
+ else {
msg.setAddress(address1);
}
// Anonymous
theCredits = session.getCredits(address1, true);
}
- else
- {
- if (!isLarge)
- {
+ else {
+ if (!isLarge) {
session.setAddress(msg, this.address);
}
- else
- {
+ else {
msg.setAddress(this.address);
}
theCredits = producerCredits;
}
- if (rateLimiter != null)
- {
+ if (rateLimiter != null) {
// Rate flow control
rateLimiter.limit();
}
- if (groupID != null)
- {
+ if (groupID != null) {
msgI.putStringProperty(Message.HDR_GROUP_ID, groupID);
}
@@ -292,25 +259,23 @@ public class ClientProducerImpl implements ClientProducerInternal
session.workDone();
- if (isLarge)
- {
+ if (isLarge) {
largeMessageSend(sendBlocking, msgI, theCredits, handler);
}
- else
- {
+ else {
sendRegularMessage(msgI, sendBlocking, theCredits, handler);
}
}
- finally
- {
+ finally {
session.endCall();
}
}
- private void sendRegularMessage(final MessageInternal msgI, final boolean sendBlocking, final ClientProducerCredits theCredits, final SendAcknowledgementHandler handler) throws ActiveMQException
- {
- try
- {
+ private void sendRegularMessage(final MessageInternal msgI,
+ final boolean sendBlocking,
+ final ClientProducerCredits theCredits,
+ final SendAcknowledgementHandler handler) throws ActiveMQException {
+ try {
// This will block if credits are not available
// Note, that for a large message, the encode size only includes the properties + headers
@@ -321,18 +286,15 @@ public class ClientProducerImpl implements ClientProducerInternal
theCredits.acquireCredits(creditSize);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
}
- private void checkClosed() throws ActiveMQException
- {
- if (closed)
- {
+ private void checkClosed() throws ActiveMQException {
+ if (closed) {
throw ActiveMQClientMessageBundle.BUNDLE.producerClosed();
}
}
@@ -344,51 +306,45 @@ public class ClientProducerImpl implements ClientProducerInternal
* @param handler
* @throws ActiveMQException
*/
- private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI,
- final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException
- {
+ private void largeMessageSend(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final ClientProducerCredits credits,
+ SendAcknowledgementHandler handler) throws ActiveMQException {
int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
- if (msgI.getHeadersAndPropertiesEncodeSize() >= minLargeMessageSize)
- {
+ if (msgI.getHeadersAndPropertiesEncodeSize() >= minLargeMessageSize) {
throw ActiveMQClientMessageBundle.BUNDLE.headerSizeTooBig(headerSize);
}
// msg.getBody() could be Null on LargeServerMessage
- if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null)
- {
+ if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) {
msgI.getWholeBuffer().readerIndex(0);
}
InputStream input;
- if (msgI.isServerMessage())
- {
+ if (msgI.isServerMessage()) {
largeMessageSendServer(sendBlocking, msgI, credits, handler);
}
- else if ((input = msgI.getBodyInputStream()) != null)
- {
+ else if ((input = msgI.getBodyInputStream()) != null) {
largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler);
}
- else
- {
+ else {
largeMessageSendBuffered(sendBlocking, msgI, credits, handler);
}
}
- private void sendInitialLargeMessageHeader(MessageInternal msgI, ClientProducerCredits credits) throws ActiveMQException
- {
+ private void sendInitialLargeMessageHeader(MessageInternal msgI,
+ ClientProducerCredits credits) throws ActiveMQException {
int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI);
// On the case of large messages we tried to send credits before but we would starve otherwise
// we may find a way to improve the logic and always acquire the credits before
// but that's the way it's been tested and been working ATM
- try
- {
+ try {
credits.acquireCredits(creditsUsed);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
@@ -402,9 +358,10 @@ public class ClientProducerImpl implements ClientProducerInternal
* @param handler
* @throws ActiveMQException
*/
- private void largeMessageSendServer(final boolean sendBlocking, final MessageInternal msgI,
- final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException
- {
+ private void largeMessageSendServer(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final ClientProducerCredits credits,
+ SendAcknowledgementHandler handler) throws ActiveMQException {
sendInitialLargeMessageHeader(msgI, credits);
BodyEncoder context = msgI.getBodyEncoder();
@@ -412,11 +369,9 @@ public class ClientProducerImpl implements ClientProducerInternal
final long bodySize = context.getLargeBodySize();
context.open();
- try
- {
+ try {
- for (int pos = 0; pos < bodySize; )
- {
+ for (int pos = 0; pos < bodySize; ) {
final boolean lastChunk;
final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize);
@@ -432,18 +387,15 @@ public class ClientProducerImpl implements ClientProducerInternal
int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
- try
- {
+ try {
credits.acquireCredits(creditsUsed);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
}
- finally
- {
+ finally {
context.close();
}
}
@@ -454,13 +406,12 @@ public class ClientProducerImpl implements ClientProducerInternal
* @param handler
* @throws ActiveMQException
*/
- private void
- largeMessageSendBuffered(final boolean sendBlocking, final MessageInternal msgI,
- final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException
- {
+ private void largeMessageSendBuffered(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final ClientProducerCredits credits,
+ SendAcknowledgementHandler handler) throws ActiveMQException {
msgI.getBodyBuffer().readerIndex(0);
- largeMessageSendStreamed(sendBlocking, msgI, new ActiveMQBufferInputStream(msgI.getBodyBuffer()), credits,
- handler);
+ largeMessageSendStreamed(sendBlocking, msgI, new ActiveMQBufferInputStream(msgI.getBodyBuffer()), credits, handler);
}
/**
@@ -470,10 +421,11 @@ public class ClientProducerImpl implements ClientProducerInternal
* @param credits
* @throws ActiveMQException
*/
- private void largeMessageSendStreamed(final boolean sendBlocking, final MessageInternal msgI,
- final InputStream inputStreamParameter, final ClientProducerCredits credits,
- SendAcknowledgementHandler handler) throws ActiveMQException
- {
+ private void largeMessageSendStreamed(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final InputStream inputStreamParameter,
+ final ClientProducerCredits credits,
+ SendAcknowledgementHandler handler) throws ActiveMQException {
boolean lastPacket = false;
InputStream input = inputStreamParameter;
@@ -484,8 +436,7 @@ public class ClientProducerImpl implements ClientProducerInternal
DeflaterReader deflaterReader = null;
- if (session.isCompressLargeMessages())
- {
+ if (session.isCompressLargeMessages()) {
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
input = deflaterReader;
@@ -495,46 +446,38 @@ public class ClientProducerImpl implements ClientProducerInternal
boolean headerSent = false;
- while (!lastPacket)
- {
+ while (!lastPacket) {
byte[] buff = new byte[minLargeMessageSize];
int pos = 0;
- do
- {
+ do {
int numberOfBytesRead;
int wanted = minLargeMessageSize - pos;
- try
- {
+ try {
numberOfBytesRead = input.read(buff, pos, wanted);
}
- catch (IOException e)
- {
+ catch (IOException e) {
throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
}
- if (numberOfBytesRead == -1)
- {
+ if (numberOfBytesRead == -1) {
lastPacket = true;
break;
}
pos += numberOfBytesRead;
- }
- while (pos < minLargeMessageSize);
+ } while (pos < minLargeMessageSize);
totalSize += pos;
final SessionSendContinuationMessage chunk;
- if (lastPacket)
- {
- if (!session.isCompressLargeMessages())
- {
+ if (lastPacket) {
+ if (!session.isCompressLargeMessages()) {
messageSize.set(totalSize);
}
@@ -546,8 +489,7 @@ public class ClientProducerImpl implements ClientProducerInternal
buff = buff2;
// This is the case where the message is being converted to a regular message
- if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize)
- {
+ if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) {
msgI.getBodyBuffer().resetReaderIndex();
msgI.getBodyBuffer().resetWriterIndex();
msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
@@ -556,51 +498,40 @@ public class ClientProducerImpl implements ClientProducerInternal
sendRegularMessage(msgI, sendBlocking, credits, handler);
return;
}
- else
- {
- if (!headerSent)
- {
+ else {
+ if (!headerSent) {
headerSent = true;
sendInitialLargeMessageHeader(msgI, credits);
}
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, handler);
- try
- {
+ try {
credits.acquireCredits(creditsSent);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
}
- else
- {
- if (!headerSent)
- {
+ else {
+ if (!headerSent) {
headerSent = true;
sendInitialLargeMessageHeader(msgI, credits);
}
-
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, handler);
- try
- {
+ try {
credits.acquireCredits(creditsSent);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
}
- try
- {
+ try {
input.close();
}
- catch (IOException e)
- {
+ catch (IOException e) {
throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java
index 173821d..6190885 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java
@@ -19,11 +19,10 @@ package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
/**
- *
* A ClientProducerInternal
*/
-public interface ClientProducerInternal extends ClientProducer
-{
+public interface ClientProducerInternal extends ClientProducer {
+
void cleanUp();
ClientProducerCredits getProducerCredits();