You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2015/07/30 11:14:28 UTC
[6/9] activemq-artemis git commit: ARTEMIS-163 First pass on the native AIO refactoring
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
index b85a845..d0140f1 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
@@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.core.journal;
-public interface IOCompletion extends IOAsyncTask
+import org.apache.activemq.artemis.core.io.IOCallback;
+
+public interface IOCompletion extends IOCallback
{
void storeLineUp();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java
deleted file mode 100644
index fc0bbf9..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal;
-
-public interface IOCriticalErrorListener
-{
- void onIOException(Exception code, String message, SequentialFile file);
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index f3335b0..6b0beab 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal;
import java.util.List;
import java.util.Map;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java
deleted file mode 100644
index 34e6d02..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.journal.impl.TimedBuffer;
-
-public interface SequentialFile
-{
- /*
- * Creates the file if it doesn't already exist, then opens it
- */
- void open() throws Exception;
-
- boolean isOpen();
-
- boolean exists();
-
- /**
- * The maximum number of simultaneous writes accepted
- * @param maxIO
- * @throws Exception
- */
- void open(int maxIO, boolean useExecutor) throws Exception;
-
- boolean fits(int size);
-
- int getAlignment() throws Exception;
-
- int calculateBlockStart(int position) throws Exception;
-
- String getFileName();
-
- void fill(int position, int size, byte fillCharacter) throws Exception;
-
- void delete() throws IOException, InterruptedException, ActiveMQException;
-
- void write(ActiveMQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception;
-
- void write(ActiveMQBuffer bytes, boolean sync) throws Exception;
-
- void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception;
-
- void write(EncodingSupport bytes, boolean sync) throws Exception;
-
- /**
- * Write directly to the file without using any buffer
- * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
- * NIO). To be safe, use a buffer from the corresponding
- * {@link SequentialFileFactory#newBuffer(int)}.
- */
- void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
-
- /**
- * Write directly to the file without using any buffer
- * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
- * NIO). To be safe, use a buffer from the corresponding
- * {@link SequentialFileFactory#newBuffer(int)}.
- */
- void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
-
- /**
- * Write directly to the file. This is used by compacting and other places where we write a big
- * buffer in a single shot. writeInternal should always block until the entire write is sync on
- * disk.
- * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
- * NIO). To be safe, use a buffer from the corresponding
- * {@link SequentialFileFactory#newBuffer(int)}.
- */
- void writeInternal(ByteBuffer bytes) throws Exception;
-
- /**
- * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
- * NIO). To be safe, use a buffer from the corresponding
- * {@link SequentialFileFactory#newBuffer(int)}.
- */
- int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
-
- /**
- * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
- * NIO). To be safe, use a buffer from the corresponding
- * {@link SequentialFileFactory#newBuffer(int)}.
- */
- int read(ByteBuffer bytes) throws Exception;
-
- void position(long pos) throws IOException;
-
- long position();
-
- void close() throws Exception;
-
- void waitForClose() throws Exception;
-
- void sync() throws IOException;
-
- long size() throws Exception;
-
- void renameTo(String newFileName) throws Exception;
-
- SequentialFile cloneFile();
-
- void copyTo(SequentialFile newFileName) throws Exception;
-
- void setTimedBuffer(TimedBuffer buffer);
-
- /**
- * Returns a native File of the file underlying this sequential file.
- */
- File getJavaFile();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java
deleted file mode 100644
index cb47bd9..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- *
- * A SequentialFileFactory
- */
-public interface SequentialFileFactory
-{
- SequentialFile createSequentialFile(String fileName, int maxIO);
-
- /**
- * Lists files that end with the given extension.
- * <p>
- * This method inserts a ".' before the extension.
- * @param extension
- * @return
- * @throws Exception
- */
- List<String> listFiles(String extension) throws Exception;
-
- boolean isSupportsCallbacks();
-
- /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */
- void onIOError(Exception exception, String message, SequentialFile file);
-
- /** used for cases where you need direct buffer outside of the journal context.
- * This is because the native layer has a method that can be reused in certain cases like paging */
- ByteBuffer allocateDirectBuffer(int size);
-
- /** used for cases where you need direct buffer outside of the journal context.
- * This is because the native layer has a method that can be reused in certain cases like paging */
- void releaseDirectBuffer(ByteBuffer buffer);
-
- /**
- * Note: You need to release the buffer if is used for reading operations. You don't need to do
- * it if using writing operations (AIO Buffer Lister will take of writing operations)
- * @param size
- * @return the allocated ByteBuffer
- */
- ByteBuffer newBuffer(int size);
-
- void releaseBuffer(ByteBuffer buffer);
-
- void activateBuffer(SequentialFile file);
-
- void deactivateBuffer();
-
- // To be used in tests only
- ByteBuffer wrapBuffer(byte[] bytes);
-
- int getAlignment();
-
- int calculateBlockSize(int bytes);
-
- File getDirectory();
-
- void clearBuffer(ByteBuffer buffer);
-
- void start();
-
- void stop();
-
- /**
- * Creates the directory if it does not exist yet.
- */
- void createDirs() throws Exception;
-
- void flush();
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java
deleted file mode 100644
index acef8a5..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.asyncio.AsynchronousFile;
-import org.apache.activemq.artemis.core.asyncio.BufferCallback;
-import org.apache.activemq.artemis.core.asyncio.IOExceptionListener;
-import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-
-public class AIOSequentialFile extends AbstractSequentialFile implements IOExceptionListener
-{
- private boolean opened = false;
-
- private final int maxIO;
-
- private AsynchronousFile aioFile;
-
- private final BufferCallback bufferCallback;
-
- /** The pool for Thread pollers */
- private final Executor pollerExecutor;
-
- public AIOSequentialFile(final SequentialFileFactory factory,
- final int bufferSize,
- final long bufferTimeoutMilliseconds,
- final File directory,
- final String fileName,
- final int maxIO,
- final BufferCallback bufferCallback,
- final Executor writerExecutor,
- final Executor pollerExecutor)
- {
- super(directory, fileName, factory, writerExecutor);
- this.maxIO = maxIO;
- this.bufferCallback = bufferCallback;
- this.pollerExecutor = pollerExecutor;
- }
-
- public boolean isOpen()
- {
- return opened;
- }
-
- public int getAlignment()
- {
- checkOpened();
-
- return aioFile.getBlockSize();
- }
-
- public int calculateBlockStart(final int position)
- {
- int alignment = getAlignment();
-
- int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
-
- return pos;
- }
-
- public SequentialFile cloneFile()
- {
- return new AIOSequentialFile(factory,
- -1,
- -1,
- getFile().getParentFile(),
- getFile().getName(),
- maxIO,
- bufferCallback,
- writerExecutor,
- pollerExecutor);
- }
-
- @Override
- public synchronized void close() throws IOException, InterruptedException, ActiveMQException
- {
- if (!opened)
- {
- return;
- }
-
- super.close();
-
- opened = false;
-
- timedBuffer = null;
-
- aioFile.close();
- aioFile = null;
-
- notifyAll();
- }
-
- @Override
- public synchronized void waitForClose() throws Exception
- {
- while (isOpen())
- {
- wait();
- }
- }
-
- public void fill(final int position, final int size, final byte fillCharacter) throws Exception
- {
- checkOpened();
-
- int fileblockSize = aioFile.getBlockSize();
-
- int blockSize = fileblockSize;
-
- if (size % (100 * 1024 * 1024) == 0)
- {
- blockSize = 100 * 1024 * 1024;
- }
- else if (size % (10 * 1024 * 1024) == 0)
- {
- blockSize = 10 * 1024 * 1024;
- }
- else if (size % (1024 * 1024) == 0)
- {
- blockSize = 1024 * 1024;
- }
- else if (size % (10 * 1024) == 0)
- {
- blockSize = 10 * 1024;
- }
- else
- {
- blockSize = fileblockSize;
- }
-
- int blocks = size / blockSize;
-
- if (size % blockSize != 0)
- {
- blocks++;
- }
-
- int filePosition = position;
-
- if (position % fileblockSize != 0)
- {
- filePosition = (position / fileblockSize + 1) * fileblockSize;
- }
-
- aioFile.fill(filePosition, blocks, blockSize, fillCharacter);
-
- fileSize = aioFile.size();
- }
-
- public void open() throws Exception
- {
- open(maxIO, true);
- }
-
- public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException
- {
- opened = true;
-
- aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null, pollerExecutor, this);
-
- try
- {
- aioFile.open(getFile().getAbsolutePath(), maxIO);
- }
- catch (ActiveMQException e)
- {
- factory.onIOError(e, e.getMessage(), this);
- throw e;
- }
-
- position.set(0);
-
- aioFile.setBufferCallback(bufferCallback);
-
- fileSize = aioFile.size();
- }
-
- public void setBufferCallback(final BufferCallback callback)
- {
- aioFile.setBufferCallback(callback);
- }
-
- public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws ActiveMQException
- {
- int bytesToRead = bytes.limit();
-
- long positionToRead = position.getAndAdd(bytesToRead);
-
- bytes.rewind();
-
- aioFile.read(positionToRead, bytesToRead, bytes, callback);
-
- return bytesToRead;
- }
-
- public int read(final ByteBuffer bytes) throws Exception
- {
- SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
-
- int bytesRead = read(bytes, waitCompletion);
-
- waitCompletion.waitCompletion();
-
- return bytesRead;
- }
-
- public void sync()
- {
- throw new UnsupportedOperationException("This method is not supported on AIO");
- }
-
- public long size() throws Exception
- {
- if (aioFile == null)
- {
- return getFile().length();
- }
- else
- {
- return aioFile.size();
- }
- }
-
- @Override
- public String toString()
- {
- return "AIOSequentialFile:" + getFile().getAbsolutePath();
- }
-
- // Public methods
- // -----------------------------------------------------------------------------------------------------
-
- @Override
- public void onIOException(Exception code, String message)
- {
- factory.onIOError(code, message, this);
- }
-
-
- public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
- {
- if (sync)
- {
- SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
-
- writeDirect(bytes, true, completion);
-
- completion.waitCompletion();
- }
- else
- {
- writeDirect(bytes, false, DummyCallback.getInstance());
- }
- }
-
- /**
- *
- * @param sync Not used on AIO
- * */
- public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
- {
- final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
-
- final long positionToWrite = position.getAndAdd(bytesToWrite);
-
- aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
- }
-
- public void writeInternal(final ByteBuffer bytes) throws ActiveMQException
- {
- final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
-
- final long positionToWrite = position.getAndAdd(bytesToWrite);
-
- aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
- }
-
- // Protected methods
- // -----------------------------------------------------------------------------------------------------
-
- @Override
- protected ByteBuffer newBuffer(int size, int limit)
- {
- size = factory.calculateBlockSize(size);
- limit = factory.calculateBlockSize(limit);
-
- ByteBuffer buffer = factory.newBuffer(size);
- buffer.limit(limit);
- return buffer;
- }
-
- // Private methods
- // -----------------------------------------------------------------------------------------------------
-
- private void checkOpened()
- {
- if (aioFile == null || !opened)
- {
- throw new IllegalStateException("File not opened");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java
deleted file mode 100644
index 65e6a6f..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
-import org.apache.activemq.artemis.core.asyncio.BufferCallback;
-import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl;
-import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.libaio.Native;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-
-public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory
-{
- private static final boolean trace = ActiveMQJournalLogger.LOGGER.isTraceEnabled();
-
- private final ReuseBuffersController buffersControl = new ReuseBuffersController();
-
- private ExecutorService pollerExecutor;
-
- // This method exists just to make debug easier.
- // I could replace log.trace by log.info temporarily while I was debugging
- // Journal
- private static void trace(final String message)
- {
- ActiveMQJournalLogger.LOGGER.trace(message);
- }
-
- public AIOSequentialFileFactory(final File journalDir)
- {
- this(journalDir,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
- false,
- null);
- }
-
- public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener)
- {
- this(journalDir,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
- false,
- listener);
- }
-
- public AIOSequentialFileFactory(final File journalDir,
- final int bufferSize,
- final int bufferTimeout,
- final boolean logRates)
- {
- this(journalDir, bufferSize, bufferTimeout, logRates, null);
- }
-
- public AIOSequentialFileFactory(final File journalDir,
- final int bufferSize,
- final int bufferTimeout,
- final boolean logRates,
- final IOCriticalErrorListener listener)
- {
- super(journalDir, true, bufferSize, bufferTimeout, logRates, listener);
- }
-
- public SequentialFile createSequentialFile(final String fileName, final int maxIO)
- {
- return new AIOSequentialFile(this,
- bufferSize,
- bufferTimeout,
- journalDir,
- fileName,
- maxIO,
- buffersControl.callback,
- writeExecutor,
- pollerExecutor);
- }
-
- public boolean isSupportsCallbacks()
- {
- return true;
- }
-
- public static boolean isSupported()
- {
- return AsynchronousFileImpl.isLoaded();
- }
-
- public ByteBuffer allocateDirectBuffer(final int size)
- {
-
- int blocks = size / 512;
- if (size % 512 != 0)
- {
- blocks++;
- }
-
- // The buffer on AIO has to be a multiple of 512
- ByteBuffer buffer = AsynchronousFileImpl.newBuffer(blocks * 512);
-
- buffer.limit(size);
-
- return buffer;
- }
-
- public void releaseDirectBuffer(final ByteBuffer buffer)
- {
- Native.destroyBuffer(buffer);
- }
-
- public ByteBuffer newBuffer(int size)
- {
- if (size % 512 != 0)
- {
- size = (size / 512 + 1) * 512;
- }
-
- return buffersControl.newBuffer(size);
- }
-
- public void clearBuffer(final ByteBuffer directByteBuffer)
- {
- AsynchronousFileImpl.clearBuffer(directByteBuffer);
- }
-
- public int getAlignment()
- {
- return 512;
- }
-
- // For tests only
- public ByteBuffer wrapBuffer(final byte[] bytes)
- {
- ByteBuffer newbuffer = newBuffer(bytes.length);
- newbuffer.put(bytes);
- return newbuffer;
- }
-
- public int calculateBlockSize(final int position)
- {
- int alignment = getAlignment();
-
- int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
-
- return pos;
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
- */
- @Override
- public synchronized void releaseBuffer(final ByteBuffer buffer)
- {
- Native.destroyBuffer(buffer);
- }
-
- @Override
- public void start()
- {
- super.start();
-
- pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this),
- true,
- AIOSequentialFileFactory.getThisClassLoader()));
-
- }
-
- @Override
- public void stop()
- {
- buffersControl.stop();
-
- if (pollerExecutor != null)
- {
- pollerExecutor.shutdown();
-
- try
- {
- if (!pollerExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
- {
- ActiveMQJournalLogger.LOGGER.timeoutOnPollerShutdown(new Exception("trace"));
- }
- }
- catch (InterruptedException e)
- {
- throw new ActiveMQInterruptedException(e);
- }
- }
-
- super.stop();
- }
-
- @Override
- protected void finalize()
- {
- stop();
- }
-
- /**
- * Class that will control buffer-reuse
- */
- private class ReuseBuffersController
- {
- private volatile long bufferReuseLastTime = System.currentTimeMillis();
-
- /**
- * This queue is fed by {@link org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory.ReuseBuffersController.LocalBufferCallback}
- * which is called directly by NIO or NIO. On the case of the AIO this is almost called by the native layer as
- * soon as the buffer is not being used any more and ready to be reused or GCed
- */
- private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
-
- private boolean stopped = false;
-
- final BufferCallback callback = new LocalBufferCallback();
-
- public ByteBuffer newBuffer(final int size)
- {
- // if a new buffer wasn't requested in 10 seconds, we clear the queue
- // This is being done this way as we don't need another Timeout Thread
- // just to cleanup this
- if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
- {
- if (AIOSequentialFileFactory.trace)
- {
- AIOSequentialFileFactory.trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() +
- " elements");
- }
-
- bufferReuseLastTime = System.currentTimeMillis();
-
- clearPoll();
- }
-
- // if a buffer is bigger than the configured-bufferSize, we just create a new
- // buffer.
- if (size > bufferSize)
- {
- return AsynchronousFileImpl.newBuffer(size);
- }
- else
- {
- // We need to allocate buffers following the rules of the storage
- // being used (AIO/NIO)
- int alignedSize = calculateBlockSize(size);
-
- // Try getting a buffer from the queue...
- ByteBuffer buffer = reuseBuffersQueue.poll();
-
- if (buffer == null)
- {
- // if empty create a new one.
- buffer = AsynchronousFileImpl.newBuffer(bufferSize);
-
- buffer.limit(alignedSize);
- }
- else
- {
- clearBuffer(buffer);
-
- // set the limit of the buffer to the bufferSize being required
- buffer.limit(alignedSize);
- }
-
- buffer.rewind();
-
- return buffer;
- }
- }
-
- public synchronized void stop()
- {
- stopped = true;
- clearPoll();
- }
-
- public synchronized void clearPoll()
- {
- ByteBuffer reusedBuffer;
-
- while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
- {
- releaseBuffer(reusedBuffer);
- }
- }
-
- private class LocalBufferCallback implements BufferCallback
- {
- public void bufferDone(final ByteBuffer buffer)
- {
- synchronized (ReuseBuffersController.this)
- {
-
- if (stopped)
- {
- releaseBuffer(buffer);
- }
- else
- {
- bufferReuseLastTime = System.currentTimeMillis();
-
- // If a buffer has any other than the configured bufferSize, the buffer
- // will be just sent to GC
- if (buffer.capacity() == bufferSize)
- {
- reuseBuffersQueue.offer(buffer);
- }
- else
- {
- releaseBuffer(buffer);
- }
- }
- }
- }
- }
- }
-
- private static ClassLoader getThisClassLoader()
- {
- return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return AIOSequentialFileFactory.class.getClassLoader();
- }
- });
-
- }
-
- @Override
- public String toString()
- {
- return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped +
- "):" + super.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index e21b046..b36a0c4 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -24,8 +24,8 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@@ -87,7 +87,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
final List<Pair<String, String>> renames) throws Exception
{
- SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1);
+ SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
try
{
@@ -182,7 +182,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
// To Fix the size of the file
writingChannel.writerIndex(writingChannel.capacity());
- sequentialFile.writeInternal(writingChannel.toByteBuffer());
+ sequentialFile.writeDirect(writingChannel.toByteBuffer(), true);
sequentialFile.close();
newDataFiles.add(currentFile);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java
deleted file mode 100644
index a4eed58..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-
-public abstract class AbstractSequentialFile implements SequentialFile
-{
-
- private File file;
-
- protected final File directory;
-
- protected final SequentialFileFactory factory;
-
- protected long fileSize = 0;
-
- protected final AtomicLong position = new AtomicLong(0);
-
- protected TimedBuffer timedBuffer;
-
- /**
- * Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
- * This is the class returned to the factory when the file is being activated.
- */
- protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
-
- /**
- * Used for asynchronous writes
- */
- protected final Executor writerExecutor;
-
- /**
- * @param file
- * @param directory
- */
- public AbstractSequentialFile(final File directory,
- final String file,
- final SequentialFileFactory factory,
- final Executor writerExecutor)
- {
- super();
- this.file = new File(directory, file);
- this.directory = directory;
- this.factory = factory;
- this.writerExecutor = writerExecutor;
- }
-
- // Public --------------------------------------------------------
-
- public final boolean exists()
- {
- return file.exists();
- }
-
- public final String getFileName()
- {
- return file.getName();
- }
-
- public final void delete() throws IOException, InterruptedException, ActiveMQException
- {
- if (isOpen())
- {
- close();
- }
-
- if (file.exists() && !file.delete())
- {
- ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
- }
- }
-
- public void copyTo(SequentialFile newFileName) throws Exception
- {
- try
- {
- ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + newFileName);
- if (!newFileName.isOpen())
- {
- newFileName.open();
- }
-
- if (!isOpen())
- {
- this.open();
- }
-
-
- ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
-
- for (;;)
- {
- buffer.rewind();
- int size = this.read(buffer);
- newFileName.writeDirect(buffer, false);
- if (size < 10 * 1024)
- {
- break;
- }
- }
- newFileName.close();
- this.close();
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- throw e;
- }
- }
-
- /**
- * @throws IOException only declare exception due to signature. Sub-class needs it.
- */
- @Override
- public void position(final long pos) throws IOException
- {
- position.set(pos);
- }
-
- public long position()
- {
- return position.get();
- }
-
- public final void renameTo(final String newFileName) throws IOException, InterruptedException,
- ActiveMQException
- {
- try
- {
- close();
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- throw e;
- }
-
- File newFile = new File(directory + "/" + newFileName);
-
- if (!file.equals(newFile))
- {
- if (!file.renameTo(newFile))
- {
- throw ActiveMQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName);
- }
- file = newFile;
- }
- }
-
- /**
- * @throws IOException we declare throwing IOException because sub-classes need to do it
- * @throws ActiveMQException
- */
- public synchronized void close() throws IOException, InterruptedException, ActiveMQException
- {
- final CountDownLatch donelatch = new CountDownLatch(1);
-
- if (writerExecutor != null)
- {
- writerExecutor.execute(new Runnable()
- {
- public void run()
- {
- donelatch.countDown();
- }
- });
-
- while (!donelatch.await(60, TimeUnit.SECONDS))
- {
- ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName());
- }
- }
- }
-
- public final boolean fits(final int size)
- {
- if (timedBuffer == null)
- {
- return position.get() + size <= fileSize;
- }
- else
- {
- return timedBuffer.checkSize(size);
- }
- }
-
- public void setTimedBuffer(final TimedBuffer buffer)
- {
- if (timedBuffer != null)
- {
- timedBuffer.setObserver(null);
- }
-
- timedBuffer = buffer;
-
- if (buffer != null)
- {
- buffer.setObserver(timedBufferObserver);
- }
-
- }
-
- public void write(final ActiveMQBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
- {
- if (timedBuffer != null)
- {
- bytes.setIndex(0, bytes.capacity());
- timedBuffer.addBytes(bytes, sync, callback);
- }
- else
- {
- ByteBuffer buffer = factory.newBuffer(bytes.capacity());
- buffer.put(bytes.toByteBuffer().array());
- buffer.rewind();
- writeDirect(buffer, sync, callback);
- }
- }
-
- public void write(final ActiveMQBuffer bytes, final boolean sync) throws IOException, InterruptedException,
- ActiveMQException
- {
- if (sync)
- {
- SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
-
- write(bytes, true, completion);
-
- completion.waitCompletion();
- }
- else
- {
- write(bytes, false, DummyCallback.getInstance());
- }
- }
-
- public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
- {
- if (timedBuffer != null)
- {
- timedBuffer.addBytes(bytes, sync, callback);
- }
- else
- {
- ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
-
- // If not using the TimedBuffer, a final copy is necessary
- // Because AIO will need a specific Buffer
- // And NIO will also need a whole buffer to perform the write
-
- ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer);
- bytes.encode(outBuffer);
- buffer.rewind();
- writeDirect(buffer, sync, callback);
- }
- }
-
- public void write(final EncodingSupport bytes, final boolean sync) throws InterruptedException, ActiveMQException
- {
- if (sync)
- {
- SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
-
- write(bytes, true, completion);
-
- completion.waitCompletion();
- }
- else
- {
- write(bytes, false, DummyCallback.getInstance());
- }
- }
-
- protected File getFile()
- {
- return file;
- }
-
- private static final class DelegateCallback implements IOAsyncTask
- {
- final List<IOAsyncTask> delegates;
-
- private DelegateCallback(final List<IOAsyncTask> delegates)
- {
- this.delegates = delegates;
- }
-
- public void done()
- {
- for (IOAsyncTask callback : delegates)
- {
- try
- {
- callback.done();
- }
- catch (Throwable e)
- {
- ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
- }
- }
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- for (IOAsyncTask callback : delegates)
- {
- try
- {
- callback.onError(errorCode, errorMessage);
- }
- catch (Throwable e)
- {
- ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
- }
- }
- }
- }
-
- protected ByteBuffer newBuffer(int size, int limit)
- {
- size = factory.calculateBlockSize(size);
- limit = factory.calculateBlockSize(limit);
-
- ByteBuffer buffer = factory.newBuffer(size);
- buffer.limit(limit);
- return buffer;
- }
-
- protected class LocalBufferObserver implements TimedBufferObserver
- {
- public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
- {
- buffer.flip();
-
- if (buffer.limit() == 0)
- {
- factory.releaseBuffer(buffer);
- }
- else
- {
- writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
- }
- }
-
- public ByteBuffer newBuffer(final int size, final int limit)
- {
- return AbstractSequentialFile.this.newBuffer(size, limit);
- }
-
- public int getRemainingBytes()
- {
- if (fileSize - position.get() > Integer.MAX_VALUE)
- {
- return Integer.MAX_VALUE;
- }
- else
- {
- return (int)(fileSize - position.get());
- }
- }
-
- @Override
- public String toString()
- {
- return "TimedBufferObserver on file (" + getFile().getName() + ")";
- }
-
- }
-
- @Override
- public File getJavaFile()
- {
- return getFile().getAbsoluteFile();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java
deleted file mode 100644
index ec0ab4d..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
-import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-
-/**
- * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
- */
-abstract class AbstractSequentialFileFactory implements SequentialFileFactory
-{
-
- // Timeout used to wait executors to shutdown
- protected static final int EXECUTOR_TIMEOUT = 60;
-
- protected final File journalDir;
-
- protected final TimedBuffer timedBuffer;
-
- protected final int bufferSize;
-
- protected final long bufferTimeout;
-
- private final IOCriticalErrorListener critialErrorListener;
-
- /**
- * Asynchronous writes need to be done at another executor.
- * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
- * At AIO this is necessary as context switches on writes would fire flushes at the kernel.
- * */
- protected ExecutorService writeExecutor;
-
- AbstractSequentialFileFactory(final File journalDir,
- final boolean buffered,
- final int bufferSize,
- final int bufferTimeout,
- final boolean logRates,
- final IOCriticalErrorListener criticalErrorListener)
- {
- this.journalDir = journalDir;
-
- if (buffered)
- {
- timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates);
- }
- else
- {
- timedBuffer = null;
- }
- this.bufferSize = bufferSize;
- this.bufferTimeout = bufferTimeout;
- this.critialErrorListener = criticalErrorListener;
- }
-
- public void stop()
- {
- if (timedBuffer != null)
- {
- timedBuffer.stop();
- }
-
- if (isSupportsCallbacks() && writeExecutor != null)
- {
- writeExecutor.shutdown();
-
- try
- {
- if (!writeExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
- {
- ActiveMQJournalLogger.LOGGER.timeoutOnWriterShutdown(new Exception("trace"));
- }
- }
- catch (InterruptedException e)
- {
- throw new ActiveMQInterruptedException(e);
- }
- }
- }
-
- @Override
- public File getDirectory()
- {
- return journalDir;
- }
-
- public void start()
- {
- if (timedBuffer != null)
- {
- timedBuffer.start();
- }
-
- if (isSupportsCallbacks())
- {
- writeExecutor = Executors.newSingleThreadExecutor(new ActiveMQThreadFactory("ActiveMQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
- true,
- AbstractSequentialFileFactory.getThisClassLoader()));
- }
-
- }
-
- @Override
- public void onIOError(Exception exception, String message, SequentialFile file)
- {
- if (critialErrorListener != null)
- {
- critialErrorListener.onIOException(exception, message, file);
- }
- }
-
- @Override
- public void activateBuffer(final SequentialFile file)
- {
- if (timedBuffer != null)
- {
- file.setTimedBuffer(timedBuffer);
- }
- }
-
- public void flush()
- {
- if (timedBuffer != null)
- {
- timedBuffer.flush();
- }
- }
-
- public void deactivateBuffer()
- {
- if (timedBuffer != null)
- {
- // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
- timedBuffer.flush();
- timedBuffer.setObserver(null);
- }
- }
-
- public void releaseBuffer(final ByteBuffer buffer)
- {
- }
-
- /**
- * Create the directory if it doesn't exist yet
- */
- public void createDirs() throws Exception
- {
- boolean ok = journalDir.mkdirs();
- if (!ok)
- {
- throw new IOException("Failed to create directory " + journalDir);
- }
- }
-
- public List<String> listFiles(final String extension) throws Exception
- {
- FilenameFilter fnf = new FilenameFilter()
- {
- public boolean accept(final File file, final String name)
- {
- return name.endsWith("." + extension);
- }
- };
-
- String[] fileNames = journalDir.list(fnf);
-
- if (fileNames == null)
- {
- return Collections.EMPTY_LIST;
- }
-
- return Arrays.asList(fileNames);
- }
-
- private static ClassLoader getThisClassLoader()
- {
- return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return AbstractSequentialFileFactory.class.getClassLoader();
- }
- });
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java
deleted file mode 100644
index 9c4c3d6..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-
-class DummyCallback extends SyncIOCompletion
-{
- private static final DummyCallback instance = new DummyCallback();
-
- public static DummyCallback getInstance()
- {
- return DummyCallback.instance;
- }
-
- public void done()
- {
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- ActiveMQJournalLogger.LOGGER.errorWritingData(new Exception(errorMessage), errorMessage, errorCode);
- }
-
- @Override
- public void waitCompletion() throws Exception
- {
- }
-
- @Override
- public void storeLineUp()
- {
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index a41e72f..5a0f11f 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -32,7 +32,7 @@ import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index d6a856a..1ba8f0b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.journal.impl;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index 5b0a1b8..1f657a2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -28,8 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX;
@@ -64,7 +64,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
final List<String> newFiles,
final List<Pair<String, String>> renameFile) throws Exception
{
- SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1);
+ SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
if (controlFile.exists())
{
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
index e3b1624..dcfc1a2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.artemis.core.journal.impl;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFile;
public interface JournalFile
{
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
index 4438a96..7e96575 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
@@ -21,7 +21,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFile;
public class JournalFileImpl implements JournalFile
{
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
index 052dc25..268a23d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
@@ -31,8 +31,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
/**
@@ -662,13 +662,13 @@ public class JournalFilesRepository
String tmpFileName = fileName + ".tmp";
- SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName);
sequentialFile.open(1, false);
if (init)
{
- sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+ sequentialFile.fill(fileSize);
JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 6f411eb..068e697 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -46,15 +46,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
@@ -293,7 +293,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final CountDownLatch latch = new CountDownLatch(numIts * 2);
- class MyIOAsyncTask implements IOCompletion
+ class MyAIOCallback implements IOCompletion
{
public void done()
{
@@ -310,7 +310,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
- final MyIOAsyncTask task = new MyIOAsyncTask();
+ final MyAIOCallback task = new MyAIOCallback();
final int recordSize = 1024;
@@ -373,11 +373,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
for (String fileName : fileNames)
{
- SequentialFile file = fileFactory.createSequentialFile(fileName, filesRepository.getMaxAIO());
+ SequentialFile file = fileFactory.createSequentialFile(fileName);
if (file.size() >= SIZE_HEADER)
{
- file.open(1, false);
+ file.open();
try
{
@@ -2776,11 +2776,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
- final IOAsyncTask parameterCallback) throws Exception
+ final IOCallback parameterCallback) throws Exception
{
checkJournalIsLoaded();
- final IOAsyncTask callback;
+ final IOCallback callback;
final int size = encoder.getEncodeSize();
@@ -2896,7 +2896,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
{
for (String dataFile : dataFiles)
{
- SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
+ SequentialFile file = fileFactory.createSequentialFile(dataFile);
if (file.exists())
{
file.delete();
@@ -2905,7 +2905,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
for (String newFile : newFiles)
{
- SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
+ SequentialFile file = fileFactory.createSequentialFile(newFile);
if (file.exists())
{
final String originalName = file.getFileName();
@@ -2916,8 +2916,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
for (Pair<String, String> rename : renames)
{
- SequentialFile fileTmp = fileFactory.createSequentialFile(rename.getA(), 1);
- SequentialFile fileTo = fileFactory.createSequentialFile(rename.getB(), 1);
+ SequentialFile fileTmp = fileFactory.createSequentialFile(rename.getA());
+ SequentialFile fileTo = fileFactory.createSequentialFile(rename.getB());
// We should do the rename only if the tmp file still exist, or else we could
// delete a valid file depending on where the crash occurred during the control file delete
if (fileTmp.exists())
@@ -2951,7 +2951,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
for (String fileToDelete : leftFiles)
{
ActiveMQJournalLogger.LOGGER.deletingOrphanedFile(fileToDelete);
- SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1);
+ SequentialFile file = fileFactory.createSequentialFile(fileToDelete);
file.delete();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java
deleted file mode 100644
index 0dac7f2..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-
-public final class NIOSequentialFile extends AbstractSequentialFile
-{
- private FileChannel channel;
-
- private RandomAccessFile rfile;
-
- /**
- * The write semaphore here is only used when writing asynchronously
- */
- private Semaphore maxIOSemaphore;
-
- private final int defaultMaxIO;
-
- private int maxIO;
-
- public NIOSequentialFile(final SequentialFileFactory factory,
- final File directory,
- final String file,
- final int maxIO,
- final Executor writerExecutor)
- {
- super(directory, file, factory, writerExecutor);
- defaultMaxIO = maxIO;
- }
-
- public int getAlignment()
- {
- return 1;
- }
-
- public int calculateBlockStart(final int position)
- {
- return position;
- }
-
- public synchronized boolean isOpen()
- {
- return channel != null;
- }
-
- /**
- * this.maxIO represents the default maxIO.
- * Some operations while initializing files on the journal may require a different maxIO
- */
- public synchronized void open() throws IOException
- {
- open(defaultMaxIO, true);
- }
-
- public void open(final int maxIO, final boolean useExecutor) throws IOException
- {
- try
- {
- rfile = new RandomAccessFile(getFile(), "rw");
-
- channel = rfile.getChannel();
-
- fileSize = channel.size();
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- throw e;
- }
-
- if (writerExecutor != null && useExecutor)
- {
- maxIOSemaphore = new Semaphore(maxIO);
- this.maxIO = maxIO;
- }
- }
-
- public void fill(final int position, final int size, final byte fillCharacter) throws IOException
- {
- ByteBuffer bb = ByteBuffer.allocate(size);
-
- for (int i = 0; i < size; i++)
- {
- bb.put(fillCharacter);
- }
-
- bb.flip();
-
- try
- {
- channel.position(position);
- channel.write(bb);
- channel.force(false);
- channel.position(0);
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- throw e;
- }
-
- fileSize = channel.size();
- }
-
- public synchronized void waitForClose() throws InterruptedException
- {
- while (isOpen())
- {
- wait();
- }
- }
-
- @Override
- public synchronized void close() throws IOException, InterruptedException, ActiveMQException
- {
- super.close();
-
- if (maxIOSemaphore != null)
- {
- while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
- {
- ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName());
- }
- }
-
- maxIOSemaphore = null;
- try
- {
- if (channel != null)
- {
- channel.close();
- }
-
- if (rfile != null)
- {
- rfile.close();
- }
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- throw e;
- }
- channel = null;
-
- rfile = null;
-
- notifyAll();
- }
-
- public int read(final ByteBuffer bytes) throws Exception
- {
- return read(bytes, null);
- }
-
- public synchronized int read(final ByteBuffer bytes, final IOAsyncTask callback) throws IOException,
- ActiveMQIllegalStateException
- {
- try
- {
- if (channel == null)
- {
- throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel");
- }
- int bytesRead = channel.read(bytes);
-
- if (callback != null)
- {
- callback.done();
- }
-
- bytes.flip();
-
- return bytesRead;
- }
- catch (IOException e)
- {
- if (callback != null)
- {
- callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage());
- }
-
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-
- throw e;
- }
- }
-
- public void sync() throws IOException
- {
- if (channel != null)
- {
- try
- {
- channel.force(false);
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- throw e;
- }
- }
- }
-
- public long size() throws IOException
- {
- if (channel == null)
- {
- return getFile().length();
- }
-
- try
- {
- return channel.size();
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- throw e;
- }
- }
-
- @Override
- public void position(final long pos) throws IOException
- {
- try
- {
- super.position(pos);
- channel.position(pos);
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- throw e;
- }
- }
-
- @Override
- public String toString()
- {
- return "NIOSequentialFile " + getFile();
- }
-
- public SequentialFile cloneFile()
- {
- return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor);
- }
-
- public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
- {
- if (callback == null)
- {
- throw new NullPointerException("callback parameter need to be set");
- }
-
- try
- {
- internalWrite(bytes, sync, callback);
- }
- catch (Exception e)
- {
- callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
- }
- }
-
- public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
- {
- internalWrite(bytes, sync, null);
- }
-
- public void writeInternal(final ByteBuffer bytes) throws Exception
- {
- internalWrite(bytes, true, null);
- }
-
- @Override
- protected ByteBuffer newBuffer(int size, final int limit)
- {
- // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
-
- size = limit;
-
- return super.newBuffer(size, limit);
- }
-
- private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException, ActiveMQIOErrorException, InterruptedException
- {
- if (!isOpen())
- {
- if (callback != null)
- {
- callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
- }
- else
- {
- throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
- }
- return;
- }
-
- position.addAndGet(bytes.limit());
-
- if (maxIOSemaphore == null || callback == null)
- {
- // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
- try
- {
- doInternalWrite(bytes, sync, callback);
- }
- catch (IOException e)
- {
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
- }
- }
- else
- {
- // This is a flow control on writing, just like maxAIO on libaio
- maxIOSemaphore.acquire();
-
- writerExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- try
- {
- doInternalWrite(bytes, sync, callback);
- }
- catch (IOException e)
- {
- ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
- factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
- callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
- }
- catch (Throwable e)
- {
- ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
- callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
- }
- }
- finally
- {
- maxIOSemaphore.release();
- }
- }
- });
- }
- }
-
- /**
- * @param bytes
- * @param sync
- * @param callback
- * @throws IOException
- * @throws Exception
- */
- private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
- {
- channel.write(bytes);
-
- if (sync)
- {
- sync();
- }
-
- if (callback != null)
- {
- callback.done();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java
deleted file mode 100644
index e471928..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-
-public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
-{
- public NIOSequentialFileFactory(final File journalDir)
- {
- this(journalDir, null);
- }
-
- public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener)
- {
- this(journalDir,
- false,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
- false,
- listener);
- }
-
- public NIOSequentialFileFactory(final File journalDir, final boolean buffered)
- {
- this(journalDir, buffered, null);
- }
-
- public NIOSequentialFileFactory(final File journalDir,
- final boolean buffered,
- final IOCriticalErrorListener listener)
- {
- this(journalDir,
- buffered,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
- JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
- false,
- listener);
- }
-
- public NIOSequentialFileFactory(final File journalDir,
- final boolean buffered,
- final int bufferSize,
- final int bufferTimeout,
- final boolean logRates)
- {
- this(journalDir, buffered, bufferSize, bufferTimeout, logRates, null);
- }
-
- public NIOSequentialFileFactory(final File journalDir,
- final boolean buffered,
- final int bufferSize,
- final int bufferTimeout,
- final boolean logRates,
- final IOCriticalErrorListener listener)
- {
- super(journalDir, buffered, bufferSize, bufferTimeout, logRates, listener);
- }
-
- public SequentialFile createSequentialFile(final String fileName, int maxIO)
- {
- if (maxIO < 1)
- {
- // A single threaded IO
- maxIO = 1;
- }
-
- return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
- }
-
- public boolean isSupportsCallbacks()
- {
- return timedBuffer != null;
- }
-
-
- public ByteBuffer allocateDirectBuffer(final int size)
- {
- // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
- ByteBuffer buffer2 = null;
- try
- {
- buffer2 = ByteBuffer.allocateDirect(size);
- }
- catch (OutOfMemoryError error)
- {
- // This is a workaround for the way the JDK will deal with native buffers.
- // the main portion is outside of the VM heap
- // and the JDK will not have any reference about it to take GC into account
- // so we force a GC and try again.
- WeakReference<Object> obj = new WeakReference<Object>(new Object());
- try
- {
- long timeout = System.currentTimeMillis() + 5000;
- while (System.currentTimeMillis() > timeout && obj.get() != null)
- {
- System.gc();
- Thread.sleep(100);
- }
- }
- catch (InterruptedException e)
- {
- }
-
- buffer2 = ByteBuffer.allocateDirect(size);
-
- }
- return buffer2;
- }
-
- public void releaseDirectBuffer(ByteBuffer buffer)
- {
- // nothing we can do on this case. we can just have good faith on GC
- }
-
- public ByteBuffer newBuffer(final int size)
- {
- return ByteBuffer.allocate(size);
- }
-
- public void clearBuffer(final ByteBuffer buffer)
- {
- final int limit = buffer.limit();
- buffer.rewind();
-
- for (int i = 0; i < limit; i++)
- {
- buffer.put((byte)0);
- }
-
- buffer.rewind();
- }
-
- public ByteBuffer wrapBuffer(final byte[] bytes)
- {
- return ByteBuffer.wrap(bytes);
- }
-
- public int getAlignment()
- {
- return 1;
- }
-
- public int calculateBlockSize(final int bytes)
- {
- return bytes;
- }
-
-}