You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/10 17:13:06 UTC
[07/53] [abbrv] [partial] activemq-artemis git commit: automatic checkstyle change
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index d20045e..ae93a31 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -36,8 +36,8 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-public final class NIOSequentialFile extends AbstractSequentialFile
-{
+public final class NIOSequentialFile extends AbstractSequentialFile {
+
private FileChannel channel;
private RandomAccessFile rfile;
@@ -55,24 +55,20 @@ public final class NIOSequentialFile extends AbstractSequentialFile
final File directory,
final String file,
final int maxIO,
- final Executor writerExecutor)
- {
+ final Executor writerExecutor) {
super(directory, file, factory, writerExecutor);
defaultMaxIO = maxIO;
}
- public int getAlignment()
- {
+ public int getAlignment() {
return 1;
}
- public int calculateBlockStart(final int position)
- {
+ public int calculateBlockStart(final int position) {
return position;
}
- public synchronized boolean isOpen()
- {
+ public synchronized boolean isOpen() {
return channel != null;
}
@@ -80,50 +76,42 @@ public final class NIOSequentialFile extends AbstractSequentialFile
* this.maxIO represents the default maxIO.
* Some operations while initializing files on the journal may require a different maxIO
*/
- public synchronized void open() throws IOException
- {
+ public synchronized void open() throws IOException {
open(defaultMaxIO, true);
}
- public void open(final int maxIO, final boolean useExecutor) throws IOException
- {
- try
- {
+ public void open(final int maxIO, final boolean useExecutor) throws IOException {
+ try {
rfile = new RandomAccessFile(getFile(), "rw");
channel = rfile.getChannel();
fileSize = channel.size();
}
- catch (IOException e)
- {
+ catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
- if (writerExecutor != null && useExecutor)
- {
+ if (writerExecutor != null && useExecutor) {
maxIOSemaphore = new Semaphore(maxIO);
this.maxIO = maxIO;
}
}
- public void fill(final int size) throws IOException
- {
+ public void fill(final int size) throws IOException {
ByteBuffer bb = ByteBuffer.allocate(size);
bb.limit(size);
bb.position(0);
- try
- {
+ try {
channel.position(0);
channel.write(bb);
channel.force(false);
channel.position(0);
}
- catch (IOException e)
- {
+ catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
@@ -132,33 +120,26 @@ public final class NIOSequentialFile extends AbstractSequentialFile
}
@Override
- public synchronized void close() throws IOException, InterruptedException, ActiveMQException
- {
+ public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
super.close();
- if (maxIOSemaphore != null)
- {
- while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
- {
+ if (maxIOSemaphore != null) {
+ while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName());
}
}
maxIOSemaphore = null;
- try
- {
- if (channel != null)
- {
+ try {
+ if (channel != null) {
channel.close();
}
- if (rfile != null)
- {
+ if (rfile != null) {
rfile.close();
}
}
- catch (IOException e)
- {
+ catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
@@ -169,24 +150,19 @@ public final class NIOSequentialFile extends AbstractSequentialFile
notifyAll();
}
- public int read(final ByteBuffer bytes) throws Exception
- {
+ public int read(final ByteBuffer bytes) throws Exception {
return read(bytes, null);
}
- public synchronized int read(final ByteBuffer bytes, final IOCallback callback) throws IOException,
- ActiveMQIllegalStateException
- {
- try
- {
- if (channel == null)
- {
+ public synchronized int read(final ByteBuffer bytes,
+ final IOCallback callback) throws IOException, ActiveMQIllegalStateException {
+ try {
+ if (channel == null) {
throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel");
}
int bytesRead = channel.read(bytes);
- if (callback != null)
- {
+ if (callback != null) {
callback.done();
}
@@ -194,10 +170,8 @@ public final class NIOSequentialFile extends AbstractSequentialFile
return bytesRead;
}
- catch (IOException e)
- {
- if (callback != null)
- {
+ catch (IOException e) {
+ if (callback != null) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage());
}
@@ -207,96 +181,76 @@ public final class NIOSequentialFile extends AbstractSequentialFile
}
}
- public void sync() throws IOException
- {
- if (channel != null)
- {
- try
- {
+ public void sync() throws IOException {
+ if (channel != null) {
+ try {
channel.force(false);
}
- catch (IOException e)
- {
+ catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
}
}
- public long size() throws IOException
- {
- if (channel == null)
- {
+ public long size() throws IOException {
+ if (channel == null) {
return getFile().length();
}
- try
- {
+ try {
return channel.size();
}
- catch (IOException e)
- {
+ catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
}
@Override
- public void position(final long pos) throws IOException
- {
- try
- {
+ public void position(final long pos) throws IOException {
+ try {
super.position(pos);
channel.position(pos);
}
- catch (IOException e)
- {
+ catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
}
@Override
- public String toString()
- {
+ public String toString() {
return "NIOSequentialFile " + getFile();
}
- public SequentialFile cloneFile()
- {
+ public SequentialFile cloneFile() {
return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor);
}
- public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
- {
- if (callback == null)
- {
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) {
+ if (callback == null) {
throw new NullPointerException("callback parameter need to be set");
}
- try
- {
+ try {
internalWrite(bytes, sync, callback);
}
- catch (Exception e)
- {
+ catch (Exception e) {
callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
}
}
- public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
- {
+ public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
internalWrite(bytes, sync, null);
}
- public void writeInternal(final ByteBuffer bytes) throws Exception
- {
+ public void writeInternal(final ByteBuffer bytes) throws Exception {
internalWrite(bytes, true, null);
}
@Override
- protected ByteBuffer newBuffer(int size, final int limit)
- {
+ protected ByteBuffer newBuffer(int size, final int limit) {
// For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
size = limit;
@@ -304,16 +258,14 @@ public final class NIOSequentialFile extends AbstractSequentialFile
return super.newBuffer(size, limit);
}
- private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException
- {
- if (!isOpen())
- {
- if (callback != null)
- {
+ private void internalWrite(final ByteBuffer bytes,
+ final boolean sync,
+ final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException {
+ if (!isOpen()) {
+ if (callback != null) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
}
- else
- {
+ else {
throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
}
return;
@@ -321,47 +273,36 @@ public final class NIOSequentialFile extends AbstractSequentialFile
position.addAndGet(bytes.limit());
- if (maxIOSemaphore == null || callback == null)
- {
+ if (maxIOSemaphore == null || callback == null) {
// if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
- try
- {
+ try {
doInternalWrite(bytes, sync, callback);
}
- catch (IOException e)
- {
+ catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
}
}
- else
- {
+ else {
// This is a flow control on writing, just like maxAIO on libaio
maxIOSemaphore.acquire();
- writerExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- try
- {
+ writerExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ try {
doInternalWrite(bytes, sync, callback);
}
- catch (IOException e)
- {
+ catch (IOException e) {
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
}
}
- finally
- {
+ finally {
maxIOSemaphore.release();
}
}
@@ -376,17 +317,16 @@ public final class NIOSequentialFile extends AbstractSequentialFile
* @throws IOException
* @throws Exception
*/
- private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws IOException
- {
+ private void doInternalWrite(final ByteBuffer bytes,
+ final boolean sync,
+ final IOCallback callback) throws IOException {
channel.write(bytes);
- if (sync)
- {
+ if (sync) {
sync();
}
- if (callback != null)
- {
+ if (callback != null) {
callback.done();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index e64e405..36962b7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -25,40 +25,25 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.impl.JournalConstants;
-public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
-{
- public NIOSequentialFileFactory(final File journalDir, final int maxIO)
- {
+public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
+
+ public NIOSequentialFileFactory(final File journalDir, final int maxIO) {
this(journalDir, null, maxIO);
}
- public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, final int maxIO)
- {
- this(journalDir,
- false,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
- maxIO,
- false,
- listener);
+ public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, final int maxIO) {
+ this(journalDir, false, JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener);
}
- public NIOSequentialFileFactory(final File journalDir, final boolean buffered, final int maxIO)
- {
+ public NIOSequentialFileFactory(final File journalDir, final boolean buffered, final int maxIO) {
this(journalDir, buffered, null, maxIO);
}
public NIOSequentialFileFactory(final File journalDir,
final boolean buffered,
- final IOCriticalErrorListener listener, final int maxIO)
- {
- this(journalDir,
- buffered,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
- maxIO,
- false,
- listener);
+ final IOCriticalErrorListener listener,
+ final int maxIO) {
+ this(journalDir, buffered, JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener);
}
public NIOSequentialFileFactory(final File journalDir,
@@ -66,8 +51,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
final int bufferSize,
final int bufferTimeout,
final int maxIO,
- final boolean logRates)
- {
+ final boolean logRates) {
this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null);
}
@@ -77,48 +61,38 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
final int bufferTimeout,
final int maxIO,
final boolean logRates,
- final IOCriticalErrorListener listener)
- {
+ final IOCriticalErrorListener listener) {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
}
- public SequentialFile createSequentialFile(final String fileName)
- {
+ public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
- public boolean isSupportsCallbacks()
- {
+ public boolean isSupportsCallbacks() {
return timedBuffer != null;
}
-
- public ByteBuffer allocateDirectBuffer(final int size)
- {
+ public ByteBuffer allocateDirectBuffer(final int size) {
// Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
ByteBuffer buffer2 = null;
- try
- {
+ try {
buffer2 = ByteBuffer.allocateDirect(size);
}
- catch (OutOfMemoryError error)
- {
+ catch (OutOfMemoryError error) {
// This is a workaround for the way the JDK will deal with native buffers.
// the main portion is outside of the VM heap
// and the JDK will not have any reference about it to take GC into account
// so we force a GC and try again.
WeakReference<Object> obj = new WeakReference<Object>(new Object());
- try
- {
+ try {
long timeout = System.currentTimeMillis() + 5000;
- while (System.currentTimeMillis() > timeout && obj.get() != null)
- {
+ while (System.currentTimeMillis() > timeout && obj.get() != null) {
System.gc();
Thread.sleep(100);
}
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
}
buffer2 = ByteBuffer.allocateDirect(size);
@@ -127,41 +101,34 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
return buffer2;
}
- public void releaseDirectBuffer(ByteBuffer buffer)
- {
+ public void releaseDirectBuffer(ByteBuffer buffer) {
// nothing we can do on this case. we can just have good faith on GC
}
- public ByteBuffer newBuffer(final int size)
- {
+ public ByteBuffer newBuffer(final int size) {
return ByteBuffer.allocate(size);
}
- public void clearBuffer(final ByteBuffer buffer)
- {
+ public void clearBuffer(final ByteBuffer buffer) {
final int limit = buffer.limit();
buffer.rewind();
- for (int i = 0; i < limit; i++)
- {
- buffer.put((byte)0);
+ for (int i = 0; i < limit; i++) {
+ buffer.put((byte) 0);
}
buffer.rewind();
}
- public ByteBuffer wrapBuffer(final byte[] bytes)
- {
+ public ByteBuffer wrapBuffer(final byte[] bytes) {
return ByteBuffer.wrap(bytes);
}
- public int getAlignment()
- {
+ public int getAlignment() {
return 1;
}
- public int calculateBlockSize(final int bytes)
- {
+ public int calculateBlockSize(final int bytes) {
return bytes;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncodingSupport.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncodingSupport.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncodingSupport.java
index 3835754..5bf6839 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncodingSupport.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncodingSupport.java
@@ -21,8 +21,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
/**
* This interface provides encoding support for the Journal.
*/
-public interface EncodingSupport
-{
+public interface EncodingSupport {
+
int getEncodeSize();
void encode(ActiveMQBuffer buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
index d0140f1..ee5e9c3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.journal;
import org.apache.activemq.artemis.core.io.IOCallback;
-public interface IOCompletion extends IOCallback
-{
+public interface IOCompletion extends IOCallback {
+
void storeLineUp();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 6b0beab..3c1f7fd 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -30,10 +30,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
* Notice also that even on the callback methods it's possible to pass the sync mode. That will only
* make sense on the NIO operations.
*/
-public interface Journal extends ActiveMQComponent
-{
- enum JournalState
- {
+public interface Journal extends ActiveMQComponent {
+
+ enum JournalState {
STOPPED,
/**
* The journal has some fields initialized and services running. But it is not fully
@@ -63,7 +62,11 @@ public interface Journal extends ActiveMQComponent
void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
- void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception;
+ void appendAddRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception;
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
@@ -104,15 +107,14 @@ public interface Journal extends ActiveMQComponent
* @param sync
* @param callback
* @param lineUpContext if appendCommitRecord should call a storeLineUp. This is because the
- * caller may have already taken into account
+ * caller may have already taken into account
* @throws Exception
*/
void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception;
/**
- *
* <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
- * back to a state it could be committed. </p>
+ * back to a state it could be committed. </p>
*
* <p> transactionData allows you to store any other supporting user-data related to the transaction</p>
*
@@ -122,7 +124,10 @@ public interface Journal extends ActiveMQComponent
*/
void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception;
- void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception;
+ void appendPrepareRecord(long txID,
+ EncodingSupport transactionData,
+ boolean sync,
+ IOCompletion callback) throws Exception;
void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception;
@@ -143,6 +148,7 @@ public interface Journal extends ActiveMQComponent
/**
* Load internal data structures, and remain waiting for synchronization to complete.
+ *
* @param state the current state of the journal, this parameter ensures consistency.
*/
JournalLoadInformation loadSyncOnly(JournalState state) throws Exception;
@@ -170,6 +176,7 @@ public interface Journal extends ActiveMQComponent
* During the synchronization between a live server and backup, we reserve in the backup the
* journal file IDs used in the live server. This call also makes sure the files are created
* empty without any kind of headers added.
+ *
* @param fileIds IDs to reserve for synchronization
* @return map to be filled with id and journal file pairs for <b>synchronization</b>.
* @throws Exception
@@ -184,18 +191,21 @@ public interface Journal extends ActiveMQComponent
/**
* Unlock the Journal and the compacting process.
+ *
* @see Journal#synchronizationLock()
*/
void synchronizationUnlock();
/**
* Force the usage of a new {@link JournalFile}.
+ *
* @throws Exception
*/
void forceMoveNextFile() throws Exception;
/**
* Returns the {@link JournalFile}s in use.
+ *
* @return array with all {@link JournalFile}s in use
*/
JournalFile[] getDataFiles();
@@ -206,6 +216,7 @@ public interface Journal extends ActiveMQComponent
/**
* This method will start compact using the compactorExecutor and block up to timeout seconds
+ *
* @param timeout the timeout in seconds or block forever if {@code <= 0}
* @throws Exception
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalLoadInformation.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalLoadInformation.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalLoadInformation.java
index 7e13336..fe9fdc6 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalLoadInformation.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/JournalLoadInformation.java
@@ -19,15 +19,13 @@ package org.apache.activemq.artemis.core.journal;
/**
* This is a POJO containing information about the journal during load time.
*/
-public class JournalLoadInformation
-{
+public class JournalLoadInformation {
private int numberOfRecords = 0;
private long maxID = -1;
- public JournalLoadInformation()
- {
+ public JournalLoadInformation() {
super();
}
@@ -35,8 +33,7 @@ public class JournalLoadInformation
* @param numberOfRecords
* @param maxID
*/
- public JournalLoadInformation(final int numberOfRecords, final long maxID)
- {
+ public JournalLoadInformation(final int numberOfRecords, final long maxID) {
super();
this.numberOfRecords = numberOfRecords;
this.maxID = maxID;
@@ -45,75 +42,63 @@ public class JournalLoadInformation
/**
* @return the numberOfRecords
*/
- public int getNumberOfRecords()
- {
+ public int getNumberOfRecords() {
return numberOfRecords;
}
/**
* @param numberOfRecords the numberOfRecords to set
*/
- public void setNumberOfRecords(final int numberOfRecords)
- {
+ public void setNumberOfRecords(final int numberOfRecords) {
this.numberOfRecords = numberOfRecords;
}
/**
* @return the maxID
*/
- public long getMaxID()
- {
+ public long getMaxID() {
return maxID;
}
/**
* @param maxID the maxID to set
*/
- public void setMaxID(final long maxID)
- {
+ public void setMaxID(final long maxID) {
this.maxID = maxID;
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + (int)(maxID ^ maxID >>> 32);
+ result = prime * result + (int) (maxID ^ maxID >>> 32);
result = prime * result + numberOfRecords;
return result;
}
@Override
- public boolean equals(final Object obj)
- {
- if (this == obj)
- {
+ public boolean equals(final Object obj) {
+ if (this == obj) {
return true;
}
- if (obj == null)
- {
+ if (obj == null) {
return false;
}
- if (getClass() != obj.getClass())
- {
+ if (getClass() != obj.getClass()) {
return false;
}
- JournalLoadInformation other = (JournalLoadInformation)obj;
- if (maxID != other.maxID)
- {
+ JournalLoadInformation other = (JournalLoadInformation) obj;
+ if (maxID != other.maxID) {
return false;
}
- if (numberOfRecords != other.numberOfRecords)
- {
+ if (numberOfRecords != other.numberOfRecords) {
return false;
}
return true;
}
@Override
- public String toString()
- {
+ public String toString() {
return "JournalLoadInformation [maxID=" + maxID + ", numberOfRecords=" + numberOfRecords + "]";
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/LoaderCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/LoaderCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/LoaderCallback.java
index 4b0e8fa..d2c9d32 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/LoaderCallback.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/LoaderCallback.java
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.artemis.core.journal;
-public interface LoaderCallback extends TransactionFailureCallback
-{
+public interface LoaderCallback extends TransactionFailureCallback {
+
void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
void addRecord(RecordInfo info);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
index d0934ee..3d82a23 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.journal;
import java.util.ArrayList;
import java.util.List;
-public class PreparedTransactionInfo
-{
+public class PreparedTransactionInfo {
+
public final long id;
public final byte[] extraData;
@@ -29,8 +29,7 @@ public class PreparedTransactionInfo
public final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
- public PreparedTransactionInfo(final long id, final byte[] extraData)
- {
+ public PreparedTransactionInfo(final long id, final byte[] extraData) {
this.id = id;
this.extraData = extraData;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java
index 5805597..ddc5ea6 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java
@@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.core.journal;
-public class RecordInfo
-{
- public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate, final short compactCount)
- {
+public class RecordInfo {
+
+ public RecordInfo(final long id,
+ final byte userRecordType,
+ final byte[] data,
+ final boolean isUpdate,
+ final short compactCount) {
this.id = id;
this.userRecordType = userRecordType;
@@ -46,22 +49,18 @@ public class RecordInfo
public boolean isUpdate;
- public byte getUserRecordType()
- {
+ public byte getUserRecordType() {
return userRecordType;
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
return (int) (id >>> 32 ^ id);
}
@Override
- public boolean equals(final Object other)
- {
- if (!(other instanceof RecordInfo))
- {
+ public boolean equals(final Object other) {
+ if (!(other instanceof RecordInfo)) {
return false;
}
RecordInfo r = (RecordInfo) other;
@@ -70,8 +69,7 @@ public class RecordInfo
}
@Override
- public String toString()
- {
+ public String toString() {
return "RecordInfo (id=" + id +
", userRecordType = " +
userRecordType +
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java
index 3a2bb83..4f8dc4a 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TestableJournal.java
@@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.journal;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
-public interface TestableJournal extends Journal
-{
+public interface TestableJournal extends Journal {
+
int getDataFilesCount();
int getFreeFilesCount();
@@ -58,6 +58,7 @@ public interface TestableJournal extends Journal
* It will among other things, remove stale files and make them available for reuse.
* <p>
* This method locks the journal.
+ *
* @return true if it needs to re-check due to cleanup or other factors
*/
boolean checkReclaimStatus() throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TransactionFailureCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TransactionFailureCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TransactionFailureCallback.java
index c4dbc00..a64a8a7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TransactionFailureCallback.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/TransactionFailureCallback.java
@@ -21,11 +21,12 @@ import java.util.List;
/**
* A Callback to receive information about bad transactions for extra cleanup required for broken transactions such as large messages.
*/
-public interface TransactionFailureCallback
-{
+public interface TransactionFailureCallback {
- /** To be used to inform about transactions without commit records.
- * This could be used to remove extra resources associated with the transactions (such as external files received during the transaction) */
+ /**
+ * To be used to inform about transactions without commit records.
+ * This could be used to remove extra resources associated with the transactions (such as external files received during the transaction)
+ */
void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index b36a0c4..6e9bc69 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -32,11 +32,9 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalR
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
/**
- *
* Super class for Journal maintenances such as clean up and Compactor
*/
-public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
-{
+public abstract class AbstractJournalUpdateTask implements JournalReaderCallback {
// Constants -----------------------------------------------------
@@ -69,8 +67,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
final JournalImpl journal,
final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
- final long nextOrderingID)
- {
+ final long nextOrderingID) {
super();
this.journal = journal;
this.filesRepository = filesRepository;
@@ -84,13 +81,11 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
final List<JournalFile> files,
final List<JournalFile> newFiles,
- final List<Pair<String, String>> renames) throws Exception
- {
+ final List<Pair<String, String>> renames) throws Exception {
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
- try
- {
+ try {
controlFile.open(1, false);
JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
@@ -99,56 +94,43 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
// DataFiles first
- if (files == null)
- {
+ if (files == null) {
filesToRename.writeInt(0);
}
- else
- {
+ else {
filesToRename.writeInt(files.size());
- for (JournalFile file : files)
- {
+ for (JournalFile file : files) {
filesToRename.writeUTF(file.getFile().getFileName());
}
}
// New Files second
- if (newFiles == null)
- {
+ if (newFiles == null) {
filesToRename.writeInt(0);
}
- else
- {
+ else {
filesToRename.writeInt(newFiles.size());
- for (JournalFile file : newFiles)
- {
+ for (JournalFile file : newFiles) {
filesToRename.writeUTF(file.getFile().getFileName());
}
}
// Renames from clean up third
- if (renames == null)
- {
+ if (renames == null) {
filesToRename.writeInt(0);
}
- else
- {
+ else {
filesToRename.writeInt(renames.size());
- for (Pair<String, String> rename : renames)
- {
+ for (Pair<String, String> rename : renames) {
filesToRename.writeUTF(rename.getA());
filesToRename.writeUTF(rename.getB());
}
}
- JournalInternalRecord controlRecord = new JournalAddRecord(true,
- 1,
- (byte)0,
- new ByteArrayEncoding(filesToRename.toByteBuffer()
- .array()));
+ JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
@@ -166,17 +148,16 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
return controlFile;
}
- finally
- {
+ finally {
controlFile.close();
}
}
- /** Write pending output into file */
- public void flush() throws Exception
- {
- if (writingChannel != null)
- {
+ /**
+ * Write pending output into file
+ */
+ public void flush() throws Exception {
+ if (writingChannel != null) {
sequentialFile.position(0);
// To Fix the size of the file
@@ -190,20 +171,19 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
writingChannel = null;
}
- public boolean lookupRecord(final long id)
- {
+ public boolean lookupRecord(final long id) {
return recordsSnapshot.contains(id);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
/**
* @throws Exception
*/
- protected void openFile() throws Exception
- {
+ protected void openFile() throws Exception {
flush();
ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
@@ -221,27 +201,23 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
- protected void addToRecordsSnaptshot(final long id)
- {
+ protected void addToRecordsSnaptshot(final long id) {
recordsSnapshot.add(id);
}
/**
* @return the writingChannel
*/
- protected ActiveMQBuffer getWritingChannel()
- {
+ protected ActiveMQBuffer getWritingChannel() {
return writingChannel;
}
- protected void writeEncoder(final JournalInternalRecord record) throws Exception
- {
+ protected void writeEncoder(final JournalInternalRecord record) throws Exception {
record.setFileID(currentFile.getRecordID());
record.encode(getWritingChannel());
}
- protected void writeEncoder(final JournalInternalRecord record, final int txcounter) throws Exception
- {
+ protected void writeEncoder(final JournalInternalRecord record, final int txcounter) throws Exception {
record.setNumberOfRecords(txcounter);
writeEncoder(record);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 5a0f11f..ba3807b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -48,8 +48,8 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalR
*
* Its main purpose is to store the data as a Journal would, but without verifying records.
*/
-public final class FileWrapperJournal extends JournalBase
-{
+public final class FileWrapperJournal extends JournalBase {
+
private final ReentrantLock lockAppend = new ReentrantLock();
private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();
@@ -60,29 +60,25 @@ public final class FileWrapperJournal extends JournalBase
* @param journal
* @throws Exception
*/
- public FileWrapperJournal(Journal journal) throws Exception
- {
+ public FileWrapperJournal(Journal journal) throws Exception {
super(journal.getFileFactory().isSupportsCallbacks(), journal.getFileSize());
- this.journal = (JournalImpl)journal;
+ this.journal = (JournalImpl) journal;
currentFile = this.journal.setUpCurrentFile(JournalImpl.SIZE_HEADER);
}
@Override
- public void start() throws Exception
- {
+ public void start() throws Exception {
throw new UnsupportedOperationException();
}
@Override
- public void stop() throws Exception
- {
+ public void stop() throws Exception {
if (currentFile.getFile().isOpen())
currentFile.getFile().close();
}
@Override
- public boolean isStarted()
- {
+ public boolean isStarted() {
throw new UnsupportedOperationException();
}
@@ -91,8 +87,11 @@ public final class FileWrapperJournal extends JournalBase
// ------------------------
@Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback) throws Exception
- {
+ public void appendAddRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion callback) throws Exception {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
writeRecord(addRecord, sync, callback);
@@ -101,80 +100,81 @@ public final class FileWrapperJournal extends JournalBase
/**
* Write the record to the current file.
*/
- private void writeRecord(JournalInternalRecord encoder, final boolean sync, final IOCompletion callback) throws Exception
- {
+ private void writeRecord(JournalInternalRecord encoder,
+ final boolean sync,
+ final IOCompletion callback) throws Exception {
lockAppend.lock();
- try
- {
- if (callback != null)
- {
+ try {
+ if (callback != null) {
callback.storeLineUp();
}
currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
encoder.setFileID(currentFile.getRecordID());
- if (callback != null)
- {
+ if (callback != null) {
currentFile.getFile().write(encoder, sync, callback);
}
- else
- {
+ else {
currentFile.getFile().write(encoder, sync);
}
}
- finally
- {
+ finally {
lockAppend.unlock();
}
}
@Override
- public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception
- {
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception {
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
writeRecord(deleteRecord, sync, callback);
}
@Override
- public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
- {
+ public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
count(txID);
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
writeRecord(deleteRecordTX, false, null);
}
@Override
- public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
- {
+ public void appendAddRecordTransactional(long txID,
+ long id,
+ byte recordType,
+ EncodingSupport record) throws Exception {
count(txID);
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
writeRecord(addRecord, false, null);
}
@Override
- public void
- appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback) throws Exception
- {
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion callback) throws Exception {
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
writeRecord(updateRecord, sync, callback);
}
@Override
- public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
- {
+ public void appendUpdateRecordTransactional(long txID,
+ long id,
+ byte recordType,
+ EncodingSupport record) throws Exception {
count(txID);
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
writeRecord(updateRecordTX, false, null);
}
@Override
- public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
- {
+ public void appendCommitRecord(long txID,
+ boolean sync,
+ IOCompletion callback,
+ boolean lineUpContext) throws Exception {
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
AtomicInteger value = transactions.remove(Long.valueOf(txID));
- if (value != null)
- {
+ if (value != null) {
commitRecord.setNumberOfRecords(value.get());
}
@@ -182,160 +182,138 @@ public final class FileWrapperJournal extends JournalBase
}
@Override
- public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception
- {
+ public void appendPrepareRecord(long txID,
+ EncodingSupport transactionData,
+ boolean sync,
+ IOCompletion callback) throws Exception {
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
AtomicInteger value = transactions.get(Long.valueOf(txID));
- if (value != null)
- {
+ if (value != null) {
prepareRecord.setNumberOfRecords(value.get());
}
writeRecord(prepareRecord, sync, callback);
}
- private int count(long txID) throws ActiveMQException
- {
+ private int count(long txID) throws ActiveMQException {
AtomicInteger defaultValue = new AtomicInteger(1);
AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue);
- if (count != null)
- {
+ if (count != null) {
return count.incrementAndGet();
}
return defaultValue.get();
}
@Override
- public String toString()
- {
+ public String toString() {
return FileWrapperJournal.class.getName() + "(currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
}
// UNSUPPORTED STUFF
@Override
- public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
- {
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
throw new ActiveMQUnsupportedPacketException();
}
@Override
- public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
- {
+ public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
throw new ActiveMQUnsupportedPacketException();
}
@Override
- public JournalLoadInformation loadInternalOnly() throws Exception
- {
+ public JournalLoadInformation loadInternalOnly() throws Exception {
throw new ActiveMQUnsupportedPacketException();
}
@Override
- public void lineUpContext(IOCompletion callback)
- {
+ public void lineUpContext(IOCompletion callback) {
throw new UnsupportedOperationException();
}
@Override
public JournalLoadInformation load(List<RecordInfo> committedRecords,
- List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception
- {
+ List<PreparedTransactionInfo> preparedTransactions,
+ TransactionFailureCallback transactionFailure) throws Exception {
throw new ActiveMQUnsupportedPacketException();
}
@Override
- public int getAlignment() throws Exception
- {
+ public int getAlignment() throws Exception {
throw new ActiveMQUnsupportedPacketException();
}
@Override
- public int getNumberOfRecords()
- {
+ public int getNumberOfRecords() {
throw new UnsupportedOperationException();
}
@Override
- public int getUserVersion()
- {
+ public int getUserVersion() {
throw new UnsupportedOperationException();
}
@Override
- public void perfBlast(int pages)
- {
+ public void perfBlast(int pages) {
throw new UnsupportedOperationException();
}
@Override
- public void runDirectJournalBlast() throws Exception
- {
+ public void runDirectJournalBlast() throws Exception {
throw new UnsupportedOperationException();
}
@Override
- public JournalLoadInformation loadSyncOnly(JournalState state) throws Exception
- {
+ public JournalLoadInformation loadSyncOnly(JournalState state) throws Exception {
throw new UnsupportedOperationException();
}
@Override
- public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
- {
+ public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception {
throw new UnsupportedOperationException();
}
@Override
- public void synchronizationLock()
- {
+ public void synchronizationLock() {
throw new UnsupportedOperationException();
}
@Override
- public void synchronizationUnlock()
- {
+ public void synchronizationUnlock() {
throw new UnsupportedOperationException();
}
@Override
- public void forceMoveNextFile()
- {
+ public void forceMoveNextFile() {
throw new UnsupportedOperationException();
}
@Override
- public JournalFile[] getDataFiles()
- {
+ public JournalFile[] getDataFiles() {
throw new UnsupportedOperationException();
}
@Override
- void scheduleReclaim()
- {
+ void scheduleReclaim() {
// no-op
}
@Override
- public SequentialFileFactory getFileFactory()
- {
+ public SequentialFileFactory getFileFactory() {
throw new UnsupportedOperationException();
}
@Override
- public void scheduleCompactAndBlock(int timeout) throws Exception
- {
+ public void scheduleCompactAndBlock(int timeout) throws Exception {
throw new UnsupportedOperationException();
}
@Override
- public void replicationSyncPreserveOldFiles()
- {
+ public void replicationSyncPreserveOldFiles() {
throw new UnsupportedOperationException();
}
@Override
- public void replicationSyncFinished()
- {
+ public void replicationSyncFinished() {
throw new UnsupportedOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index 1ba8f0b..5a844f3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -23,170 +23,175 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
-abstract class JournalBase implements Journal
-{
+abstract class JournalBase implements Journal {
protected final int fileSize;
private final boolean supportsCallback;
- public JournalBase(boolean supportsCallback, int fileSize)
- {
- if (fileSize < JournalImpl.MIN_FILE_SIZE)
- {
+ public JournalBase(boolean supportsCallback, int fileSize) {
+ if (fileSize < JournalImpl.MIN_FILE_SIZE) {
throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes");
}
this.supportsCallback = supportsCallback;
this.fileSize = fileSize;
}
- public abstract void appendAddRecord(final long id, final byte recordType, final EncodingSupport record,
- final boolean sync, final IOCompletion callback) throws Exception;
+ public abstract void appendAddRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception;
- public abstract void appendAddRecordTransactional(final long txID, final long id, final byte recordType,
+ public abstract void appendAddRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
final EncodingSupport record) throws Exception;
- public abstract void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback,
+ public abstract void appendCommitRecord(final long txID,
+ final boolean sync,
+ final IOCompletion callback,
boolean lineUpContext) throws Exception;
- public abstract void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception;
+ public abstract void appendDeleteRecord(final long id,
+ final boolean sync,
+ final IOCompletion callback) throws Exception;
- public abstract void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception;
+ public abstract void appendDeleteRecordTransactional(final long txID,
+ final long id,
+ final EncodingSupport record) throws Exception;
- public abstract void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync,
+ public abstract void appendPrepareRecord(final long txID,
+ final EncodingSupport transactionData,
+ final boolean sync,
final IOCompletion callback) throws Exception;
- public abstract void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record,
- final boolean sync, final IOCompletion callback) throws Exception;
+ public abstract void appendUpdateRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception;
- public abstract void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType,
+ public abstract void appendUpdateRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
final EncodingSupport record) throws Exception;
- public abstract void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception;
-
+ public abstract void appendRollbackRecord(final long txID,
+ final boolean sync,
+ final IOCompletion callback) throws Exception;
- public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
- {
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
- {
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
appendAddRecord(id, recordType, record, sync, callback);
- if (callback != null)
- {
+ if (callback != null) {
callback.waitCompletion();
}
}
- public void appendCommitRecord(final long txID, final boolean sync) throws Exception
- {
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception {
SyncIOCompletion syncCompletion = getSyncCallback(sync);
appendCommitRecord(txID, sync, syncCompletion, true);
- if (syncCompletion != null)
- {
+ if (syncCompletion != null) {
syncCompletion.waitCompletion();
}
}
- public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
- {
+ public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
appendCommitRecord(txID, sync, callback, true);
}
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
- {
+ public void appendUpdateRecord(final long id,
+ final byte recordType,
+ final byte[] record,
+ final boolean sync) throws Exception {
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
- public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType,
- final byte[] record) throws Exception
- {
+ public void appendUpdateRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final byte[] record) throws Exception {
appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
}
- public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
- {
+ public void appendAddRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final byte[] record) throws Exception {
appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
}
- public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
- {
+ public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception {
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
- public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
- {
+ public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception {
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
- public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
- {
+ public void appendPrepareRecord(final long txID,
+ final EncodingSupport transactionData,
+ final boolean sync) throws Exception {
SyncIOCompletion syncCompletion = getSyncCallback(sync);
appendPrepareRecord(txID, transactionData, sync, syncCompletion);
- if (syncCompletion != null)
- {
+ if (syncCompletion != null) {
syncCompletion.waitCompletion();
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
- {
+ public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception {
appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
}
- public void
- appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
- {
+ public void appendUpdateRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
appendUpdateRecord(id, recordType, record, sync, callback);
- if (callback != null)
- {
+ if (callback != null) {
callback.waitCompletion();
}
}
- public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
- {
+ public void appendRollbackRecord(final long txID, final boolean sync) throws Exception {
SyncIOCompletion syncCompletion = getSyncCallback(sync);
appendRollbackRecord(txID, sync, syncCompletion);
- if (syncCompletion != null)
- {
+ if (syncCompletion != null) {
syncCompletion.waitCompletion();
}
}
- public void appendDeleteRecord(final long id, final boolean sync) throws Exception
- {
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception {
SyncIOCompletion callback = getSyncCallback(sync);
appendDeleteRecord(id, sync, callback);
- if (callback != null)
- {
+ if (callback != null) {
callback.waitCompletion();
}
}
abstract void scheduleReclaim();
- protected SyncIOCompletion getSyncCallback(final boolean sync)
- {
- if (supportsCallback)
- {
- if (sync)
- {
+ protected SyncIOCompletion getSyncCallback(final boolean sync) {
+ if (supportsCallback) {
+ if (sync) {
return new SimpleWaitIOCallback();
}
return DummyCallback.getInstance();
@@ -194,29 +199,24 @@ abstract class JournalBase implements Journal
return null;
}
- private static final class NullEncoding implements EncodingSupport
- {
+ private static final class NullEncoding implements EncodingSupport {
private static NullEncoding instance = new NullEncoding();
- public void decode(final ActiveMQBuffer buffer)
- {
+ public void decode(final ActiveMQBuffer buffer) {
// no-op
}
- public void encode(final ActiveMQBuffer buffer)
- {
+ public void encode(final ActiveMQBuffer buffer) {
// no-op
}
- public int getEncodeSize()
- {
+ public int getEncodeSize() {
return 0;
}
}
- public int getFileSize()
- {
+ public int getFileSize() {
return fileSize;
}
}