You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:34 UTC
[35/53] [abbrv] [partial] activemq-artemis git commit: automatic
checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index d3b2d1a..7f44cff 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -48,8 +48,7 @@ import org.apache.activemq.artemis.utils.UTF8Util;
* packets until they are read using the ChannelBuffer interface, or the setOutputStream or
* saveStream are called.
*/
-public class LargeMessageControllerImpl implements LargeMessageController
-{
+public class LargeMessageControllerImpl implements LargeMessageController {
// Constants -----------------------------------------------------
private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported";
@@ -100,16 +99,14 @@ public class LargeMessageControllerImpl implements LargeMessageController
public LargeMessageControllerImpl(final ClientConsumerInternal consumerInternal,
final long totalSize,
- final long readTimeout)
- {
+ final long readTimeout) {
this(consumerInternal, totalSize, readTimeout, null);
}
public LargeMessageControllerImpl(final ClientConsumerInternal consumerInternal,
final long totalSize,
final long readTimeout,
- final File cachedFile)
- {
+ final File cachedFile) {
this(consumerInternal, totalSize, readTimeout, cachedFile, 10 * 1024);
}
@@ -117,17 +114,14 @@ public class LargeMessageControllerImpl implements LargeMessageController
final long totalSize,
final long readTimeout,
final File cachedFile,
- final int bufferSize)
- {
+ final int bufferSize) {
this.consumerInternal = consumerInternal;
this.readTimeout = readTimeout;
this.totalSize = totalSize;
- if (cachedFile == null)
- {
+ if (cachedFile == null) {
fileCache = null;
}
- else
- {
+ else {
fileCache = new FileCache(cachedFile);
}
this.bufferSize = bufferSize;
@@ -135,22 +129,18 @@ public class LargeMessageControllerImpl implements LargeMessageController
// Public --------------------------------------------------------
- public void setLocal(boolean local)
- {
+ public void setLocal(boolean local) {
this.local = local;
}
- public void discardUnusedPackets()
- {
- if (outStream == null)
- {
- if (local) return;
- try
- {
+ public void discardUnusedPackets() {
+ if (outStream == null) {
+ if (local)
+ return;
+ try {
checkForPacket(totalSize - 1);
}
- catch (Throwable ignored)
- {
+ catch (Throwable ignored) {
}
}
}
@@ -159,24 +149,18 @@ public class LargeMessageControllerImpl implements LargeMessageController
* TODO: move this to ConsumerContext as large message is a protocol specific thing
* Add a buff to the List, or save it to the OutputStream if set
*/
- public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues)
- {
+ public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
int flowControlCredit = 0;
- synchronized (this)
- {
+ synchronized (this) {
packetAdded = true;
- if (outStream != null)
- {
- try
- {
- if (!isContinues)
- {
+ if (outStream != null) {
+ try {
+ if (!isContinues) {
streamEnded = true;
}
- if (fileCache != null)
- {
+ if (fileCache != null) {
fileCache.cachePackage(chunk);
}
@@ -186,70 +170,55 @@ public class LargeMessageControllerImpl implements LargeMessageController
notifyAll();
- if (streamEnded)
- {
+ if (streamEnded) {
outStream.close();
}
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
}
}
- else
- {
- if (fileCache != null)
- {
- try
- {
+ else {
+ if (fileCache != null) {
+ try {
fileCache.cachePackage(chunk);
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
}
}
-
largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues));
}
}
- if (flowControlCredit != 0)
- {
- try
- {
+ if (flowControlCredit != 0) {
+ try {
consumerInternal.flowControl(flowControlCredit, !isContinues);
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
}
}
}
- public void cancel()
- {
+ public void cancel() {
this.handledException = ActiveMQClientMessageBundle.BUNDLE.largeMessageInterrupted();
- synchronized (this)
- {
+ synchronized (this) {
int totalSize = 0;
LargeData polledPacket = null;
- while ((polledPacket = largeMessageData.poll()) != null)
- {
+ while ((polledPacket = largeMessageData.poll()) != null) {
totalSize += polledPacket.getFlowControlSize();
}
- try
- {
+ try {
consumerInternal.flowControl(totalSize, false);
}
- catch (Exception ignored)
- {
+ catch (Exception ignored) {
// what else can we do here?
ActiveMQClientLogger.LOGGER.errorCallingCancel(ignored);
}
@@ -262,32 +231,25 @@ public class LargeMessageControllerImpl implements LargeMessageController
}
}
- public synchronized void close()
- {
- if (fileCache != null)
- {
+ public synchronized void close() {
+ if (fileCache != null) {
fileCache.close();
}
}
- public void setOutputStream(final OutputStream output) throws ActiveMQException
- {
+ public void setOutputStream(final OutputStream output) throws ActiveMQException {
int totalFlowControl = 0;
boolean continues = false;
- synchronized (this)
- {
- if (currentPacket != null)
- {
+ synchronized (this) {
+ if (currentPacket != null) {
sendPacketToOutput(output, currentPacket);
currentPacket = null;
}
- while (handledException == null)
- {
+ while (handledException == null) {
LargeData packet = largeMessageData.poll();
- if (packet == null)
- {
+ if (packet == null) {
break;
}
totalFlowControl += packet.getFlowControlSize();
@@ -300,16 +262,13 @@ public class LargeMessageControllerImpl implements LargeMessageController
outStream = output;
}
- if (totalFlowControl > 0)
- {
+ if (totalFlowControl > 0) {
consumerInternal.flowControl(totalFlowControl, !continues);
}
}
- public synchronized void saveBuffer(final OutputStream output) throws ActiveMQException
- {
- if (streamClosed)
- {
+ public synchronized void saveBuffer(final OutputStream output) throws ActiveMQException {
+ if (streamClosed) {
throw ActiveMQClientMessageBundle.BUNDLE.largeMessageLostSession();
}
setOutputStream(output);
@@ -320,10 +279,8 @@ public class LargeMessageControllerImpl implements LargeMessageController
* @param timeWait Milliseconds to Wait. 0 means forever
* @throws ActiveMQException
*/
- public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException
- {
- if (outStream == null)
- {
+ public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException {
+ if (outStream == null) {
// There is no stream.. it will never achieve the end of streaming
return false;
}
@@ -332,34 +289,26 @@ public class LargeMessageControllerImpl implements LargeMessageController
// If timeWait = 0, we will use the readTimeout
// And we will check if no packets have arrived within readTimeout milliseconds
- if (timeWait != 0)
- {
+ if (timeWait != 0) {
timeOut = System.currentTimeMillis() + timeWait;
}
- else
- {
+ else {
timeOut = System.currentTimeMillis() + readTimeout;
}
- while (!streamEnded && handledException == null)
- {
- try
- {
+ while (!streamEnded && handledException == null) {
+ try {
this.wait(timeWait == 0 ? readTimeout : timeWait);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
- if (!streamEnded && handledException == null)
- {
- if (timeWait != 0 && System.currentTimeMillis() > timeOut)
- {
+ if (!streamEnded && handledException == null) {
+ if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
}
- else if (System.currentTimeMillis() > timeOut && !packetAdded)
- {
+ else if (System.currentTimeMillis() > timeOut && !packetAdded) {
throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
}
}
@@ -374,21 +323,15 @@ public class LargeMessageControllerImpl implements LargeMessageController
/**
* @throws ActiveMQException
*/
- private void checkException() throws ActiveMQException
- {
+ private void checkException() throws ActiveMQException {
// it's not needed to copy it as we never set it back to null
// once the exception is set, the controller is pretty much useless
- if (handledException != null)
- {
- if (handledException instanceof ActiveMQException)
- {
- throw (ActiveMQException)handledException;
+ if (handledException != null) {
+ if (handledException instanceof ActiveMQException) {
+ throw (ActiveMQException) handledException;
}
- else
- {
- throw new ActiveMQException(ActiveMQExceptionType.LARGE_MESSAGE_ERROR_BODY,
- "Error on saving LargeMessageBufferImpl",
- handledException);
+ else {
+ throw new ActiveMQException(ActiveMQExceptionType.LARGE_MESSAGE_ERROR_BODY, "Error on saving LargeMessageBufferImpl", handledException);
}
}
}
@@ -396,54 +339,45 @@ public class LargeMessageControllerImpl implements LargeMessageController
// Channel Buffer Implementation ---------------------------------
@Override
- public int capacity()
- {
+ public int capacity() {
return -1;
}
- public byte readByte()
- {
+ public byte readByte() {
return getByte(readerIndex++);
}
@Override
- public byte getByte(final int index)
- {
- return getByte((long)index);
+ public byte getByte(final int index) {
+ return getByte((long) index);
}
- private byte getByte(final long index)
- {
+ private byte getByte(final long index) {
checkForPacket(index);
- if (fileCache != null && index < packetPosition)
- {
+ if (fileCache != null && index < packetPosition) {
return fileCache.getByteFromCache(index);
}
- else
- {
- return currentPacket.getChunk()[(int)(index - packetPosition)];
+ else {
+ return currentPacket.getChunk()[(int) (index - packetPosition)];
}
}
@Override
- public void getBytes(final int index, final ActiveMQBuffer dst, final int dstIndex, final int length)
- {
+ public void getBytes(final int index, final ActiveMQBuffer dst, final int dstIndex, final int length) {
byte[] destBytes = new byte[length];
getBytes(index, destBytes);
dst.setBytes(dstIndex, destBytes);
}
- private void getBytes(final long index, final ActiveMQBuffer dst, final int dstIndex, final int length)
- {
+ private void getBytes(final long index, final ActiveMQBuffer dst, final int dstIndex, final int length) {
byte[] destBytes = new byte[length];
getBytes(index, destBytes);
dst.setBytes(dstIndex, destBytes);
}
@Override
- public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
- {
+ public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length) {
byte[] bytesToGet = new byte[length];
getBytes(index, bytesToGet);
@@ -451,8 +385,7 @@ public class LargeMessageControllerImpl implements LargeMessageController
System.arraycopy(bytesToGet, 0, dst, dstIndex, length);
}
- public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length)
- {
+ public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length) {
byte[] bytesToGet = new byte[length];
getBytes(index, bytesToGet);
@@ -461,577 +394,472 @@ public class LargeMessageControllerImpl implements LargeMessageController
}
@Override
- public void getBytes(final int index, final ByteBuffer dst)
- {
+ public void getBytes(final int index, final ByteBuffer dst) {
byte[] bytesToGet = new byte[dst.remaining()];
getBytes(index, bytesToGet);
dst.put(bytesToGet);
}
- public void getBytes(final long index, final ByteBuffer dst)
- {
+ public void getBytes(final long index, final ByteBuffer dst) {
byte[] bytesToGet = new byte[dst.remaining()];
getBytes(index, bytesToGet);
dst.put(bytesToGet);
}
- public void getBytes(final int index, final OutputStream out, final int length) throws IOException
- {
+ public void getBytes(final int index, final OutputStream out, final int length) throws IOException {
byte[] bytesToGet = new byte[length];
getBytes(index, bytesToGet);
out.write(bytesToGet);
}
- public void getBytes(final long index, final OutputStream out, final int length) throws IOException
- {
+ public void getBytes(final long index, final OutputStream out, final int length) throws IOException {
byte[] bytesToGet = new byte[length];
getBytes(index, bytesToGet);
out.write(bytesToGet);
}
- public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
- {
+ public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException {
byte[] bytesToGet = new byte[length];
getBytes(index, bytesToGet);
return out.write(ByteBuffer.wrap(bytesToGet));
}
- public int getInt(final int index)
- {
+ public int getInt(final int index) {
return (getByte(index) & 0xff) << 24 | (getByte(index + 1) & 0xff) << 16 |
(getByte(index + 2) & 0xff) << 8 |
(getByte(index + 3) & 0xff) << 0;
}
- public int getInt(final long index)
- {
+ public int getInt(final long index) {
return (getByte(index) & 0xff) << 24 | (getByte(index + 1) & 0xff) << 16 |
(getByte(index + 2) & 0xff) << 8 |
(getByte(index + 3) & 0xff) << 0;
}
@Override
- public long getLong(final int index)
- {
- return ((long)getByte(index) & 0xff) << 56 | ((long)getByte(index + 1) & 0xff) << 48 |
- ((long)getByte(index + 2) & 0xff) << 40 |
- ((long)getByte(index + 3) & 0xff) << 32 |
- ((long)getByte(index + 4) & 0xff) << 24 |
- ((long)getByte(index + 5) & 0xff) << 16 |
- ((long)getByte(index + 6) & 0xff) << 8 |
- ((long)getByte(index + 7) & 0xff) << 0;
- }
-
- public long getLong(final long index)
- {
- return ((long)getByte(index) & 0xff) << 56 | ((long)getByte(index + 1) & 0xff) << 48 |
- ((long)getByte(index + 2) & 0xff) << 40 |
- ((long)getByte(index + 3) & 0xff) << 32 |
- ((long)getByte(index + 4) & 0xff) << 24 |
- ((long)getByte(index + 5) & 0xff) << 16 |
- ((long)getByte(index + 6) & 0xff) << 8 |
- ((long)getByte(index + 7) & 0xff) << 0;
+ public long getLong(final int index) {
+ return ((long) getByte(index) & 0xff) << 56 | ((long) getByte(index + 1) & 0xff) << 48 |
+ ((long) getByte(index + 2) & 0xff) << 40 |
+ ((long) getByte(index + 3) & 0xff) << 32 |
+ ((long) getByte(index + 4) & 0xff) << 24 |
+ ((long) getByte(index + 5) & 0xff) << 16 |
+ ((long) getByte(index + 6) & 0xff) << 8 |
+ ((long) getByte(index + 7) & 0xff) << 0;
+ }
+
+ public long getLong(final long index) {
+ return ((long) getByte(index) & 0xff) << 56 | ((long) getByte(index + 1) & 0xff) << 48 |
+ ((long) getByte(index + 2) & 0xff) << 40 |
+ ((long) getByte(index + 3) & 0xff) << 32 |
+ ((long) getByte(index + 4) & 0xff) << 24 |
+ ((long) getByte(index + 5) & 0xff) << 16 |
+ ((long) getByte(index + 6) & 0xff) << 8 |
+ ((long) getByte(index + 7) & 0xff) << 0;
}
@Override
- public short getShort(final int index)
- {
- return (short)(getByte(index) << 8 | getByte(index + 1) & 0xFF);
+ public short getShort(final int index) {
+ return (short) (getByte(index) << 8 | getByte(index + 1) & 0xFF);
}
- public short getShort(final long index)
- {
- return (short)(getByte(index) << 8 | getByte(index + 1) & 0xFF);
+ public short getShort(final long index) {
+ return (short) (getByte(index) << 8 | getByte(index + 1) & 0xFF);
}
- private int getUnsignedMedium(final int index)
- {
+ private int getUnsignedMedium(final int index) {
return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | (getByte(index + 2) & 0xff) << 0;
}
- public int getUnsignedMedium(final long index)
- {
+ public int getUnsignedMedium(final long index) {
return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | (getByte(index + 2) & 0xff) << 0;
}
@Override
- public void setByte(final int index, final byte value)
- {
+ public void setByte(final int index, final byte value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void setBytes(final int index, final ActiveMQBuffer src, final int srcIndex, final int length)
- {
+ public void setBytes(final int index, final ActiveMQBuffer src, final int srcIndex, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
- {
+ public void setBytes(final int index, final byte[] src, final int srcIndex, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void setBytes(final int index, final ByteBuffer src)
- {
+ public void setBytes(final int index, final ByteBuffer src) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void setInt(final int index, final int value)
- {
+ public void setInt(final int index, final int value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void setLong(final int index, final long value)
- {
+ public void setLong(final int index, final long value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void setShort(final int index, final short value)
- {
+ public void setShort(final int index, final short value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public ByteBuffer toByteBuffer(final int index, final int length)
- {
+ public ByteBuffer toByteBuffer(final int index, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public int readerIndex()
- {
- return (int)readerIndex;
+ public int readerIndex() {
+ return (int) readerIndex;
}
- public void readerIndex(final int readerIndex)
- {
- try
- {
+ public void readerIndex(final int readerIndex) {
+ try {
checkForPacket(readerIndex);
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorReadingIndex(e);
throw new RuntimeException(e.getMessage(), e);
}
this.readerIndex = readerIndex;
}
- public int writerIndex()
- {
- return (int)totalSize;
+ public int writerIndex() {
+ return (int) totalSize;
}
- public long getSize()
- {
+ public long getSize() {
return totalSize;
}
- public void writerIndex(final int writerIndex)
- {
+ public void writerIndex(final int writerIndex) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void setIndex(final int readerIndex, final int writerIndex)
- {
- try
- {
+ public void setIndex(final int readerIndex, final int writerIndex) {
+ try {
checkForPacket(readerIndex);
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorSettingIndex(e);
throw new RuntimeException(e.getMessage(), e);
}
this.readerIndex = readerIndex;
}
- public void clear()
- {
+ public void clear() {
}
- public boolean readable()
- {
+ public boolean readable() {
return true;
}
- public boolean writable()
- {
+ public boolean writable() {
return false;
}
- public int readableBytes()
- {
+ public int readableBytes() {
long readableBytes = totalSize - readerIndex;
- if (readableBytes > Integer.MAX_VALUE)
- {
+ if (readableBytes > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
- else
- {
- return (int)(totalSize - readerIndex);
+ else {
+ return (int) (totalSize - readerIndex);
}
}
- public int writableBytes()
- {
+ public int writableBytes() {
return 0;
}
- public void markReaderIndex()
- {
+ public void markReaderIndex() {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void resetReaderIndex()
- {
- try
- {
+ public void resetReaderIndex() {
+ try {
checkForPacket(0);
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorReSettingIndex(e);
throw new RuntimeException(e.getMessage(), e);
}
}
- public void markWriterIndex()
- {
+ public void markWriterIndex() {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void resetWriterIndex()
- {
+ public void resetWriterIndex() {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void discardReadBytes()
- {
+ public void discardReadBytes() {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public short getUnsignedByte(final int index)
- {
- return (short)(getByte(index) & 0xFF);
+ public short getUnsignedByte(final int index) {
+ return (short) (getByte(index) & 0xFF);
}
- public int getUnsignedShort(final int index)
- {
+ public int getUnsignedShort(final int index) {
return getShort(index) & 0xFFFF;
}
- public int getMedium(final int index)
- {
+ public int getMedium(final int index) {
int value = getUnsignedMedium(index);
- if ((value & 0x800000) != 0)
- {
+ if ((value & 0x800000) != 0) {
value |= 0xff000000;
}
return value;
}
- public long getUnsignedInt(final int index)
- {
+ public long getUnsignedInt(final int index) {
return getInt(index) & 0xFFFFFFFFL;
}
- public void getBytes(int index, final byte[] dst)
- {
+ public void getBytes(int index, final byte[] dst) {
// TODO: optimize this by using System.arraycopy
- for (int i = 0; i < dst.length; i++)
- {
+ for (int i = 0; i < dst.length; i++) {
dst[i] = getByte(index++);
}
}
- public void getBytes(long index, final byte[] dst)
- {
+ public void getBytes(long index, final byte[] dst) {
// TODO: optimize this by using System.arraycopy
- for (int i = 0; i < dst.length; i++)
- {
+ for (int i = 0; i < dst.length; i++) {
dst[i] = getByte(index++);
}
}
- public void getBytes(final int index, final ActiveMQBuffer dst)
- {
+ public void getBytes(final int index, final ActiveMQBuffer dst) {
getBytes(index, dst, dst.writableBytes());
}
- public void getBytes(final int index, final ActiveMQBuffer dst, final int length)
- {
- if (length > dst.writableBytes())
- {
+ public void getBytes(final int index, final ActiveMQBuffer dst, final int length) {
+ if (length > dst.writableBytes()) {
throw new IndexOutOfBoundsException();
}
getBytes(index, dst, dst.writerIndex(), length);
dst.writerIndex(dst.writerIndex() + length);
}
- public void setBytes(final int index, final byte[] src)
- {
+ public void setBytes(final int index, final byte[] src) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void setBytes(final int index, final ActiveMQBuffer src)
- {
+ public void setBytes(final int index, final ActiveMQBuffer src) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void setBytes(final int index, final ActiveMQBuffer src, final int length)
- {
+ public void setBytes(final int index, final ActiveMQBuffer src, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void setZero(final int index, final int length)
- {
+ public void setZero(final int index, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public short readUnsignedByte()
- {
- return (short)(readByte() & 0xFF);
+ public short readUnsignedByte() {
+ return (short) (readByte() & 0xFF);
}
- public short readShort()
- {
+ public short readShort() {
short v = getShort(readerIndex);
readerIndex += 2;
return v;
}
- public int readUnsignedShort()
- {
+ public int readUnsignedShort() {
return readShort() & 0xFFFF;
}
- public int readMedium()
- {
+ public int readMedium() {
int value = readUnsignedMedium();
- if ((value & 0x800000) != 0)
- {
+ if ((value & 0x800000) != 0) {
value |= 0xff000000;
}
return value;
}
- public int readUnsignedMedium()
- {
+ public int readUnsignedMedium() {
int v = getUnsignedMedium(readerIndex);
readerIndex += 3;
return v;
}
- public int readInt()
- {
+ public int readInt() {
int v = getInt(readerIndex);
readerIndex += 4;
return v;
}
- public int readInt(final int pos)
- {
+ public int readInt(final int pos) {
int v = getInt(pos);
return v;
}
- public long readUnsignedInt()
- {
+ public long readUnsignedInt() {
return readInt() & 0xFFFFFFFFL;
}
- public long readLong()
- {
+ public long readLong() {
long v = getLong(readerIndex);
readerIndex += 8;
return v;
}
- public void readBytes(final byte[] dst, final int dstIndex, final int length)
- {
+ public void readBytes(final byte[] dst, final int dstIndex, final int length) {
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
}
- public void readBytes(final byte[] dst)
- {
+ public void readBytes(final byte[] dst) {
readBytes(dst, 0, dst.length);
}
- public void readBytes(final ActiveMQBuffer dst)
- {
+ public void readBytes(final ActiveMQBuffer dst) {
readBytes(dst, dst.writableBytes());
}
- public void readBytes(final ActiveMQBuffer dst, final int length)
- {
- if (length > dst.writableBytes())
- {
+ public void readBytes(final ActiveMQBuffer dst, final int length) {
+ if (length > dst.writableBytes()) {
throw new IndexOutOfBoundsException();
}
readBytes(dst, dst.writerIndex(), length);
dst.writerIndex(dst.writerIndex() + length);
}
- public void readBytes(final ActiveMQBuffer dst, final int dstIndex, final int length)
- {
+ public void readBytes(final ActiveMQBuffer dst, final int dstIndex, final int length) {
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
}
- public void readBytes(final ByteBuffer dst)
- {
+ public void readBytes(final ByteBuffer dst) {
int length = dst.remaining();
getBytes(readerIndex, dst);
readerIndex += length;
}
- public int readBytes(final GatheringByteChannel out, final int length) throws IOException
- {
- int readBytes = getBytes((int)readerIndex, out, length);
+ public int readBytes(final GatheringByteChannel out, final int length) throws IOException {
+ int readBytes = getBytes((int) readerIndex, out, length);
readerIndex += readBytes;
return readBytes;
}
- public void readBytes(final OutputStream out, final int length) throws IOException
- {
+ public void readBytes(final OutputStream out, final int length) throws IOException {
getBytes(readerIndex, out, length);
readerIndex += length;
}
- public void skipBytes(final int length)
- {
+ public void skipBytes(final int length) {
long newReaderIndex = readerIndex + length;
checkForPacket(newReaderIndex);
readerIndex = newReaderIndex;
}
- public void writeByte(final byte value)
- {
+ public void writeByte(final byte value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeShort(final short value)
- {
+ public void writeShort(final short value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeMedium(final int value)
- {
+ public void writeMedium(final int value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeInt(final int value)
- {
+ public void writeInt(final int value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeLong(final long value)
- {
+ public void writeLong(final long value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeBytes(final byte[] src, final int srcIndex, final int length)
- {
+ public void writeBytes(final byte[] src, final int srcIndex, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeBytes(final byte[] src)
- {
+ public void writeBytes(final byte[] src) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeBytes(final ActiveMQBuffer src)
- {
+ public void writeBytes(final ActiveMQBuffer src) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeBytes(final ActiveMQBuffer src, final int length)
- {
+ public void writeBytes(final ActiveMQBuffer src, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeBytes(final ByteBuffer src)
- {
+ public void writeBytes(final ByteBuffer src) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public int writeBytes(final InputStream in, final int length) throws IOException
- {
+ public int writeBytes(final InputStream in, final int length) throws IOException {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException
- {
+ public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeZero(final int length)
- {
+ public void writeZero(final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public ByteBuffer toByteBuffer()
- {
+ public ByteBuffer toByteBuffer() {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public ByteBuffer[] toByteBuffers()
- {
+ public ByteBuffer[] toByteBuffers() {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public ByteBuffer[] toByteBuffers(final int index, final int length)
- {
+ public ByteBuffer[] toByteBuffers(final int index, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public String toString(final String charsetName)
- {
+ public String toString(final String charsetName) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public Object getUnderlyingBuffer()
- {
+ public Object getUnderlyingBuffer() {
return this;
}
@Override
- public boolean readBoolean()
- {
+ public boolean readBoolean() {
return readByte() != 0;
}
@Override
- public char readChar()
- {
- return (char)readShort();
+ public char readChar() {
+ return (char) readShort();
}
- public char getChar(final int index)
- {
- return (char)getShort(index);
+ public char getChar(final int index) {
+ return (char) getShort(index);
}
- public double getDouble(final int index)
- {
+ public double getDouble(final int index) {
return Double.longBitsToDouble(getLong(index));
}
- public float getFloat(final int index)
- {
+ public float getFloat(final int index) {
return Float.intBitsToFloat(getInt(index));
}
- public ActiveMQBuffer readBytes(final int length)
- {
+ public ActiveMQBuffer readBytes(final int length) {
byte[] bytesToGet = new byte[length];
getBytes(readerIndex, bytesToGet);
readerIndex += length;
@@ -1039,48 +867,39 @@ public class LargeMessageControllerImpl implements LargeMessageController
}
@Override
- public double readDouble()
- {
+ public double readDouble() {
return Double.longBitsToDouble(readLong());
}
@Override
- public float readFloat()
- {
+ public float readFloat() {
return Float.intBitsToFloat(readInt());
}
@Override
- public SimpleString readNullableSimpleString()
- {
+ public SimpleString readNullableSimpleString() {
int b = readByte();
- if (b == DataConstants.NULL)
- {
+ if (b == DataConstants.NULL) {
return null;
}
- else
- {
+ else {
return readSimpleString();
}
}
@Override
- public String readNullableString()
- {
+ public String readNullableString() {
int b = readByte();
- if (b == DataConstants.NULL)
- {
+ if (b == DataConstants.NULL) {
return null;
}
- else
- {
+ else {
return readString();
}
}
@Override
- public SimpleString readSimpleString()
- {
+ public SimpleString readSimpleString() {
int len = readInt();
byte[] data = new byte[len];
readBytes(data);
@@ -1088,98 +907,81 @@ public class LargeMessageControllerImpl implements LargeMessageController
}
@Override
- public String readString()
- {
+ public String readString() {
int len = readInt();
- if (len < 9)
- {
+ if (len < 9) {
char[] chars = new char[len];
- for (int i = 0; i < len; i++)
- {
- chars[i] = (char)readShort();
+ for (int i = 0; i < len; i++) {
+ chars[i] = (char) readShort();
}
return new String(chars);
}
- else if (len < 0xfff)
- {
+ else if (len < 0xfff) {
return readUTF();
}
- else
- {
+ else {
return readSimpleString().toString();
}
}
@Override
- public String readUTF()
- {
+ public String readUTF() {
return UTF8Util.readUTF(this);
}
@Override
- public void writeBoolean(final boolean val)
- {
+ public void writeBoolean(final boolean val) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void writeChar(final char val)
- {
+ public void writeChar(final char val) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void writeDouble(final double val)
- {
+ public void writeDouble(final double val) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void writeFloat(final float val)
- {
+ public void writeFloat(final float val) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void writeNullableSimpleString(final SimpleString val)
- {
+ public void writeNullableSimpleString(final SimpleString val) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void writeNullableString(final String val)
- {
+ public void writeNullableString(final String val) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void writeSimpleString(final SimpleString val)
- {
+ public void writeSimpleString(final SimpleString val) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void writeString(final String val)
- {
+ public void writeString(final String val) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
- public void writeUTF(final String utf)
- {
+ public void writeUTF(final String utf) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public ActiveMQBuffer copy()
- {
+ public ActiveMQBuffer copy() {
throw new UnsupportedOperationException();
}
- public ActiveMQBuffer slice(final int index, final int length)
- {
+ public ActiveMQBuffer slice(final int index, final int length) {
throw new UnsupportedOperationException();
}
@@ -1188,38 +990,30 @@ public class LargeMessageControllerImpl implements LargeMessageController
* @param packet
* @throws ActiveMQException
*/
- private void sendPacketToOutput(final OutputStream output, final LargeData packet) throws ActiveMQException
- {
- try
- {
+ private void sendPacketToOutput(final OutputStream output, final LargeData packet) throws ActiveMQException {
+ try {
output.write(packet.getChunk());
- if (!packet.isContinues())
- {
+ if (!packet.isContinues()) {
streamEnded = true;
output.close();
}
}
- catch (IOException e)
- {
+ catch (IOException e) {
throw ActiveMQClientMessageBundle.BUNDLE.errorWritingLargeMessage(e);
}
}
- private void popPacket()
- {
- try
- {
+ private void popPacket() {
+ try {
- if (streamEnded)
- {
+ if (streamEnded) {
// no more packets, we are over the last one already
throw new IndexOutOfBoundsException();
}
int sizeToAdd = currentPacket != null ? currentPacket.chunk.length : 1;
currentPacket = largeMessageData.poll(readTimeout, TimeUnit.SECONDS);
- if (currentPacket == null)
- {
+ if (currentPacket == null) {
throw new IndexOutOfBoundsException();
}
@@ -1236,53 +1030,42 @@ public class LargeMessageControllerImpl implements LargeMessageController
packetLastPosition = packetPosition + currentPacket.getChunk().length;
}
- catch (IndexOutOfBoundsException e)
- {
+ catch (IndexOutOfBoundsException e) {
throw e;
}
- catch (Exception e)
- {
+ catch (Exception e) {
throw new RuntimeException(e);
}
}
- private void checkForPacket(final long index)
- {
- if (outStream != null)
- {
+ private void checkForPacket(final long index) {
+ if (outStream != null) {
throw new IllegalAccessError("Can't read the messageBody after setting outputStream");
}
- if (index >= totalSize)
- {
+ if (index >= totalSize) {
throw new IndexOutOfBoundsException();
}
- if (streamClosed)
- {
+ if (streamClosed) {
throw new IllegalAccessError("The consumer associated with this large message was closed before the body was read");
}
- if (fileCache == null)
- {
- if (index < lastIndex)
- {
+ if (fileCache == null) {
+ if (index < lastIndex) {
throw new IllegalAccessError("LargeMessage have read-only and one-way buffers");
}
lastIndex = index;
}
- while (index >= packetLastPosition && !streamEnded)
- {
+ while (index >= packetLastPosition && !streamEnded) {
popPacket();
}
}
- private final class FileCache
- {
+ private final class FileCache {
- public FileCache(final File cachedFile)
- {
+ public FileCache(final File cachedFile) {
this.cachedFile = cachedFile;
}
@@ -1298,18 +1081,14 @@ public class LargeMessageControllerImpl implements LargeMessageController
private volatile FileChannel cachedChannel;
- private synchronized void readCache(final long position)
- {
+ private synchronized void readCache(final long position) {
- try
- {
- if (position < readCachePositionStart || position > readCachePositionEnd)
- {
+ try {
+ if (position < readCachePositionStart || position > readCachePositionEnd) {
checkOpen();
- if (position > cachedChannel.size())
- {
+ if (position > cachedChannel.size()) {
throw new ArrayIndexOutOfBoundsException("position > " + cachedChannel.size());
}
@@ -1317,8 +1096,7 @@ public class LargeMessageControllerImpl implements LargeMessageController
cachedChannel.position(readCachePositionStart);
- if (readCache == null)
- {
+ if (readCache == null) {
readCache = ByteBuffer.allocate(bufferSize);
}
@@ -1327,27 +1105,23 @@ public class LargeMessageControllerImpl implements LargeMessageController
readCachePositionEnd = readCachePositionStart + cachedChannel.read(readCache) - 1;
}
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorReadingCache(e);
throw new RuntimeException(e.getMessage(), e);
}
- finally
- {
+ finally {
close();
}
}
- public synchronized byte getByteFromCache(final long position)
- {
+ public synchronized byte getByteFromCache(final long position) {
readCache(position);
- return readCache.get((int)(position - readCachePositionStart));
+ return readCache.get((int) (position - readCachePositionStart));
}
- public void cachePackage(final byte[] body) throws Exception
- {
+ public void cachePackage(final byte[] body) throws Exception {
checkOpen();
cachedChannel.position(cachedChannel.size());
@@ -1359,39 +1133,30 @@ public class LargeMessageControllerImpl implements LargeMessageController
/**
* @throws FileNotFoundException
*/
- public void checkOpen() throws FileNotFoundException
- {
- if (cachedFile != null || !cachedChannel.isOpen())
- {
+ public void checkOpen() throws FileNotFoundException {
+ if (cachedFile != null || !cachedChannel.isOpen()) {
cachedRAFile = new RandomAccessFile(cachedFile, "rw");
cachedChannel = cachedRAFile.getChannel();
}
}
- public void close()
- {
- if (cachedChannel != null && cachedChannel.isOpen())
- {
- try
- {
+ public void close() {
+ if (cachedChannel != null && cachedChannel.isOpen()) {
+ try {
cachedChannel.close();
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorClosingCache(e);
}
cachedChannel = null;
}
- if (cachedRAFile != null)
- {
- try
- {
+ if (cachedRAFile != null) {
+ try {
cachedRAFile.close();
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorClosingCache(e);
}
cachedRAFile = null;
@@ -1400,17 +1165,13 @@ public class LargeMessageControllerImpl implements LargeMessageController
}
@Override
- protected void finalize()
- {
+ protected void finalize() {
close();
- if (cachedFile != null && cachedFile.exists())
- {
- try
- {
+ if (cachedFile != null && cachedFile.exists()) {
+ try {
cachedFile.delete();
}
- catch (Exception e)
- {
+ catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorFinalisingCache(e);
}
}
@@ -1418,83 +1179,69 @@ public class LargeMessageControllerImpl implements LargeMessageController
}
- public ByteBuf byteBuf()
- {
+ public ByteBuf byteBuf() {
return null;
}
- public ActiveMQBuffer copy(final int index, final int length)
- {
+ public ActiveMQBuffer copy(final int index, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public ActiveMQBuffer duplicate()
- {
+ public ActiveMQBuffer duplicate() {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public ActiveMQBuffer readSlice(final int length)
- {
+ public ActiveMQBuffer readSlice(final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void setChar(final int index, final char value)
- {
+ public void setChar(final int index, final char value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void setDouble(final int index, final double value)
- {
+ public void setDouble(final int index, final double value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void setFloat(final int index, final float value)
- {
+ public void setFloat(final int index, final float value) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public ActiveMQBuffer slice()
- {
+ public ActiveMQBuffer slice() {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length)
- {
+ public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
- private static class LargeData
- {
+ private static class LargeData {
+
final byte[] chunk;
final int flowControlSize;
final boolean continues;
- public LargeData()
- {
+ public LargeData() {
continues = false;
flowControlSize = 0;
chunk = null;
}
- public LargeData(byte[] chunk, int flowControlSize, boolean continues)
- {
+ public LargeData(byte[] chunk, int flowControlSize, boolean continues) {
this.chunk = chunk;
this.flowControlSize = flowControlSize;
this.continues = continues;
}
- public byte[] getChunk()
- {
+ public byte[] getChunk() {
return chunk;
}
- public int getFlowControlSize()
- {
+ public int getFlowControlSize() {
return flowControlSize;
}
- public boolean isContinues()
- {
+ public boolean isContinues() {
return continues;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index af76497..27607bc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -19,8 +19,7 @@ package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
-public class QueueQueryImpl implements ClientSession.QueueQuery
-{
+public class QueueQueryImpl implements ClientSession.QueueQuery {
private final boolean exists;
@@ -47,8 +46,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
final SimpleString filterString,
final SimpleString address,
final SimpleString name,
- final boolean exists)
- {
+ final boolean exists) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, false);
}
@@ -60,8 +58,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
final SimpleString address,
final SimpleString name,
final boolean exists,
- final boolean autoCreateJmsQueues)
- {
+ final boolean autoCreateJmsQueues) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@@ -73,48 +70,39 @@ public class QueueQueryImpl implements ClientSession.QueueQuery
this.autoCreateJmsQueues = autoCreateJmsQueues;
}
- public SimpleString getName()
- {
+ public SimpleString getName() {
return name;
}
- public SimpleString getAddress()
- {
+ public SimpleString getAddress() {
return address;
}
- public int getConsumerCount()
- {
+ public int getConsumerCount() {
return consumerCount;
}
- public SimpleString getFilterString()
- {
+ public SimpleString getFilterString() {
return filterString;
}
- public long getMessageCount()
- {
+ public long getMessageCount() {
return messageCount;
}
- public boolean isDurable()
- {
+ public boolean isDurable() {
return durable;
}
- public boolean isAutoCreateJmsQueues()
- {
+ public boolean isAutoCreateJmsQueues() {
return autoCreateJmsQueues;
}
- public boolean isTemporary()
- {
+ public boolean isTemporary() {
return temporary;
}
- public boolean isExists()
- {
+ public boolean isExists() {
return exists;
}